This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e06d882bdd [RRIO] [Throttle] [Cache] Implement Throttle and Cache 
using an external resource. (#29401)
1e06d882bdd is described below

commit 1e06d882bddfa2eae33a6bcfc462fe10172704bb
Author: Damon <[email protected]>
AuthorDate: Wed Nov 22 11:12:58 2023 -0800

    [RRIO] [Throttle] [Cache] Implement Throttle and Cache using an external 
resource. (#29401)
    
    * WIP: Implement CacheSerializer and providers
    
    * wip
    
    * Condense Throttle into one class
    
    * wip
    
    * Implement Throttle and Cache
    
    * Update javadoc
    
    * Edit per PR comments
    
    * Refacter per PR comments
---
 .../configmap.yaml                                 |  30 ++
 .../deployment.yaml                                |  27 ++
 .../kustomization.yaml                             |  34 ++
 sdks/java/io/rrio/build.gradle                     |   1 +
 .../org/apache/beam/io/requestresponse/Cache.java  | 239 ++++++++++++
 .../apache/beam/io/requestresponse/CacheRead.java  | 121 ------
 .../apache/beam/io/requestresponse/CacheWrite.java | 119 ------
 .../org/apache/beam/io/requestresponse/Call.java   |  39 +-
 .../org/apache/beam/io/requestresponse/Quota.java  |  70 ++++
 .../beam/io/requestresponse/RedisClient.java       |  10 +
 .../beam/io/requestresponse/ThrottleDequeue.java   | 101 -----
 .../beam/io/requestresponse/ThrottleEnqueue.java   |  61 ---
 .../io/requestresponse/ThrottleRefreshQuota.java   |  55 ---
 .../ThrottleWithExternalResource.java              | 418 +++++++++++++++++++++
 .../apache/beam/io/requestresponse/CacheIT.java    | 120 ++++++
 .../apache/beam/io/requestresponse/CacheTest.java  | 132 +++++++
 .../apache/beam/io/requestresponse/CallTest.java   | 112 +++++-
 ...java => EchoGRPCCallerWithSetupTeardownIT.java} |  14 +-
 ...HTTPCallerTestIT.java => EchoHTTPCallerIT.java} |  18 +-
 .../beam/io/requestresponse/EchoITOptions.java     |   7 +-
 .../beam/io/requestresponse/EchoRequestCoder.java  |  44 +++
 .../{RedisClientTestIT.java => RedisClientIT.java} |  24 +-
 .../ThrottleWithExternalResourceIT.java            | 186 +++++++++
 .../ThrottleWithExternalResourceTest.java          |  77 ++++
 24 files changed, 1559 insertions(+), 500 deletions(-)

diff --git 
a/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/configmap.yaml
 
b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/configmap.yaml
new file mode 100644
index 00000000000..6a482b21b16
--- /dev/null
+++ 
b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/configmap.yaml
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Configures patch for ../base/configmap.yaml
+# See 
https://kubectl.docs.kubernetes.io/references/kustomize/kustomization/patches/
+
+- op: replace
+  path: /metadata/labels/quota-id
+  value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
+- op: replace
+  path: /data/QUOTA_ID
+  value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
+- op: replace
+  path: /data/QUOTA_SIZE
+  value: "10"
+- op: replace
+  path: /data/QUOTA_REFRESH_INTERVAL
+  value: 1s
diff --git 
a/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/deployment.yaml
 
b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/deployment.yaml
new file mode 100644
index 00000000000..cff2f994cd6
--- /dev/null
+++ 
b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/deployment.yaml
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Configures patch for ../base/deployment.yaml
+# See 
https://kubectl.docs.kubernetes.io/references/kustomize/kustomization/patches/
+
+- op: replace
+  path: /metadata/labels/quota-id
+  value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
+- op: replace
+  path: /spec/selector/matchLabels/quota-id
+  value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
+- op: replace
+  path: /spec/template/metadata/labels/quota-id
+  value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
diff --git 
a/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/kustomization.yaml
 
b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/kustomization.yaml
new file mode 100644
index 00000000000..d10598be51f
--- /dev/null
+++ 
b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/kustomization.yaml
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Configures the overlay for 
.test-infra/mock-apis/infrastructure/kubernetes/refresher/base
+# Using the Quota Id:
+# echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
+
+resources:
+- ../../base
+
+nameSuffix: -throttle-with-external-resource-test-10-per-1s
+
+patches:
+- path: configmap.yaml
+  target:
+    kind: ConfigMap
+    name: refresher
+
+- path: deployment.yaml
+  target:
+    kind: Deployment
+    name: refresher
diff --git a/sdks/java/io/rrio/build.gradle b/sdks/java/io/rrio/build.gradle
index bfd030ce61d..4ecdf4e91df 100644
--- a/sdks/java/io/rrio/build.gradle
+++ b/sdks/java/io/rrio/build.gradle
@@ -50,6 +50,7 @@ dependencies {
     testImplementation 
platform(library.java.google_cloud_platform_libraries_bom)
     testImplementation library.java.google_http_client
     testImplementation library.java.junit
+    testImplementation library.java.hamcrest
     testImplementation library.java.testcontainers_base
 
     testRuntimeOnly project(path: ":runners:direct-java", configuration: 
"shadow")
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Cache.java 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Cache.java
new file mode 100644
index 00000000000..b8e526c4829
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Cache.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteSource;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/** Transforms for reading and writing request/response associations to a 
cache. */
+final class Cache {
+
+  /**
+   * Instantiates a {@link Call} {@link PTransform} that reads {@link 
RequestT} {@link ResponseT}
+   * associations from a cache. The {@link KV} value is null when no 
association exists. This method
+   * does not enforce {@link Coder#verifyDeterministic} and defers to the user 
to determine whether
+   * to enforce this given the cache implementation.
+   */
+  static <
+          RequestT,
+          @Nullable ResponseT,
+          CallerSetupTeardownT extends
+              Caller<RequestT, KV<RequestT, @Nullable ResponseT>> & 
SetupTeardown>
+      PTransform<PCollection<RequestT>, Call.Result<KV<RequestT, @Nullable 
ResponseT>>> read(
+          CallerSetupTeardownT implementsCallerSetupTeardown,
+          Coder<RequestT> requestTCoder,
+          Coder<@Nullable ResponseT> responseTCoder) {
+    return Call.ofCallerAndSetupTeardown(
+        implementsCallerSetupTeardown, KvCoder.of(requestTCoder, 
responseTCoder));
+  }
+
+  /**
+   * Instantiates a {@link Call} {@link PTransform}, calling {@link #read} 
with a {@link Caller}
+   * that employs a redis client.
+   *
+   * <p>This method requires both the {@link RequestT} and {@link ResponseT}s' 
{@link
+   * Coder#verifyDeterministic}. Otherwise, it throws a {@link 
NonDeterministicException}.
+   *
+   * <p><a href="https://redis.io";>Redis</a> is designed for multiple 
workloads, simultaneously
+   * reading and writing to a shared instance. See <a
+   * href="https://redis.io/docs/get-started/faq/";>Redis FAQ</a> for more 
information on important
+   * considerations when using this method to achieve cache reads.
+   */
+  static <RequestT, @Nullable ResponseT>
+      PTransform<PCollection<RequestT>, Call.Result<KV<RequestT, @Nullable 
ResponseT>>>
+          readUsingRedis(
+              RedisClient client,
+              Coder<RequestT> requestTCoder,
+              Coder<@Nullable ResponseT> responseTCoder)
+              throws NonDeterministicException {
+    return read(
+        new UsingRedis<>(requestTCoder, responseTCoder, client).read(),
+        requestTCoder,
+        responseTCoder);
+  }
+
+  /**
+   * Write a {@link RequestT} {@link ResponseT} association to a cache. This 
method does not enforce
+   * {@link Coder#verifyDeterministic} and defers to the user to determine 
whether to enforce this
+   * given the cache implementation.
+   */
+  static <
+          RequestT,
+          ResponseT,
+          CallerSetupTeardownT extends
+              Caller<KV<RequestT, ResponseT>, KV<RequestT, ResponseT>> & 
SetupTeardown>
+      PTransform<PCollection<KV<RequestT, ResponseT>>, 
Call.Result<KV<RequestT, ResponseT>>> write(
+          CallerSetupTeardownT implementsCallerSetupTeardown,
+          KvCoder<RequestT, ResponseT> kvCoder) {
+    return Call.ofCallerAndSetupTeardown(implementsCallerSetupTeardown, 
kvCoder);
+  }
+
+  /**
+   * Instantiates a {@link Call} {@link PTransform}, calling {@link #write} 
with a {@link Caller}
+   * that employs a redis client.
+   *
+   * <p>This method requires both the {@link RequestT} and {@link ResponseT}s' 
{@link
+   * Coder#verifyDeterministic}. Otherwise, it throws a {@link 
NonDeterministicException}.
+   *
+   * <p><a href="https://redis.io";>Redis</a> is designed for multiple 
workloads, simultaneously
+   * reading and writing to a shared instance. See <a
+   * href="https://redis.io/docs/get-started/faq/";>Redis FAQ</a> for more 
information on important
+   * considerations when using this method to achieve cache writes.
+   */
+  static <RequestT, ResponseT>
+      PTransform<PCollection<KV<RequestT, ResponseT>>, 
Call.Result<KV<RequestT, ResponseT>>>
+          writeUsingRedis(
+              Duration expiry,
+              RedisClient client,
+              Coder<RequestT> requestTCoder,
+              Coder<@Nullable ResponseT> responseTCoder)
+              throws NonDeterministicException {
+    return write(
+        new UsingRedis<>(requestTCoder, responseTCoder, client).write(expiry),
+        KvCoder.of(requestTCoder, responseTCoder));
+  }
+
+  private static class UsingRedis<RequestT, ResponseT> {
+    private final Coder<RequestT> requestTCoder;
+    private final Coder<@Nullable ResponseT> responseTCoder;
+    private final RedisClient client;
+
+    private UsingRedis(
+        Coder<RequestT> requestTCoder,
+        Coder<@Nullable ResponseT> responseTCoder,
+        RedisClient client)
+        throws Coder.NonDeterministicException {
+      this.client = client;
+      requestTCoder.verifyDeterministic();
+      responseTCoder.verifyDeterministic();
+      this.requestTCoder = requestTCoder;
+      this.responseTCoder = responseTCoder;
+    }
+
+    private Read<RequestT, @Nullable ResponseT> read() {
+      return new Read<>(requestTCoder, responseTCoder, client);
+    }
+
+    private Write<RequestT, ResponseT> write(Duration expiry) {
+      return new Write<>(expiry, requestTCoder, responseTCoder, client);
+    }
+
+    /** Reads associated {@link RequestT} {@link ResponseT} using a {@link 
RedisClient}. */
+    private static class Read<RequestT, @Nullable ResponseT>
+        implements Caller<RequestT, KV<RequestT, @Nullable ResponseT>>, 
SetupTeardown {
+
+      private final Coder<RequestT> requestTCoder;
+      private final Coder<@Nullable ResponseT> responseTCoder;
+      private final RedisClient client;
+
+      private Read(
+          Coder<RequestT> requestTCoder,
+          Coder<@Nullable ResponseT> responseTCoder,
+          RedisClient client) {
+        this.requestTCoder = requestTCoder;
+        this.responseTCoder = responseTCoder;
+        this.client = client;
+      }
+
+      @Override
+      public KV<RequestT, @Nullable ResponseT> call(RequestT request)
+          throws UserCodeExecutionException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try {
+          requestTCoder.encode(request, baos);
+          byte[] encodedRequest = baos.toByteArray();
+          byte[] encodedResponse = client.getBytes(encodedRequest);
+          if (encodedResponse == null) {
+            return KV.of(request, null);
+          }
+          ResponseT response =
+              checkStateNotNull(
+                  
responseTCoder.decode(ByteSource.wrap(encodedResponse).openStream()));
+          return KV.of(request, response);
+        } catch (IllegalStateException | IOException e) {
+          throw new UserCodeExecutionException(e);
+        }
+      }
+
+      @Override
+      public void setup() throws UserCodeExecutionException {
+        client.setup();
+      }
+
+      @Override
+      public void teardown() throws UserCodeExecutionException {
+        client.teardown();
+      }
+    }
+  }
+
+  private static class Write<RequestT, ResponseT>
+      implements Caller<KV<RequestT, ResponseT>, KV<RequestT, ResponseT>>, 
SetupTeardown {
+    private final Duration expiry;
+    private final Coder<RequestT> requestTCoder;
+    private final Coder<@Nullable ResponseT> responseTCoder;
+    private final RedisClient client;
+
+    private Write(
+        Duration expiry,
+        Coder<RequestT> requestTCoder,
+        Coder<@Nullable ResponseT> responseTCoder,
+        RedisClient client) {
+      this.expiry = expiry;
+      this.requestTCoder = requestTCoder;
+      this.responseTCoder = responseTCoder;
+      this.client = client;
+    }
+
+    @Override
+    public KV<RequestT, ResponseT> call(KV<RequestT, ResponseT> request)
+        throws UserCodeExecutionException {
+      ByteArrayOutputStream keyStream = new ByteArrayOutputStream();
+      ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
+      try {
+        requestTCoder.encode(request.getKey(), keyStream);
+        responseTCoder.encode(request.getValue(), valueStream);
+      } catch (IOException e) {
+        throw new UserCodeExecutionException(e);
+      }
+      client.setex(keyStream.toByteArray(), valueStream.toByteArray(), expiry);
+      return request;
+    }
+
+    @Override
+    public void setup() throws UserCodeExecutionException {
+      client.setup();
+    }
+
+    @Override
+    public void teardown() throws UserCodeExecutionException {
+      client.teardown();
+    }
+  }
+}
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheRead.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheRead.java
deleted file mode 100644
index 3765d25370a..00000000000
--- 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheRead.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import com.google.auto.value.AutoValue;
-import java.util.Map;
-import org.apache.beam.io.requestresponse.CacheRead.Result;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-
-/**
- * {@link CacheRead} reads associated {@link ResponseT} types from {@link 
RequestT} types, if any
- * exist.
- */
-class CacheRead<RequestT, ResponseT>
-    extends PTransform<PCollection<RequestT>, Result<RequestT, ResponseT>> {
-
-  private static final TupleTag<ApiIOError> FAILURE_TAG = new 
TupleTag<ApiIOError>() {};
-
-  // TODO(damondouglas): remove suppress warnings after instance utilized.
-  @SuppressWarnings({"unused"})
-  private final Configuration<RequestT, ResponseT> configuration;
-
-  private CacheRead(Configuration<RequestT, ResponseT> configuration) {
-    this.configuration = configuration;
-  }
-
-  /** Configuration details for {@link CacheRead}. */
-  @AutoValue
-  abstract static class Configuration<RequestT, ResponseT> {
-
-    static <RequestT, ResponseT> Builder<RequestT, ResponseT> builder() {
-      return new AutoValue_CacheRead_Configuration.Builder<>();
-    }
-
-    abstract Builder<RequestT, ResponseT> toBuilder();
-
-    @AutoValue.Builder
-    abstract static class Builder<RequestT, ResponseT> {
-
-      abstract Configuration<RequestT, ResponseT> build();
-    }
-  }
-
-  @Override
-  public Result<RequestT, ResponseT> expand(PCollection<RequestT> input) {
-    return Result.of(
-        new TupleTag<KV<RequestT, ResponseT>>() {}, 
PCollectionTuple.empty(input.getPipeline()));
-  }
-
-  /**
-   * The {@link Result} of reading RequestT {@link PCollection} elements 
yielding ResponseT {@link
-   * PCollection} elements.
-   */
-  static class Result<RequestT, ResponseT> implements POutput {
-
-    static <RequestT, ResponseT> Result<RequestT, ResponseT> of(
-        TupleTag<KV<RequestT, ResponseT>> responseTag, PCollectionTuple pct) {
-      return new Result<>(responseTag, pct);
-    }
-
-    private final Pipeline pipeline;
-    private final TupleTag<KV<RequestT, ResponseT>> responseTag;
-    private final PCollection<KV<RequestT, ResponseT>> responses;
-    private final PCollection<ApiIOError> failures;
-
-    private Result(TupleTag<KV<RequestT, ResponseT>> responseTag, 
PCollectionTuple pct) {
-      this.pipeline = pct.getPipeline();
-      this.responseTag = responseTag;
-      this.responses = pct.get(responseTag);
-      this.failures = pct.get(FAILURE_TAG);
-    }
-
-    PCollection<KV<RequestT, ResponseT>> getResponses() {
-      return responses;
-    }
-
-    PCollection<ApiIOError> getFailures() {
-      return failures;
-    }
-
-    @Override
-    public Pipeline getPipeline() {
-      return this.pipeline;
-    }
-
-    @Override
-    public Map<TupleTag<?>, PValue> expand() {
-      return ImmutableMap.of(
-          responseTag, responses,
-          FAILURE_TAG, failures);
-    }
-
-    @Override
-    public void finishSpecifyingOutput(
-        String transformName, PInput input, PTransform<?, ?> transform) {}
-  }
-}
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheWrite.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheWrite.java
deleted file mode 100644
index 25249c3e41b..00000000000
--- 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheWrite.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import com.google.auto.value.AutoValue;
-import java.util.Map;
-import org.apache.beam.io.requestresponse.CacheWrite.Result;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-
-/**
- * {@link CacheWrite} writes associated {@link RequestT} and {@link ResponseT} 
pairs to a cache.
- * Using {@link RequestT} and {@link ResponseT}'s {@link 
org.apache.beam.sdk.coders.Coder}, this
- * transform writes encoded representations of this association.
- */
-class CacheWrite<RequestT, ResponseT>
-    extends PTransform<PCollection<KV<RequestT, ResponseT>>, Result<RequestT, 
ResponseT>> {
-
-  private static final TupleTag<ApiIOError> FAILURE_TAG = new 
TupleTag<ApiIOError>() {};
-
-  // TODO(damondouglas): remove suppress warnings after configuration is used.
-  @SuppressWarnings({"unused"})
-  private final Configuration<RequestT, ResponseT> configuration;
-
-  private CacheWrite(Configuration<RequestT, ResponseT> configuration) {
-    this.configuration = configuration;
-  }
-
-  /** Configuration details for {@link CacheWrite}. */
-  @AutoValue
-  abstract static class Configuration<RequestT, ResponseT> {
-
-    static <RequestT, ResponseT> Builder<RequestT, ResponseT> builder() {
-      return new AutoValue_CacheWrite_Configuration.Builder<>();
-    }
-
-    abstract Builder<RequestT, ResponseT> toBuilder();
-
-    @AutoValue.Builder
-    abstract static class Builder<RequestT, ResponseT> {
-
-      abstract Configuration<RequestT, ResponseT> build();
-    }
-  }
-
-  @Override
-  public Result<RequestT, ResponseT> expand(PCollection<KV<RequestT, 
ResponseT>> input) {
-    return Result.of(
-        new TupleTag<KV<RequestT, ResponseT>>() {}, 
PCollectionTuple.empty(input.getPipeline()));
-  }
-
-  /** The {@link Result} of writing a request/response {@link KV} {@link 
PCollection}. */
-  static class Result<RequestT, ResponseT> implements POutput {
-
-    static <RequestT, ResponseT> Result<RequestT, ResponseT> of(
-        TupleTag<KV<RequestT, ResponseT>> responseTag, PCollectionTuple pct) {
-      return new Result<>(responseTag, pct);
-    }
-
-    private final Pipeline pipeline;
-    private final TupleTag<KV<RequestT, ResponseT>> responseTag;
-    private final PCollection<KV<RequestT, ResponseT>> responses;
-    private final PCollection<ApiIOError> failures;
-
-    private Result(TupleTag<KV<RequestT, ResponseT>> responseTag, 
PCollectionTuple pct) {
-      this.pipeline = pct.getPipeline();
-      this.responseTag = responseTag;
-      this.responses = pct.get(responseTag);
-      this.failures = pct.get(FAILURE_TAG);
-    }
-
-    public PCollection<KV<RequestT, ResponseT>> getResponses() {
-      return responses;
-    }
-
-    public PCollection<ApiIOError> getFailures() {
-      return failures;
-    }
-
-    @Override
-    public Pipeline getPipeline() {
-      return this.pipeline;
-    }
-
-    @Override
-    public Map<TupleTag<?>, PValue> expand() {
-      return ImmutableMap.of(
-          responseTag, responses,
-          FAILURE_TAG, failures);
-    }
-
-    @Override
-    public void finishSpecifyingOutput(
-        String transformName, PInput input, PTransform<?, ?> transform) {}
-  }
-}
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
index 52181af534e..d52ca971ca4 100644
--- 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
@@ -100,7 +100,11 @@ class Call<RequestT, ResponseT>
             .build());
   }
 
-  private static final TupleTag<ApiIOError> FAILURE_TAG = new 
TupleTag<ApiIOError>() {};
+  // TupleTags need to be instantiated for each Call instance. We cannot use a 
shared
+  // static instance that is shared for multiple PCollectionTuples when Call is
+  // instantiated multiple times as it is reused throughout code in this 
library.
+  private final TupleTag<ResponseT> responseTag = new TupleTag<ResponseT>() {};
+  private final TupleTag<ApiIOError> failureTag = new TupleTag<ApiIOError>() 
{};
 
   private final Configuration<RequestT, ResponseT> configuration;
 
@@ -128,27 +132,30 @@ class Call<RequestT, ResponseT>
 
   @Override
   public @NonNull Result<ResponseT> expand(PCollection<RequestT> input) {
-    TupleTag<ResponseT> responseTag = new TupleTag<ResponseT>() {};
 
     PCollectionTuple pct =
         input.apply(
             CallFn.class.getSimpleName(),
-            ParDo.of(new CallFn<>(responseTag, configuration))
-                .withOutputTags(responseTag, TupleTagList.of(FAILURE_TAG)));
+            ParDo.of(new CallFn<>(responseTag, failureTag, configuration))
+                .withOutputTags(responseTag, TupleTagList.of(failureTag)));
 
-    return Result.of(configuration.getResponseCoder(), responseTag, pct);
+    return Result.of(configuration.getResponseCoder(), responseTag, 
failureTag, pct);
   }
 
   private static class CallFn<RequestT, ResponseT> extends DoFn<RequestT, 
ResponseT> {
     private final TupleTag<ResponseT> responseTag;
+    private final TupleTag<ApiIOError> failureTag;
     private final CallerWithTimeout<RequestT, ResponseT> caller;
     private final SetupTeardownWithTimeout setupTeardown;
 
     private transient @MonotonicNonNull ExecutorService executor;
 
     private CallFn(
-        TupleTag<ResponseT> responseTag, Configuration<RequestT, ResponseT> 
configuration) {
+        TupleTag<ResponseT> responseTag,
+        TupleTag<ApiIOError> failureTag,
+        Configuration<RequestT, ResponseT> configuration) {
       this.responseTag = responseTag;
+      this.failureTag = failureTag;
       this.caller = new CallerWithTimeout<>(configuration.getTimeout(), 
configuration.getCaller());
       this.setupTeardown =
           new SetupTeardownWithTimeout(
@@ -194,7 +201,7 @@ class Call<RequestT, ResponseT>
         ResponseT response = this.caller.call(request);
         receiver.get(responseTag).output(response);
       } catch (UserCodeExecutionException e) {
-        receiver.get(FAILURE_TAG).output(ApiIOError.of(e, request));
+        receiver.get(failureTag).output(ApiIOError.of(e, request));
       }
     }
   }
@@ -269,21 +276,29 @@ class Call<RequestT, ResponseT>
   static class Result<ResponseT> implements POutput {
 
     static <ResponseT> Result<ResponseT> of(
-        Coder<ResponseT> responseTCoder, TupleTag<ResponseT> responseTag, 
PCollectionTuple pct) {
-      return new Result<>(responseTCoder, responseTag, pct);
+        Coder<ResponseT> responseTCoder,
+        TupleTag<ResponseT> responseTag,
+        TupleTag<ApiIOError> failureTag,
+        PCollectionTuple pct) {
+      return new Result<>(responseTCoder, responseTag, pct, failureTag);
     }
 
     private final Pipeline pipeline;
     private final TupleTag<ResponseT> responseTag;
+    private final TupleTag<ApiIOError> failureTag;
     private final PCollection<ResponseT> responses;
     private final PCollection<ApiIOError> failures;
 
     private Result(
-        Coder<ResponseT> responseTCoder, TupleTag<ResponseT> responseTag, 
PCollectionTuple pct) {
+        Coder<ResponseT> responseTCoder,
+        TupleTag<ResponseT> responseTag,
+        PCollectionTuple pct,
+        TupleTag<ApiIOError> failureTag) {
       this.pipeline = pct.getPipeline();
       this.responseTag = responseTag;
+      this.failureTag = failureTag;
       this.responses = pct.get(responseTag).setCoder(responseTCoder);
-      this.failures = pct.get(FAILURE_TAG);
+      this.failures = pct.get(this.failureTag);
     }
 
     public PCollection<ResponseT> getResponses() {
@@ -303,7 +318,7 @@ class Call<RequestT, ResponseT>
     public @NonNull Map<TupleTag<?>, PValue> expand() {
       return ImmutableMap.of(
           responseTag, responses,
-          FAILURE_TAG, failures);
+          failureTag, failures);
     }
 
     @Override
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java
new file mode 100644
index 00000000000..d2e538cf7cf
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import java.io.Serializable;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * A data class that expresses a quota. Web API providers typically define a 
quota as the number of
+ * requests per time interval.
+ */
+public class Quota implements Serializable {
+  private final long numRequests;
+  private final @NonNull Duration interval;
+
+  public Quota(long numRequests, @NonNull Duration interval) {
+    this.numRequests = numRequests;
+    this.interval = interval;
+  }
+
+  /** The number of allowed requests. */
+  public long getNumRequests() {
+    return numRequests;
+  }
+
+  /** The duration context within which to allow requests. */
+  public @NonNull Duration getInterval() {
+    return interval;
+  }
+
+  @Override
+  public boolean equals(@Nullable Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    Quota quota = (Quota) o;
+    return numRequests == quota.numRequests && Objects.equal(interval, 
quota.interval);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(numRequests, interval);
+  }
+
+  @Override
+  public String toString() {
+    return "Quota{" + "numRequests=" + numRequests + ", interval=" + interval 
+ '}';
+  }
+}
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java
index a87f5c191e4..a347a185241 100644
--- 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import redis.clients.jedis.JedisPooled;
 import redis.clients.jedis.exceptions.JedisException;
@@ -61,6 +62,15 @@ class RedisClient implements SetupTeardown {
     }
   }
 
+  /** Get a byte array associated with a byte array key. Returns null if key 
does not exist. */
+  byte @Nullable [] getBytes(byte[] key) throws UserCodeExecutionException {
+    try {
+      return getSafeClient().get(key);
+    } catch (JedisException e) {
+      throw new UserCodeExecutionException(e);
+    }
+  }
+
   /**
    * Get the long value stored by the key. Yields zero when key does not 
exist, keeping consistency
    * with Redis convention. Consider using {@link #exists} to query key 
existance.
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleDequeue.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleDequeue.java
deleted file mode 100644
index 085b13b5e11..00000000000
--- 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleDequeue.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import com.google.auto.value.AutoValue;
-import java.util.Map;
-import org.apache.beam.io.requestresponse.ThrottleDequeue.Result;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.joda.time.Instant;
-
-/**
- * {@link ThrottleDequeue} dequeues {@link RequestT} elements at a fixed rate 
yielding a {@link
- * Result} containing the dequeued {@link RequestT} {@link PCollection} and a 
{@link ApiIOError}
- * {@link PCollection} of any errors.
- */
-class ThrottleDequeue<RequestT> extends PTransform<PCollection<Instant>, 
Result<RequestT>> {
-
-  private static final TupleTag<ApiIOError> FAILURE_TAG = new 
TupleTag<ApiIOError>() {};
-
-  // TODO(damondouglas): remove suppress warnings after instance utilized.
-  @SuppressWarnings({"unused"})
-  private final Configuration<RequestT> configuration;
-
-  private ThrottleDequeue(Configuration<RequestT> configuration) {
-    this.configuration = configuration;
-  }
-
-  @Override
-  public Result<RequestT> expand(PCollection<Instant> input) {
-    // TODO(damondouglas): expand in a future PR.
-    return new Result<>(new TupleTag<RequestT>() {}, 
PCollectionTuple.empty(input.getPipeline()));
-  }
-
-  @AutoValue
-  abstract static class Configuration<RequestT> {
-
-    @AutoValue.Builder
-    abstract static class Builder<RequestT> {
-      abstract Configuration<RequestT> build();
-    }
-  }
-
-  /** The {@link Result} of dequeuing {@link RequestT}s. */
-  static class Result<RequestT> implements POutput {
-
-    static <RequestT> Result<RequestT> of(TupleTag<RequestT> requestsTag, 
PCollectionTuple pct) {
-      return new Result<>(requestsTag, pct);
-    }
-
-    private final Pipeline pipeline;
-    private final TupleTag<RequestT> requestsTag;
-    private final PCollection<RequestT> requests;
-    private final PCollection<ApiIOError> failures;
-
-    private Result(TupleTag<RequestT> requestsTag, PCollectionTuple pct) {
-      this.pipeline = pct.getPipeline();
-      this.requestsTag = requestsTag;
-      this.requests = pct.get(requestsTag);
-      this.failures = pct.get(FAILURE_TAG);
-    }
-
-    @Override
-    public Pipeline getPipeline() {
-      return pipeline;
-    }
-
-    @Override
-    public Map<TupleTag<?>, PValue> expand() {
-      return ImmutableMap.of(
-          requestsTag, requests,
-          FAILURE_TAG, failures);
-    }
-
-    @Override
-    public void finishSpecifyingOutput(
-        String transformName, PInput input, PTransform<?, ?> transform) {}
-  }
-}
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleEnqueue.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleEnqueue.java
deleted file mode 100644
index 505ef86be48..00000000000
--- 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleEnqueue.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import com.google.auto.value.AutoValue;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * {@link ThrottleEnqueue} enqueues {@link RequestT} elements yielding an 
{@link ApiIOError} {@link
- * PCollection} of any enqueue errors.
- */
-class ThrottleEnqueue<RequestT> extends PTransform<PCollection<RequestT>, 
PCollection<ApiIOError>> {
-
-  @SuppressWarnings({"unused"})
-  private final Configuration<RequestT> configuration;
-
-  private ThrottleEnqueue(Configuration<RequestT> configuration) {
-    this.configuration = configuration;
-  }
-
-  /** Configuration details for {@link ThrottleEnqueue}. */
-  @AutoValue
-  abstract static class Configuration<RequestT> {
-
-    static <RequestT> Builder<RequestT> builder() {
-      return new AutoValue_ThrottleEnqueue_Configuration.Builder<>();
-    }
-
-    abstract Builder<RequestT> toBuilder();
-
-    @AutoValue.Builder
-    abstract static class Builder<RequestT> {
-
-      abstract Configuration<RequestT> build();
-    }
-  }
-
-  @Override
-  public PCollection<ApiIOError> expand(PCollection<RequestT> input) {
-    // TODO(damondouglas): expand in a future PR.
-    return 
input.getPipeline().apply(Create.empty(TypeDescriptor.of(ApiIOError.class)));
-  }
-}
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleRefreshQuota.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleRefreshQuota.java
deleted file mode 100644
index 57e57528db4..00000000000
--- 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleRefreshQuota.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import com.google.auto.value.AutoValue;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.joda.time.Instant;
-
-/**
- * {@link ThrottleRefreshQuota} refreshes a quota per {@link Instant} 
processing events emitting any
- * errors into an {@link ApiIOError} {@link PCollection}.
- */
-class ThrottleRefreshQuota extends PTransform<PCollection<Instant>, 
PCollection<ApiIOError>> {
-
-  // TODO: remove suppress warnings after configuration utilized.
-  @SuppressWarnings({"unused"})
-  private final Configuration configuration;
-
-  private ThrottleRefreshQuota(Configuration configuration) {
-    this.configuration = configuration;
-  }
-
-  @Override
-  public PCollection<ApiIOError> expand(PCollection<Instant> input) {
-    // TODO(damondouglas): expand in a later PR.
-    return 
input.getPipeline().apply(Create.empty(TypeDescriptor.of(ApiIOError.class)));
-  }
-
-  @AutoValue
-  abstract static class Configuration {
-
-    @AutoValue.Builder
-    abstract static class Builder {
-      abstract Configuration build();
-    }
-  }
-}
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java
new file mode 100644
index 00000000000..dffc034770a
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteSource;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Throttles a {@link T} {@link PCollection} using an external resource.
+ *
+ * <p>{@link ThrottleWithExternalResource} makes use of {@link 
PeriodicImpulse} as it needs to
+ * coordinate three {@link PTransform}s concurrently. Usage of {@link 
ThrottleWithExternalResource}
+ * should consider the impact of {@link PeriodicImpulse} on the pipeline.
+ *
+ * <p>Usage of {@link ThrottleWithExternalResource} is completely optional and 
serves as one of many
+ * methods by {@link RequestResponseIO} to protect against API overuse. Usage 
should not depend on
+ * {@link ThrottleWithExternalResource} alone to achieve API overuse 
prevention for several reasons.
+ * The underlying external resource may not scale at all or as fast as a Beam 
Runner. The external
+ * resource itself may be an API with its own quota that {@link 
ThrottleWithExternalResource} does
+ * not consider.
+ *
+ * <p>{@link ThrottleWithExternalResource} makes use of several {@link 
Caller}s that work together
+ * to achieve its aim of throttling a {@link T} {@link PCollection}. A {@link 
RefresherT} is a
+ * {@link Caller} that takes an {@link Instant} and refreshes a shared {@link 
Quota}. An {@link
+ * EnqueuerT} enqueues a {@link T} element while a {@link DequeuerT} dequeues 
said element when the
+ * {@link ReporterT} reports that the stored {@link Quota#getNumRequests} is 
>0. Finally, a {@link
+ * DecrementerT} decrements from the shared {@link Quota} value, additionally 
reporting the value
+ * after performing the action.
+ *
+ * <p>{@link ThrottleWithExternalResource} instantiates and applies two {@link 
Call} {@link
+ * PTransform}s using the aforementioned {@link Caller}s {@link RefresherT} 
and {@link EnqueuerT}.
+ * {@link ThrottleWithExternalResource} calls {@link ReporterT}, {@link 
DequeuerT}, {@link
+ * DecrementerT} within its {@link DoFn}, emitting the dequeued {@link T} when 
the {@link ReporterT}
+ * reports a value >0. As an additional safety check, the DoFn checks whether 
the {@link Quota}
+ * value after {@link DecrementerT}'s action is <0, signaling that multiple 
workers are attempting
+ * the same too fast and therefore exists the DoFn allowing for the next 
refresh.
+ *
+ * <p>{@link ThrottleWithExternalResource} flattens errors emitted from {@link 
EnqueuerT}, {@link
+ * RefresherT}, and its own {@link DoFn} into a single {@link ApiIOError} 
{@link PCollection} that
+ * is encapsulated, with a {@link T} {@link PCollection} output into a {@link 
Call.Result}.
+ */
+class ThrottleWithExternalResource<
+        T,
+        ReporterT extends Caller<String, Long> & SetupTeardown,
+        EnqueuerT extends Caller<T, Void> & SetupTeardown,
+        DequeuerT extends Caller<Instant, T> & SetupTeardown,
+        DecrementerT extends Caller<Instant, Long> & SetupTeardown,
+        RefresherT extends Caller<Instant, Void> & SetupTeardown>
+    extends PTransform<PCollection<T>, Call.Result<T>> {
+
+  /**
+   * Instantiate a {@link ThrottleWithExternalResource} using a {@link 
RedisClient}.
+   *
+   * <p><a href="https://redis.io";>Redis</a> is designed for multiple 
workloads, simultaneously
+   * reading and writing to a shared instance. See <a
+   * href="https://redis.io/docs/get-started/faq/";>Redis FAQ</a> for more 
information on important
+   * considerations when using Redis as {@link ThrottleWithExternalResource}'s 
external resource.
+   */
+  static <T>
+      ThrottleWithExternalResource<
+              T,
+              RedisReporter,
+              RedisEnqueuer<T>,
+              RedisDequeuer<T>,
+              RedisDecrementer,
+              RedisRefresher>
+          usingRedis(URI uri, String quotaIdentifier, String queueKey, Quota 
quota, Coder<T> coder)
+              throws Coder.NonDeterministicException {
+    return new ThrottleWithExternalResource<
+        T, RedisReporter, RedisEnqueuer<T>, RedisDequeuer<T>, 
RedisDecrementer, RedisRefresher>(
+        quota,
+        quotaIdentifier,
+        coder,
+        new RedisReporter(uri),
+        new RedisEnqueuer<>(uri, queueKey, coder),
+        new RedisDequeuer<>(uri, coder, queueKey),
+        new RedisDecrementer(uri, queueKey),
+        new RedisRefresher(uri, quota, quotaIdentifier));
+  }
+
+  private static final Duration THROTTLE_INTERVAL = 
Duration.standardSeconds(1L);
+
+  private final Quota quota;
+  private final String quotaIdentifier;
+  private final Coder<T> coder;
+  private final ReporterT reporterT;
+  private final EnqueuerT enqueuerT;
+  private final DequeuerT dequeuerT;
+  private final DecrementerT decrementerT;
+  private final RefresherT refresherT;
+
+  ThrottleWithExternalResource(
+      Quota quota,
+      String quotaIdentifier,
+      Coder<T> coder,
+      ReporterT reporterT,
+      EnqueuerT enqueuerT,
+      DequeuerT dequeuerT,
+      DecrementerT decrementerT,
+      RefresherT refresherT)
+      throws Coder.NonDeterministicException {
+    this.quotaIdentifier = quotaIdentifier;
+    this.reporterT = reporterT;
+    coder.verifyDeterministic();
+    checkArgument(!quotaIdentifier.isEmpty());
+    this.quota = quota;
+    this.coder = coder;
+    this.enqueuerT = enqueuerT;
+    this.dequeuerT = dequeuerT;
+    this.decrementerT = decrementerT;
+    this.refresherT = refresherT;
+  }
+
+  @Override
+  public Call.Result<T> expand(PCollection<T> input) {
+    Pipeline pipeline = input.getPipeline();
+
+    // Refresh known quota to control the throttle rate.
+    Call.Result<Void> refreshResult =
+        pipeline
+            .apply("quota impulse", 
PeriodicImpulse.create().withInterval(quota.getInterval()))
+            .apply("quota refresh", getRefresher());
+
+    // Enqueue T elements.
+    Call.Result<Void> enqueuResult = input.apply("enqueue", getEnqueuer());
+
+    TupleTag<T> outputTag = new TupleTag<T>() {};
+    TupleTag<ApiIOError> failureTag = new TupleTag<ApiIOError>() {};
+
+    // Perform Throttle.
+    PCollectionTuple pct =
+        pipeline
+            .apply("throttle impulse", 
PeriodicImpulse.create().withInterval(THROTTLE_INTERVAL))
+            .apply(
+                "throttle fn",
+                ParDo.of(
+                        new ThrottleFn(
+                            quotaIdentifier,
+                            dequeuerT,
+                            decrementerT,
+                            reporterT,
+                            outputTag,
+                            failureTag))
+                    .withOutputTags(outputTag, TupleTagList.of(failureTag)));
+
+    PCollection<ApiIOError> errors =
+        PCollectionList.of(refreshResult.getFailures())
+            .and(enqueuResult.getFailures())
+            .and(pct.get(failureTag))
+            .apply("errors flatten", Flatten.pCollections());
+
+    TupleTag<T> resultOutputTag = new TupleTag<T>() {};
+    TupleTag<ApiIOError> resultFailureTag = new TupleTag<ApiIOError>() {};
+
+    return Call.Result.<T>of(
+        coder,
+        resultOutputTag,
+        resultFailureTag,
+        PCollectionTuple.of(resultOutputTag, 
pct.get(outputTag)).and(resultFailureTag, errors));
+  }
+
+  private Call<Instant, Void> getRefresher() {
+    return Call.ofCallerAndSetupTeardown(refresherT, VoidCoder.of());
+  }
+
+  private Call<T, Void> getEnqueuer() {
+    return Call.ofCallerAndSetupTeardown(enqueuerT, VoidCoder.of());
+  }
+
+  private class ThrottleFn extends DoFn<Instant, T> {
+    private final String quotaIdentifier;
+    private final DequeuerT dequeuerT;
+    private final DecrementerT decrementerT;
+    private final ReporterT reporterT;
+    private final TupleTag<T> outputTag;
+    private final TupleTag<ApiIOError> failureTag;
+
+    private ThrottleFn(
+        String quotaIdentifier,
+        DequeuerT dequeuerT,
+        DecrementerT decrementerT,
+        ReporterT reporterT,
+        TupleTag<T> outputTag,
+        TupleTag<ApiIOError> failureTag) {
+      this.quotaIdentifier = quotaIdentifier;
+      this.dequeuerT = dequeuerT;
+      this.decrementerT = decrementerT;
+      this.reporterT = reporterT;
+      this.outputTag = outputTag;
+      this.failureTag = failureTag;
+    }
+
+    @ProcessElement
+    public void process(@Element Instant instant, MultiOutputReceiver 
receiver) {
+      // Check for available quota.
+      try {
+        if (reporterT.call(quotaIdentifier) <= 0L) {
+          return;
+        }
+
+        // Decrement the quota.
+        Long quotaAfterDecrement = decrementerT.call(instant);
+
+        // As an additional protection we check what the quota is after 
decrementing. A value
+        // < 0 signals that multiple simultaneous workers have attempted to 
decrement too quickly.
+        // We don't bother adding the quota back to prevent additional workers 
from doing the same
+        // and just wait for the next refresh, exiting the DoFn.
+        if (quotaAfterDecrement < 0) {
+          return;
+        }
+
+        // Dequeue an element if quota available. An error here would not 
result in loss of data
+        // as no element would successfully dequeue from the external resource 
but instead throw.
+        T element = dequeuerT.call(instant);
+
+        // Finally, emit the element.
+        receiver.get(outputTag).output(element);
+
+      } catch (UserCodeExecutionException e) {
+        receiver
+            .get(failureTag)
+            .output(
+                ApiIOError.builder()
+                    // no request to emit as part of the error.
+                    .setRequestAsJsonString("")
+                    .setMessage(Optional.ofNullable(e.getMessage()).orElse(""))
+                    .setObservedTimestamp(Instant.now())
+                    .setStackTrace(Throwables.getStackTraceAsString(e))
+                    .build());
+      }
+    }
+
+    @Setup
+    public void setup() throws UserCodeExecutionException {
+      enqueuerT.setup();
+      dequeuerT.setup();
+      decrementerT.setup();
+      reporterT.setup();
+    }
+
+    @Teardown
+    public void teardown() throws UserCodeExecutionException {
+      List<String> messages = new ArrayList<>();
+      String format = "%s encountered error during teardown: %s";
+      try {
+        enqueuerT.teardown();
+      } catch (UserCodeExecutionException e) {
+        messages.add(String.format(format, "enqueuerT", e));
+      }
+      try {
+        dequeuerT.teardown();
+      } catch (UserCodeExecutionException e) {
+        messages.add(String.format(format, "dequeuerT", e));
+      }
+      try {
+        decrementerT.teardown();
+      } catch (UserCodeExecutionException e) {
+        messages.add(String.format(format, "decrementerT", e));
+      }
+      try {
+        reporterT.teardown();
+      } catch (UserCodeExecutionException e) {
+        messages.add(String.format(format, "reporterT", e));
+      }
+
+      if (!messages.isEmpty()) {
+        throw new UserCodeExecutionException(String.join("; ", messages));
+      }
+    }
+  }
+
+  private static class RedisReporter extends RedisSetupTeardown implements 
Caller<String, Long> {
+    private RedisReporter(URI uri) {
+      super(new RedisClient(uri));
+    }
+
+    @Override
+    public Long call(String request) throws UserCodeExecutionException {
+      return client.getLong(request);
+    }
+  }
+
+  private static class RedisEnqueuer<T> extends RedisSetupTeardown implements 
Caller<T, Void> {
+    private final String key;
+    private final Coder<T> coder;
+
+    private RedisEnqueuer(URI uri, String key, Coder<T> coder) {
+      super(new RedisClient(uri));
+      this.key = key;
+      this.coder = coder;
+    }
+
+    @Override
+    public Void call(T request) throws UserCodeExecutionException {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      try {
+        coder.encode(request, baos);
+      } catch (IOException e) {
+        throw new UserCodeExecutionException(e);
+      }
+      client.rpush(key, baos.toByteArray());
+      return null;
+    }
+  }
+
+  private static class RedisDequeuer<T> extends RedisSetupTeardown implements 
Caller<Instant, T> {
+
+    private final Coder<T> coder;
+    private final String key;
+
+    private RedisDequeuer(URI uri, Coder<T> coder, String key) {
+      super(new RedisClient(uri));
+      this.coder = coder;
+      this.key = key;
+    }
+
+    @Override
+    public T call(Instant request) throws UserCodeExecutionException {
+      byte[] bytes = client.lpop(key);
+      try {
+        return 
checkStateNotNull(coder.decode(ByteSource.wrap(bytes).openStream()));
+
+      } catch (IOException e) {
+        throw new UserCodeExecutionException(e);
+      }
+    }
+  }
+
+  private static class RedisDecrementer extends RedisSetupTeardown
+      implements Caller<Instant, Long> {
+
+    private final String key;
+
+    private RedisDecrementer(URI uri, String key) {
+      super(new RedisClient(uri));
+      this.key = key;
+    }
+
+    @Override
+    public Long call(Instant request) throws UserCodeExecutionException {
+      return client.decr(key);
+    }
+  }
+
+  private static class RedisRefresher extends RedisSetupTeardown implements 
Caller<Instant, Void> {
+    private final Quota quota;
+    private final String key;
+
+    private RedisRefresher(URI uri, Quota quota, String key) {
+      super(new RedisClient(uri));
+      this.quota = quota;
+      this.key = key;
+    }
+
+    @Override
+    public Void call(Instant request) throws UserCodeExecutionException {
+      client.setex(key, quota.getNumRequests(), quota.getInterval());
+      return null;
+    }
+  }
+
+  private abstract static class RedisSetupTeardown implements SetupTeardown {
+    protected final RedisClient client;
+
+    private RedisSetupTeardown(RedisClient client) {
+      this.client = client;
+    }
+
+    @Override
+    public void setup() throws UserCodeExecutionException {
+      client.setup();
+    }
+
+    @Override
+    public void teardown() throws UserCodeExecutionException {
+      client.teardown();
+    }
+  }
+}
diff --git 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheIT.java
 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheIT.java
new file mode 100644
index 00000000000..95497e6013a
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheIT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import java.net.URI;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.io.requestresponse.CallTest.Request;
+import org.apache.beam.io.requestresponse.CallTest.Response;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/** Integration tests for {@link Cache}. */
+@RunWith(JUnit4.class)
+public class CacheIT {
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  private static final String CONTAINER_IMAGE_NAME = "redis:5.0.3-alpine";
+  private static final Integer PORT = 6379;
+
+  @Rule
+  public GenericContainer<?> redis =
+      new 
GenericContainer<>(DockerImageName.parse(CONTAINER_IMAGE_NAME)).withExposedPorts(PORT);
+
+  @Rule
+  public RedisExternalResourcesRule externalClients =
+      new RedisExternalResourcesRule(
+          () -> {
+            redis.start();
+            return URI.create(
+                String.format("redis://%s:%d", redis.getHost(), 
redis.getFirstMappedPort()));
+          });
+
+  @Test
+  public void givenRequestResponsesCached_writeThenReadYieldsMatches()
+      throws NonDeterministicException {
+    List<KV<Request, Response>> toWrite =
+        ImmutableList.of(
+            KV.of(new Request("a"), new Response("a")),
+            KV.of(new Request("b"), new Response("b")),
+            KV.of(new Request("c"), new Response("c")));
+    List<Request> toRead = ImmutableList.of(new Request("a"), new 
Request("b"), new Request("c"));
+    writeThenReadThenPAssert(toWrite, toRead, toWrite);
+  }
+
+  @Test
+  public void givenNoMatchingRequestResponsePairs_yieldsKVsWithNullValues()
+      throws NonDeterministicException {
+    List<KV<Request, Response>> toWrite =
+        ImmutableList.of(
+            KV.of(new Request("a"), new Response("a")),
+            KV.of(new Request("b"), new Response("b")),
+            KV.of(new Request("c"), new Response("c")));
+    List<Request> toRead = ImmutableList.of(new Request("d"), new 
Request("e"), new Request("f"));
+    List<KV<Request, Response>> expected =
+        toRead.stream()
+            .<KV<Request, Response>>map(request -> KV.of(request, null))
+            .collect(Collectors.toList());
+    writeThenReadThenPAssert(toWrite, toRead, expected);
+  }
+
+  private void writeThenReadThenPAssert(
+      List<KV<Request, Response>> toWrite,
+      List<Request> toRead,
+      List<KV<Request, Response>> expected)
+      throws NonDeterministicException {
+    PCollection<KV<Request, Response>> toWritePCol = 
writePipeline.apply(Create.of(toWrite));
+    toWritePCol.apply(
+        Cache.writeUsingRedis(
+            Duration.standardHours(1L),
+            externalClients.getActualClient(),
+            CallTest.DETERMINISTIC_REQUEST_CODER,
+            CallTest.DETERMINISTIC_RESPONSE_CODER));
+
+    PCollection<Request> requests =
+        
readPipeline.apply(Create.of(toRead)).setCoder(CallTest.DETERMINISTIC_REQUEST_CODER);
+
+    Call.Result<KV<Request, Response>> gotKVsResult =
+        requests.apply(
+            Cache.readUsingRedis(
+                externalClients.getActualClient(),
+                CallTest.DETERMINISTIC_REQUEST_CODER,
+                CallTest.DETERMINISTIC_RESPONSE_CODER));
+
+    PAssert.that(gotKVsResult.getFailures()).empty();
+    PAssert.that(gotKVsResult.getResponses()).containsInAnyOrder(expected);
+
+    writePipeline.run().waitUntilFinish();
+    readPipeline.run();
+  }
+}
diff --git 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheTest.java
 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheTest.java
new file mode 100644
index 00000000000..fcb7862e991
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertThrows;
+
+import java.net.URI;
+import org.apache.beam.io.requestresponse.CallTest.Request;
+import org.apache.beam.io.requestresponse.CallTest.Response;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Cache}. */
+@RunWith(JUnit4.class)
+public class CacheTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void givenNonDeterministicCoder_readUsingRedis_throwsError()
+      throws Coder.NonDeterministicException {
+    URI uri = URI.create("redis://localhost:6379");
+    assertThrows(
+        NonDeterministicException.class,
+        () ->
+            Cache.readUsingRedis(
+                new RedisClient(uri),
+                CallTest.NON_DETERMINISTIC_REQUEST_CODER,
+                CallTest.DETERMINISTIC_RESPONSE_CODER));
+
+    assertThrows(
+        NonDeterministicException.class,
+        () ->
+            Cache.readUsingRedis(
+                new RedisClient(uri),
+                CallTest.DETERMINISTIC_REQUEST_CODER,
+                CallTest.NON_DETERMINISTIC_RESPONSE_CODER));
+
+    Cache.readUsingRedis(
+        new RedisClient(uri),
+        CallTest.DETERMINISTIC_REQUEST_CODER,
+        CallTest.DETERMINISTIC_RESPONSE_CODER);
+  }
+
+  @Test
+  public void givenNonDeterministicCoder_writeUsingRedis_throwsError()
+      throws Coder.NonDeterministicException {
+    URI uri = URI.create("redis://localhost:6379");
+    Duration expiry = Duration.standardSeconds(1L);
+    assertThrows(
+        NonDeterministicException.class,
+        () ->
+            Cache.writeUsingRedis(
+                expiry,
+                new RedisClient(uri),
+                CallTest.NON_DETERMINISTIC_REQUEST_CODER,
+                CallTest.DETERMINISTIC_RESPONSE_CODER));
+
+    assertThrows(
+        NonDeterministicException.class,
+        () ->
+            Cache.writeUsingRedis(
+                expiry,
+                new RedisClient(uri),
+                CallTest.DETERMINISTIC_REQUEST_CODER,
+                CallTest.NON_DETERMINISTIC_RESPONSE_CODER));
+
+    Cache.writeUsingRedis(
+        expiry,
+        new RedisClient(uri),
+        CallTest.DETERMINISTIC_REQUEST_CODER,
+        CallTest.DETERMINISTIC_RESPONSE_CODER);
+  }
+
+  @Test
+  public void givenWrongRedisURI_throwsError() throws 
NonDeterministicException {
+    URI uri = URI.create("redis://1.2.3.4:6379");
+    Duration expiry = Duration.standardSeconds(1L);
+    PCollection<Request> requests =
+        pipeline
+            .apply("create requests", Create.of(new Request("")))
+            .setCoder(CallTest.DETERMINISTIC_REQUEST_CODER);
+    requests.apply(
+        "readUsingRedis",
+        Cache.readUsingRedis(
+            new RedisClient(uri),
+            CallTest.DETERMINISTIC_REQUEST_CODER,
+            CallTest.DETERMINISTIC_RESPONSE_CODER));
+
+    PCollection<KV<Request, Response>> kvs =
+        pipeline.apply("create kvs", Create.of(KV.of(new Request(""), new 
Response(""))));
+    kvs.apply(
+        "writeUsingRedis",
+        Cache.writeUsingRedis(
+            expiry,
+            new RedisClient(uri),
+            CallTest.DETERMINISTIC_REQUEST_CODER,
+            CallTest.DETERMINISTIC_RESPONSE_CODER));
+
+    UncheckedExecutionException error =
+        assertThrows(UncheckedExecutionException.class, pipeline::run);
+    assertThat(
+        error.getCause().getMessage(),
+        containsString("Failed to connect to host: redis://1.2.3.4:6379"));
+  }
+}
diff --git 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java
 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java
index 18574b00978..1566d172529 100644
--- 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java
+++ 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java
@@ -22,9 +22,17 @@ import static 
org.apache.beam.sdk.values.TypeDescriptors.strings;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
 import org.apache.beam.io.requestresponse.Call.Result;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
@@ -36,6 +44,8 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException;
 import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.jetbrains.annotations.NotNull;
 import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
@@ -47,13 +57,23 @@ import org.junit.runners.JUnit4;
 public class CallTest {
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
-  private static final SerializableCoder<@NonNull Response> RESPONSE_CODER =
+  static final SerializableCoder<@NonNull Request> 
NON_DETERMINISTIC_REQUEST_CODER =
+      SerializableCoder.of(Request.class);
+
+  static final Coder<@NonNull Request> DETERMINISTIC_REQUEST_CODER =
+      new DeterministicRequestCoder();
+
+  static final SerializableCoder<@NonNull Response> 
NON_DETERMINISTIC_RESPONSE_CODER =
       SerializableCoder.of(Response.class);
 
+  static final Coder<@NonNull Response> DETERMINISTIC_RESPONSE_CODER =
+      new DeterministicResponseCoder();
+
   @Test
   public void givenCallerNotSerializable_throwsError() {
     assertThrows(
-        IllegalArgumentException.class, () -> Call.of(new 
UnSerializableCaller(), RESPONSE_CODER));
+        IllegalArgumentException.class,
+        () -> Call.of(new UnSerializableCaller(), 
NON_DETERMINISTIC_RESPONSE_CODER));
   }
 
   @Test
@@ -62,7 +82,7 @@ public class CallTest {
         IllegalArgumentException.class,
         () ->
             Call.ofCallerAndSetupTeardown(
-                new UnSerializableCallerWithSetupTeardown(), RESPONSE_CODER));
+                new UnSerializableCallerWithSetupTeardown(), 
NON_DETERMINISTIC_RESPONSE_CODER));
   }
 
   @Test
@@ -70,7 +90,10 @@ public class CallTest {
     Result<Response> result =
         pipeline
             .apply(Create.of(new Request("a")))
-            .apply(Call.of(new CallerThrowsUserCodeExecutionException(), 
RESPONSE_CODER));
+            .apply(
+                Call.of(
+                    new CallerThrowsUserCodeExecutionException(),
+                    NON_DETERMINISTIC_RESPONSE_CODER));
 
     PCollection<ApiIOError> failures = result.getFailures();
     PAssert.thatSingleton(countStackTracesOf(failures, 
UserCodeExecutionException.class))
@@ -87,7 +110,7 @@ public class CallTest {
     Result<Response> result =
         pipeline
             .apply(Create.of(new Request("a")))
-            .apply(Call.of(new CallerInvokesQuotaException(), RESPONSE_CODER));
+            .apply(Call.of(new CallerInvokesQuotaException(), 
NON_DETERMINISTIC_RESPONSE_CODER));
 
     PCollection<ApiIOError> failures = result.getFailures();
     PAssert.thatSingleton(countStackTracesOf(failures, 
UserCodeExecutionException.class))
@@ -105,7 +128,9 @@ public class CallTest {
     Result<Response> result =
         pipeline
             .apply(Create.of(new Request("a")))
-            .apply(Call.of(new CallerExceedsTimeout(timeout), 
RESPONSE_CODER).withTimeout(timeout));
+            .apply(
+                Call.of(new CallerExceedsTimeout(timeout), 
NON_DETERMINISTIC_RESPONSE_CODER)
+                    .withTimeout(timeout));
 
     PCollection<ApiIOError> failures = result.getFailures();
     PAssert.thatSingleton(countStackTracesOf(failures, 
UserCodeExecutionException.class))
@@ -122,7 +147,7 @@ public class CallTest {
     Result<Response> result =
         pipeline
             .apply(Create.of(new Request("a")))
-            .apply(Call.of(new CallerThrowsTimeout(), RESPONSE_CODER));
+            .apply(Call.of(new CallerThrowsTimeout(), 
NON_DETERMINISTIC_RESPONSE_CODER));
 
     PCollection<ApiIOError> failures = result.getFailures();
     PAssert.thatSingleton(countStackTracesOf(failures, 
UserCodeExecutionException.class))
@@ -139,7 +164,7 @@ public class CallTest {
     pipeline
         .apply(Create.of(new Request("")))
         .apply(
-            Call.of(new ValidCaller(), RESPONSE_CODER)
+            Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
                 .withSetupTeardown(new 
SetupThrowsUserCodeExecutionException()));
 
     assertPipelineThrows(UserCodeExecutionException.class, pipeline);
@@ -150,7 +175,7 @@ public class CallTest {
     pipeline
         .apply(Create.of(new Request("")))
         .apply(
-            Call.of(new ValidCaller(), RESPONSE_CODER)
+            Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
                 .withSetupTeardown(new SetupThrowsUserCodeQuotaException()));
 
     assertPipelineThrows(UserCodeQuotaException.class, pipeline);
@@ -163,7 +188,7 @@ public class CallTest {
     pipeline
         .apply(Create.of(new Request("")))
         .apply(
-            Call.of(new ValidCaller(), RESPONSE_CODER)
+            Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
                 .withSetupTeardown(new SetupExceedsTimeout(timeout))
                 .withTimeout(timeout));
 
@@ -175,7 +200,7 @@ public class CallTest {
     pipeline
         .apply(Create.of(new Request("")))
         .apply(
-            Call.of(new ValidCaller(), RESPONSE_CODER)
+            Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
                 .withSetupTeardown(new SetupThrowsUserCodeTimeoutException()));
 
     assertPipelineThrows(UserCodeTimeoutException.class, pipeline);
@@ -186,7 +211,7 @@ public class CallTest {
     pipeline
         .apply(Create.of(new Request("")))
         .apply(
-            Call.of(new ValidCaller(), RESPONSE_CODER)
+            Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
                 .withSetupTeardown(new 
TeardownThrowsUserCodeExecutionException()));
 
     // Exceptions thrown during teardown do not populate with the cause
@@ -198,7 +223,7 @@ public class CallTest {
     pipeline
         .apply(Create.of(new Request("")))
         .apply(
-            Call.of(new ValidCaller(), RESPONSE_CODER)
+            Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
                 .withSetupTeardown(new 
TeardownThrowsUserCodeQuotaException()));
 
     // Exceptions thrown during teardown do not populate with the cause
@@ -211,7 +236,7 @@ public class CallTest {
     pipeline
         .apply(Create.of(new Request("")))
         .apply(
-            Call.of(new ValidCaller(), RESPONSE_CODER)
+            Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
                 .withTimeout(timeout)
                 .withSetupTeardown(new TeardownExceedsTimeout(timeout)));
 
@@ -224,7 +249,7 @@ public class CallTest {
     pipeline
         .apply(Create.of(new Request("")))
         .apply(
-            Call.of(new ValidCaller(), RESPONSE_CODER)
+            Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
                 .withSetupTeardown(new 
TeardownThrowsUserCodeTimeoutException()));
 
     // Exceptions thrown during teardown do not populate with the cause
@@ -236,7 +261,7 @@ public class CallTest {
     Result<Response> result =
         pipeline
             .apply(Create.of(new Request("a")))
-            .apply(Call.of(new ValidCaller(), RESPONSE_CODER));
+            .apply(Call.of(new ValidCaller(), 
NON_DETERMINISTIC_RESPONSE_CODER));
 
     
PAssert.thatSingleton(result.getFailures().apply(Count.globally())).isEqualTo(0L);
     PAssert.that(result.getResponses()).containsInAnyOrder(new Response("a"));
@@ -275,7 +300,7 @@ public class CallTest {
 
   private static class UnSerializable {}
 
-  private static class Request implements Serializable {
+  static class Request implements Serializable {
 
     final String id;
 
@@ -305,7 +330,7 @@ public class CallTest {
     }
   }
 
-  private static class Response implements Serializable {
+  static class Response implements Serializable {
     final String id;
 
     Response(String id) {
@@ -490,4 +515,55 @@ public class CallTest {
     } catch (InterruptedException ignored) {
     }
   }
+
+  private static class DeterministicRequestCoder extends CustomCoder<@NonNull 
Request> {
+    private static final Coder<String> ID_CODER = StringUtf8Coder.of();
+
+    @Override
+    public void encode(Request value, @NotNull OutputStream outStream)
+        throws CoderException, IOException {
+      ID_CODER.encode(checkStateNotNull(value).id, outStream);
+    }
+
+    @Override
+    public @NonNull Request decode(@NotNull InputStream inStream)
+        throws CoderException, IOException {
+      String id = ID_CODER.decode(inStream);
+      return new Request(id);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      ID_CODER.verifyDeterministic();
+    }
+  }
+
+  private static class DeterministicResponseCoder extends 
CustomCoder<Response> {
+    private static final NullableCoder<String> ID_CODER = 
NullableCoder.of(StringUtf8Coder.of());
+
+    @Override
+    public void encode(@Nullable Response value, @NotNull OutputStream 
outStream)
+        throws CoderException, IOException {
+      if (value == null) {
+        ID_CODER.encode(null, outStream);
+        return;
+      }
+      ID_CODER.encode(checkStateNotNull(value).id, outStream);
+    }
+
+    @Override
+    public Response decode(@NotNull InputStream inStream) throws 
CoderException, IOException {
+      try {
+        String id = ID_CODER.decode(inStream);
+        return new Response(id);
+      } catch (CoderException ignored) {
+        return null;
+      }
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      ID_CODER.verifyDeterministic();
+    }
+  }
 }
diff --git 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownTestIT.java
 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownIT.java
similarity index 91%
rename from 
sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownTestIT.java
rename to 
sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownIT.java
index 14b6e9e6433..c10b7ee1609 100644
--- 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownTestIT.java
+++ 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.io.requestresponse;
 
+import static 
org.apache.beam.io.requestresponse.EchoITOptions.GRPC_ENDPOINT_ADDRESS_NAME;
 import static 
org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.junit.Assert.assertEquals;
@@ -27,6 +28,7 @@ import java.net.URI;
 import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest;
 import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse;
 import org.apache.beam.testinfra.mockapis.echo.v1.EchoServiceGrpc;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.junit.AfterClass;
@@ -41,7 +43,7 @@ import org.junit.runners.JUnit4;
  * running integration tests.
  */
 @RunWith(JUnit4.class)
-public class EchoGRPCCallerWithSetupTeardownTestIT {
+public class EchoGRPCCallerWithSetupTeardownIT {
 
   private static @MonotonicNonNull EchoITOptions options;
   private static @MonotonicNonNull EchoGRPCCallerWithSetupTeardown client;
@@ -50,11 +52,15 @@ public class EchoGRPCCallerWithSetupTeardownTestIT {
   @BeforeClass
   public static void setUp() throws UserCodeExecutionException {
     options = readIOTestPipelineOptions(EchoITOptions.class);
-    if (options.getgRPCEndpointAddress().isEmpty()) {
+    if (Strings.isNullOrEmpty(options.getGrpcEndpointAddress())) {
       throw new RuntimeException(
-          "--gRPCEndpointAddress is missing. See " + EchoITOptions.class + 
"for details.");
+          "--"
+              + GRPC_ENDPOINT_ADDRESS_NAME
+              + " is missing. See "
+              + EchoITOptions.class
+              + "for details.");
     }
-    client = 
EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getgRPCEndpointAddress()));
+    client = 
EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getGrpcEndpointAddress()));
     checkStateNotNull(client).setup();
 
     EchoRequest request = createShouldExceedQuotaRequest();
diff --git 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerTestIT.java
 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerIT.java
similarity index 87%
rename from 
sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerTestIT.java
rename to 
sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerIT.java
index fa0cb937811..10b92b2610d 100644
--- 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerTestIT.java
+++ 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.io.requestresponse;
 
+import static 
org.apache.beam.io.requestresponse.EchoITOptions.HTTP_ENDPOINT_ADDRESS_NAME;
 import static 
org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.junit.Assert.assertEquals;
@@ -28,6 +29,7 @@ import java.net.URI;
 import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest;
 import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse;
 import org.apache.beam.testinfra.mockapis.echo.v1.EchoServiceGrpc;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.junit.BeforeClass;
@@ -36,12 +38,12 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * Tests for {@link EchoHTTPCallerTestIT} on a deployed {@link 
EchoServiceGrpc} instance's HTTP
- * handler. See {@link EchoITOptions} for details on the required parameters 
and how to provide
- * these for running integration tests.
+ * Tests for {@link EchoHTTPCallerIT} on a deployed {@link EchoServiceGrpc} 
instance's HTTP handler.
+ * See {@link EchoITOptions} for details on the required parameters and how to 
provide these for
+ * running integration tests.
  */
 @RunWith(JUnit4.class)
-public class EchoHTTPCallerTestIT {
+public class EchoHTTPCallerIT {
 
   private static @MonotonicNonNull EchoITOptions options;
   private static @MonotonicNonNull EchoHTTPCaller client;
@@ -50,9 +52,13 @@ public class EchoHTTPCallerTestIT {
   @BeforeClass
   public static void setUp() throws UserCodeExecutionException {
     options = readIOTestPipelineOptions(EchoITOptions.class);
-    if (options.getHttpEndpointAddress().isEmpty()) {
+    if (Strings.isNullOrEmpty(options.getHttpEndpointAddress())) {
       throw new RuntimeException(
-          "--httpEndpointAddress is missing. See " + EchoITOptions.class + 
"for details.");
+          "--"
+              + HTTP_ENDPOINT_ADDRESS_NAME
+              + " is missing. See "
+              + EchoITOptions.class
+              + "for details.");
     }
     client = EchoHTTPCaller.of(URI.create(options.getHttpEndpointAddress()));
 
diff --git 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoITOptions.java
 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoITOptions.java
index a32f7a78e82..dabec750892 100644
--- 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoITOptions.java
+++ 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoITOptions.java
@@ -37,10 +37,13 @@ import 
org.apache.beam.testinfra.mockapis.echo.v1.EchoServiceGrpc;
  * </pre>
  */
 public interface EchoITOptions extends PipelineOptions {
+  String GRPC_ENDPOINT_ADDRESS_NAME = "grpcEndpointAddress";
+  String HTTP_ENDPOINT_ADDRESS_NAME = "httpEndpointAddress";
+
   @Description("The gRPC address of the Echo API endpoint, typically of the 
form <host>:<port>.")
-  String getgRPCEndpointAddress();
+  String getGrpcEndpointAddress();
 
-  void setgRPCEndpointAddress(String value);
+  void setGrpcEndpointAddress(String value);
 
   @Description("The HTTP address of the Echo API endpoint; must being with 
http(s)://")
   String getHttpEndpointAddress();
diff --git 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoRequestCoder.java
 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoRequestCoder.java
new file mode 100644
index 00000000000..75cd49904cf
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoRequestCoder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+class EchoRequestCoder extends CustomCoder<@NonNull EchoRequest> {
+
+  @Override
+  public void encode(@NonNull EchoRequest value, @NonNull OutputStream 
outStream)
+      throws CoderException, IOException {
+    value.writeTo(outStream);
+  }
+
+  @Override
+  public @NonNull EchoRequest decode(@NonNull InputStream inStream)
+      throws CoderException, IOException {
+    return EchoRequest.parseFrom(inStream);
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {}
+}
diff --git 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientTestIT.java
 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientIT.java
similarity index 89%
rename from 
sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientTestIT.java
rename to 
sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientIT.java
index 1fbb320a5f2..939515836bf 100644
--- 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientTestIT.java
+++ 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientIT.java
@@ -20,6 +20,8 @@ package org.apache.beam.io.requestresponse;
 import static 
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.allPrimitiveDataTypes;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
@@ -48,7 +50,7 @@ import org.testcontainers.utility.DockerImageName;
 
 /** Integration tests for {@link RedisClient}. */
 @RunWith(JUnit4.class)
-public class RedisClientTestIT {
+public class RedisClientIT {
 
   private static final String CONTAINER_IMAGE_NAME = "redis:5.0.3-alpine";
   private static final Integer PORT = 6379;
@@ -206,4 +208,24 @@ public class RedisClientTestIT {
   public void givenKeyNotExists_getLong_yieldsZero() throws 
UserCodeExecutionException {
     assertEquals(0L, 
externalClients.getActualClient().getLong(UUID.randomUUID().toString()));
   }
+
+  @Test
+  public void givenKeyNotExists_getBytes_yieldsNull() throws 
UserCodeExecutionException {
+    assertNull(
+        externalClients
+            .getActualClient()
+            
.getBytes(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)));
+  }
+
+  @Test
+  public void givenKeyExists_getBytes_yieldsValue() throws 
UserCodeExecutionException {
+    byte[] keyBytes = 
UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
+    String expected = UUID.randomUUID().toString();
+    byte[] expectedBytes = expected.getBytes(StandardCharsets.UTF_8);
+    externalClients.getValidatingClient().set(keyBytes, expectedBytes);
+    byte[] actualBytes = externalClients.getActualClient().getBytes(keyBytes);
+    assertNotNull(actualBytes);
+    String actual = new String(actualBytes, StandardCharsets.UTF_8);
+    assertEquals(expected, actual);
+  }
 }
diff --git 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java
 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java
new file mode 100644
index 00000000000..24db38f926e
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static 
org.apache.beam.io.requestresponse.EchoITOptions.GRPC_ENDPOINT_ADDRESS_NAME;
+import static 
org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+
+import com.google.protobuf.ByteString;
+import java.net.URI;
+import java.util.UUID;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest;
+import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Integration tests for {@link ThrottleWithExternalResource}. See {@link 
EchoITOptions} for details
+ * on the required parameters and how to provide these for running integration 
tests.
+ */
+public class ThrottleWithExternalResourceIT {
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  private static final String QUOTA_ID = 
"echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota";
+  private static final Quota QUOTA = new Quota(1L, 
Duration.standardSeconds(1L));
+  private static final ByteString PAYLOAD = ByteString.copyFromUtf8("payload");
+  private static @MonotonicNonNull EchoITOptions options;
+  private static @MonotonicNonNull EchoGRPCCallerWithSetupTeardown client;
+  private static final String CONTAINER_IMAGE_NAME = "redis:5.0.3-alpine";
+  private static final Integer PORT = 6379;
+  private static final EchoRequestCoder REQUEST_CODER = new EchoRequestCoder();
+  private static final Coder<EchoResponse> RESPONSE_CODER =
+      SerializableCoder.of(TypeDescriptor.of(EchoResponse.class));
+
+  @Rule
+  public GenericContainer<?> redis =
+      new 
GenericContainer<>(DockerImageName.parse(CONTAINER_IMAGE_NAME)).withExposedPorts(PORT);
+
+  @BeforeClass
+  public static void setUp() throws UserCodeExecutionException {
+    options = readIOTestPipelineOptions(EchoITOptions.class);
+    if (Strings.isNullOrEmpty(options.getGrpcEndpointAddress())) {
+      throw new RuntimeException(
+          "--"
+              + GRPC_ENDPOINT_ADDRESS_NAME
+              + " is missing. See "
+              + EchoITOptions.class
+              + "for details.");
+    }
+    client = 
EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getGrpcEndpointAddress()));
+    checkStateNotNull(client).setup();
+
+    try {
+      client.call(createRequest());
+    } catch (UserCodeExecutionException e) {
+      if (e instanceof UserCodeQuotaException) {
+        throw new RuntimeException(
+            String.format(
+                "The quota: %s is set to refresh on an interval. Unless there 
are failures in this test, wait for a few seconds before running the test 
again.",
+                QUOTA_ID),
+            e);
+      }
+      throw e;
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws UserCodeExecutionException {
+    checkStateNotNull(client).teardown();
+  }
+
+  @Test
+  public void givenThrottleUsingRedis_preventsQuotaErrors() throws 
NonDeterministicException {
+    URI uri =
+        URI.create(String.format("redis://%s:%d", redis.getHost(), 
redis.getFirstMappedPort()));
+    pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
+
+    Call.Result<EchoRequest> throttleResult =
+        createRequestStream()
+            .apply(
+                "throttle",
+                ThrottleWithExternalResource.usingRedis(
+                    uri, QUOTA_ID, UUID.randomUUID().toString(), QUOTA, 
REQUEST_CODER));
+
+    // Assert that we are not getting any errors and successfully emitting 
throttled elements.
+    PAssert.that(throttleResult.getFailures()).empty();
+    PAssert.thatSingleton(
+            throttleResult
+                .getResponses()
+                .apply(
+                    "window throttled", 
Window.into(FixedWindows.of(Duration.standardSeconds(1L))))
+                .apply(
+                    "count throttled",
+                    
Combine.globally(Count.<EchoRequest>combineFn()).withoutDefaults()))
+        .notEqualTo(0L);
+
+    // Assert that all the throttled data is not corrupted.
+    PAssert.that(
+            throttleResult
+                .getResponses()
+                .apply(
+                    "window throttled before extraction",
+                    Window.into(FixedWindows.of(Duration.standardSeconds(1L))))
+                .apply(
+                    "extract request id",
+                    MapElements.into(strings()).via(input -> 
checkStateNotNull(input).getId()))
+                .apply("distinct", Distinct.create()))
+        .containsInAnyOrder(QUOTA_ID);
+
+    // Call the Echo service with throttled requests.
+    Call.Result<EchoResponse> echoResult =
+        throttleResult
+            .getResponses()
+            .apply("call", Call.ofCallerAndSetupTeardown(client, 
RESPONSE_CODER));
+
+    // Assert that there were no errors.
+    PAssert.that(echoResult.getFailures()).empty();
+
+    // Assert that the responses match the requests.
+    PAssert.that(
+            echoResult
+                .getResponses()
+                .apply(
+                    "window responses before extraction",
+                    Window.into(FixedWindows.of(Duration.standardSeconds(1L))))
+                .apply(
+                    "extract response id",
+                    MapElements.into(strings()).via(input -> 
checkStateNotNull(input).getId())))
+        .containsInAnyOrder(QUOTA_ID);
+
+    PipelineResult job = pipeline.run();
+    job.waitUntilFinish(Duration.standardSeconds(3L));
+  }
+
+  private static EchoRequest createRequest() {
+    return 
EchoRequest.newBuilder().setId(QUOTA_ID).setPayload(PAYLOAD).build();
+  }
+
+  private PCollection<EchoRequest> createRequestStream() {
+    return pipeline
+        .apply("impulse", 
PeriodicImpulse.create().withInterval(Duration.millis(10L)))
+        .apply(
+            "requests",
+            MapElements.into(TypeDescriptor.of(EchoRequest.class)).via(ignored 
-> createRequest()));
+  }
+}
diff --git 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceTest.java
 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceTest.java
new file mode 100644
index 00000000000..591ba923201
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertThrows;
+
+import java.net.URI;
+import org.apache.beam.io.requestresponse.CallTest.Request;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ThrottleWithExternalResource}. */
+@RunWith(JUnit4.class)
+public class ThrottleWithExternalResourceTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void givenNonDeterministicCoder_usingRedis_throwsError() throws 
NonDeterministicException {
+    URI uri = URI.create("redis://localhost:6379");
+    String quotaIdentifier = "quota";
+    String queueKey = "queue";
+    Quota quota = new Quota(10L, Duration.standardSeconds(1L));
+
+    assertThrows(
+        NonDeterministicException.class,
+        () ->
+            ThrottleWithExternalResource.usingRedis(
+                uri, quotaIdentifier, queueKey, quota, 
CallTest.NON_DETERMINISTIC_REQUEST_CODER));
+
+    ThrottleWithExternalResource.usingRedis(
+        uri, quotaIdentifier, queueKey, quota, 
CallTest.DETERMINISTIC_REQUEST_CODER);
+  }
+
+  @Test
+  public void givenWrongRedisURI_throwsError() throws 
NonDeterministicException {
+    URI uri = URI.create("redis://1.2.3.4:6379");
+    String quotaIdentifier = "quota";
+    String queueKey = "queue";
+    Quota quota = new Quota(10L, Duration.standardSeconds(1L));
+    PCollection<Request> requests =
+        pipeline.apply(Create.of(new 
Request(""))).setCoder(CallTest.DETERMINISTIC_REQUEST_CODER);
+    requests.apply(
+        ThrottleWithExternalResource.usingRedis(
+            uri, quotaIdentifier, queueKey, quota, requests.getCoder()));
+
+    UncheckedExecutionException error =
+        assertThrows(UncheckedExecutionException.class, pipeline::run);
+    assertThat(
+        error.getCause().getMessage(),
+        containsString("Failed to connect to host: redis://1.2.3.4:6379"));
+  }
+}

Reply via email to