This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1e06d882bdd [RRIO] [Throttle] [Cache] Implement Throttle and Cache
using an external resource. (#29401)
1e06d882bdd is described below
commit 1e06d882bddfa2eae33a6bcfc462fe10172704bb
Author: Damon <[email protected]>
AuthorDate: Wed Nov 22 11:12:58 2023 -0800
[RRIO] [Throttle] [Cache] Implement Throttle and Cache using an external
resource. (#29401)
* WIP: Implement CacheSerializer and providers
* wip
* Condense Throttle into one class
* wip
* Implement Throttle and Cache
* Update javadoc
* Edit per PR comments
* Refactor per PR comments
---
.../configmap.yaml | 30 ++
.../deployment.yaml | 27 ++
.../kustomization.yaml | 34 ++
sdks/java/io/rrio/build.gradle | 1 +
.../org/apache/beam/io/requestresponse/Cache.java | 239 ++++++++++++
.../apache/beam/io/requestresponse/CacheRead.java | 121 ------
.../apache/beam/io/requestresponse/CacheWrite.java | 119 ------
.../org/apache/beam/io/requestresponse/Call.java | 39 +-
.../org/apache/beam/io/requestresponse/Quota.java | 70 ++++
.../beam/io/requestresponse/RedisClient.java | 10 +
.../beam/io/requestresponse/ThrottleDequeue.java | 101 -----
.../beam/io/requestresponse/ThrottleEnqueue.java | 61 ---
.../io/requestresponse/ThrottleRefreshQuota.java | 55 ---
.../ThrottleWithExternalResource.java | 418 +++++++++++++++++++++
.../apache/beam/io/requestresponse/CacheIT.java | 120 ++++++
.../apache/beam/io/requestresponse/CacheTest.java | 132 +++++++
.../apache/beam/io/requestresponse/CallTest.java | 112 +++++-
...java => EchoGRPCCallerWithSetupTeardownIT.java} | 14 +-
...HTTPCallerTestIT.java => EchoHTTPCallerIT.java} | 18 +-
.../beam/io/requestresponse/EchoITOptions.java | 7 +-
.../beam/io/requestresponse/EchoRequestCoder.java | 44 +++
.../{RedisClientTestIT.java => RedisClientIT.java} | 24 +-
.../ThrottleWithExternalResourceIT.java | 186 +++++++++
.../ThrottleWithExternalResourceTest.java | 77 ++++
24 files changed, 1559 insertions(+), 500 deletions(-)
diff --git
a/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/configmap.yaml
b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/configmap.yaml
new file mode 100644
index 00000000000..6a482b21b16
--- /dev/null
+++
b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/configmap.yaml
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Configures patch for ../base/configmap.yaml
+# See
https://kubectl.docs.kubernetes.io/references/kustomize/kustomization/patches/
+
+- op: replace
+ path: /metadata/labels/quota-id
+ value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
+- op: replace
+ path: /data/QUOTA_ID
+ value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
+- op: replace
+ path: /data/QUOTA_SIZE
+ value: "10"
+- op: replace
+ path: /data/QUOTA_REFRESH_INTERVAL
+ value: 1s
diff --git
a/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/deployment.yaml
b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/deployment.yaml
new file mode 100644
index 00000000000..cff2f994cd6
--- /dev/null
+++
b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/deployment.yaml
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Configures patch for ../base/deployment.yaml
+# See
https://kubectl.docs.kubernetes.io/references/kustomize/kustomization/patches/
+
+- op: replace
+ path: /metadata/labels/quota-id
+ value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
+- op: replace
+ path: /spec/selector/matchLabels/quota-id
+ value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
+- op: replace
+ path: /spec/template/metadata/labels/quota-id
+ value: echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
diff --git
a/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/kustomization.yaml
b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/kustomization.yaml
new file mode 100644
index 00000000000..d10598be51f
--- /dev/null
+++
b/.test-infra/mock-apis/infrastructure/kubernetes/refresher/overlays/echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota/kustomization.yaml
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Configures the overlay for
.test-infra/mock-apis/infrastructure/kubernetes/refresher/base
+# Using the Quota Id:
+# echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota
+
+resources:
+- ../../base
+
+nameSuffix: -throttle-with-external-resource-test-10-per-1s
+
+patches:
+- path: configmap.yaml
+ target:
+ kind: ConfigMap
+ name: refresher
+
+- path: deployment.yaml
+ target:
+ kind: Deployment
+ name: refresher
diff --git a/sdks/java/io/rrio/build.gradle b/sdks/java/io/rrio/build.gradle
index bfd030ce61d..4ecdf4e91df 100644
--- a/sdks/java/io/rrio/build.gradle
+++ b/sdks/java/io/rrio/build.gradle
@@ -50,6 +50,7 @@ dependencies {
testImplementation
platform(library.java.google_cloud_platform_libraries_bom)
testImplementation library.java.google_http_client
testImplementation library.java.junit
+ testImplementation library.java.hamcrest
testImplementation library.java.testcontainers_base
testRuntimeOnly project(path: ":runners:direct-java", configuration:
"shadow")
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Cache.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Cache.java
new file mode 100644
index 00000000000..b8e526c4829
--- /dev/null
+++
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Cache.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteSource;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/** Transforms for reading and writing request/response associations to a
cache. */
+final class Cache {
+
+ /**
+ * Instantiates a {@link Call} {@link PTransform} that reads {@link
RequestT} {@link ResponseT}
+ * associations from a cache. The {@link KV} value is null when no
association exists. This method
+ * does not enforce {@link Coder#verifyDeterministic} and defers to the user
to determine whether
+ * to enforce this given the cache implementation.
+ */
+ static <
+ RequestT,
+ @Nullable ResponseT,
+ CallerSetupTeardownT extends
+ Caller<RequestT, KV<RequestT, @Nullable ResponseT>> &
SetupTeardown>
+ PTransform<PCollection<RequestT>, Call.Result<KV<RequestT, @Nullable
ResponseT>>> read(
+ CallerSetupTeardownT implementsCallerSetupTeardown,
+ Coder<RequestT> requestTCoder,
+ Coder<@Nullable ResponseT> responseTCoder) {
+ return Call.ofCallerAndSetupTeardown(
+ implementsCallerSetupTeardown, KvCoder.of(requestTCoder,
responseTCoder));
+ }
+
+ /**
+ * Instantiates a {@link Call} {@link PTransform}, calling {@link #read}
with a {@link Caller}
+ * that employs a redis client.
+ *
+ * <p>This method requires both the {@link RequestT} and {@link ResponseT}s'
{@link
+ * Coder#verifyDeterministic}. Otherwise, it throws a {@link
NonDeterministicException}.
+ *
+ * <p><a href="https://redis.io">Redis</a> is designed for multiple
workloads, simultaneously
+ * reading and writing to a shared instance. See <a
+ * href="https://redis.io/docs/get-started/faq/">Redis FAQ</a> for more
information on important
+ * considerations when using this method to achieve cache reads.
+ */
+ static <RequestT, @Nullable ResponseT>
+ PTransform<PCollection<RequestT>, Call.Result<KV<RequestT, @Nullable
ResponseT>>>
+ readUsingRedis(
+ RedisClient client,
+ Coder<RequestT> requestTCoder,
+ Coder<@Nullable ResponseT> responseTCoder)
+ throws NonDeterministicException {
+ return read(
+ new UsingRedis<>(requestTCoder, responseTCoder, client).read(),
+ requestTCoder,
+ responseTCoder);
+ }
+
+ /**
+ * Write a {@link RequestT} {@link ResponseT} association to a cache. This
method does not enforce
+ * {@link Coder#verifyDeterministic} and defers to the user to determine
whether to enforce this
+ * given the cache implementation.
+ */
+ static <
+ RequestT,
+ ResponseT,
+ CallerSetupTeardownT extends
+ Caller<KV<RequestT, ResponseT>, KV<RequestT, ResponseT>> &
SetupTeardown>
+ PTransform<PCollection<KV<RequestT, ResponseT>>,
Call.Result<KV<RequestT, ResponseT>>> write(
+ CallerSetupTeardownT implementsCallerSetupTeardown,
+ KvCoder<RequestT, ResponseT> kvCoder) {
+ return Call.ofCallerAndSetupTeardown(implementsCallerSetupTeardown,
kvCoder);
+ }
+
+ /**
+ * Instantiates a {@link Call} {@link PTransform}, calling {@link #write}
with a {@link Caller}
+ * that employs a redis client.
+ *
+ * <p>This method requires both the {@link RequestT} and {@link ResponseT}s'
{@link
+ * Coder#verifyDeterministic}. Otherwise, it throws a {@link
NonDeterministicException}.
+ *
+ * <p><a href="https://redis.io">Redis</a> is designed for multiple
workloads, simultaneously
+ * reading and writing to a shared instance. See <a
+ * href="https://redis.io/docs/get-started/faq/">Redis FAQ</a> for more
information on important
+ * considerations when using this method to achieve cache writes.
+ */
+ static <RequestT, ResponseT>
+ PTransform<PCollection<KV<RequestT, ResponseT>>,
Call.Result<KV<RequestT, ResponseT>>>
+ writeUsingRedis(
+ Duration expiry,
+ RedisClient client,
+ Coder<RequestT> requestTCoder,
+ Coder<@Nullable ResponseT> responseTCoder)
+ throws NonDeterministicException {
+ return write(
+ new UsingRedis<>(requestTCoder, responseTCoder, client).write(expiry),
+ KvCoder.of(requestTCoder, responseTCoder));
+ }
+
+ private static class UsingRedis<RequestT, ResponseT> {
+ private final Coder<RequestT> requestTCoder;
+ private final Coder<@Nullable ResponseT> responseTCoder;
+ private final RedisClient client;
+
+ private UsingRedis(
+ Coder<RequestT> requestTCoder,
+ Coder<@Nullable ResponseT> responseTCoder,
+ RedisClient client)
+ throws Coder.NonDeterministicException {
+ this.client = client;
+ requestTCoder.verifyDeterministic();
+ responseTCoder.verifyDeterministic();
+ this.requestTCoder = requestTCoder;
+ this.responseTCoder = responseTCoder;
+ }
+
+ private Read<RequestT, @Nullable ResponseT> read() {
+ return new Read<>(requestTCoder, responseTCoder, client);
+ }
+
+ private Write<RequestT, ResponseT> write(Duration expiry) {
+ return new Write<>(expiry, requestTCoder, responseTCoder, client);
+ }
+
+ /** Reads associated {@link RequestT} {@link ResponseT} using a {@link
RedisClient}. */
+ private static class Read<RequestT, @Nullable ResponseT>
+ implements Caller<RequestT, KV<RequestT, @Nullable ResponseT>>,
SetupTeardown {
+
+ private final Coder<RequestT> requestTCoder;
+ private final Coder<@Nullable ResponseT> responseTCoder;
+ private final RedisClient client;
+
+ private Read(
+ Coder<RequestT> requestTCoder,
+ Coder<@Nullable ResponseT> responseTCoder,
+ RedisClient client) {
+ this.requestTCoder = requestTCoder;
+ this.responseTCoder = responseTCoder;
+ this.client = client;
+ }
+
+ @Override
+ public KV<RequestT, @Nullable ResponseT> call(RequestT request)
+ throws UserCodeExecutionException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ requestTCoder.encode(request, baos);
+ byte[] encodedRequest = baos.toByteArray();
+ byte[] encodedResponse = client.getBytes(encodedRequest);
+ if (encodedResponse == null) {
+ return KV.of(request, null);
+ }
+ ResponseT response =
+ checkStateNotNull(
+
responseTCoder.decode(ByteSource.wrap(encodedResponse).openStream()));
+ return KV.of(request, response);
+ } catch (IllegalStateException | IOException e) {
+ throw new UserCodeExecutionException(e);
+ }
+ }
+
+ @Override
+ public void setup() throws UserCodeExecutionException {
+ client.setup();
+ }
+
+ @Override
+ public void teardown() throws UserCodeExecutionException {
+ client.teardown();
+ }
+ }
+ }
+
+ private static class Write<RequestT, ResponseT>
+ implements Caller<KV<RequestT, ResponseT>, KV<RequestT, ResponseT>>,
SetupTeardown {
+ private final Duration expiry;
+ private final Coder<RequestT> requestTCoder;
+ private final Coder<@Nullable ResponseT> responseTCoder;
+ private final RedisClient client;
+
+ private Write(
+ Duration expiry,
+ Coder<RequestT> requestTCoder,
+ Coder<@Nullable ResponseT> responseTCoder,
+ RedisClient client) {
+ this.expiry = expiry;
+ this.requestTCoder = requestTCoder;
+ this.responseTCoder = responseTCoder;
+ this.client = client;
+ }
+
+ @Override
+ public KV<RequestT, ResponseT> call(KV<RequestT, ResponseT> request)
+ throws UserCodeExecutionException {
+ ByteArrayOutputStream keyStream = new ByteArrayOutputStream();
+ ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
+ try {
+ requestTCoder.encode(request.getKey(), keyStream);
+ responseTCoder.encode(request.getValue(), valueStream);
+ } catch (IOException e) {
+ throw new UserCodeExecutionException(e);
+ }
+ client.setex(keyStream.toByteArray(), valueStream.toByteArray(), expiry);
+ return request;
+ }
+
+ @Override
+ public void setup() throws UserCodeExecutionException {
+ client.setup();
+ }
+
+ @Override
+ public void teardown() throws UserCodeExecutionException {
+ client.teardown();
+ }
+ }
+}
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheRead.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheRead.java
deleted file mode 100644
index 3765d25370a..00000000000
---
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheRead.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import com.google.auto.value.AutoValue;
-import java.util.Map;
-import org.apache.beam.io.requestresponse.CacheRead.Result;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-
-/**
- * {@link CacheRead} reads associated {@link ResponseT} types from {@link
RequestT} types, if any
- * exist.
- */
-class CacheRead<RequestT, ResponseT>
- extends PTransform<PCollection<RequestT>, Result<RequestT, ResponseT>> {
-
- private static final TupleTag<ApiIOError> FAILURE_TAG = new
TupleTag<ApiIOError>() {};
-
- // TODO(damondouglas): remove suppress warnings after instance utilized.
- @SuppressWarnings({"unused"})
- private final Configuration<RequestT, ResponseT> configuration;
-
- private CacheRead(Configuration<RequestT, ResponseT> configuration) {
- this.configuration = configuration;
- }
-
- /** Configuration details for {@link CacheRead}. */
- @AutoValue
- abstract static class Configuration<RequestT, ResponseT> {
-
- static <RequestT, ResponseT> Builder<RequestT, ResponseT> builder() {
- return new AutoValue_CacheRead_Configuration.Builder<>();
- }
-
- abstract Builder<RequestT, ResponseT> toBuilder();
-
- @AutoValue.Builder
- abstract static class Builder<RequestT, ResponseT> {
-
- abstract Configuration<RequestT, ResponseT> build();
- }
- }
-
- @Override
- public Result<RequestT, ResponseT> expand(PCollection<RequestT> input) {
- return Result.of(
- new TupleTag<KV<RequestT, ResponseT>>() {},
PCollectionTuple.empty(input.getPipeline()));
- }
-
- /**
- * The {@link Result} of reading RequestT {@link PCollection} elements
yielding ResponseT {@link
- * PCollection} elements.
- */
- static class Result<RequestT, ResponseT> implements POutput {
-
- static <RequestT, ResponseT> Result<RequestT, ResponseT> of(
- TupleTag<KV<RequestT, ResponseT>> responseTag, PCollectionTuple pct) {
- return new Result<>(responseTag, pct);
- }
-
- private final Pipeline pipeline;
- private final TupleTag<KV<RequestT, ResponseT>> responseTag;
- private final PCollection<KV<RequestT, ResponseT>> responses;
- private final PCollection<ApiIOError> failures;
-
- private Result(TupleTag<KV<RequestT, ResponseT>> responseTag,
PCollectionTuple pct) {
- this.pipeline = pct.getPipeline();
- this.responseTag = responseTag;
- this.responses = pct.get(responseTag);
- this.failures = pct.get(FAILURE_TAG);
- }
-
- PCollection<KV<RequestT, ResponseT>> getResponses() {
- return responses;
- }
-
- PCollection<ApiIOError> getFailures() {
- return failures;
- }
-
- @Override
- public Pipeline getPipeline() {
- return this.pipeline;
- }
-
- @Override
- public Map<TupleTag<?>, PValue> expand() {
- return ImmutableMap.of(
- responseTag, responses,
- FAILURE_TAG, failures);
- }
-
- @Override
- public void finishSpecifyingOutput(
- String transformName, PInput input, PTransform<?, ?> transform) {}
- }
-}
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheWrite.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheWrite.java
deleted file mode 100644
index 25249c3e41b..00000000000
---
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/CacheWrite.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import com.google.auto.value.AutoValue;
-import java.util.Map;
-import org.apache.beam.io.requestresponse.CacheWrite.Result;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-
-/**
- * {@link CacheWrite} writes associated {@link RequestT} and {@link ResponseT}
pairs to a cache.
- * Using {@link RequestT} and {@link ResponseT}'s {@link
org.apache.beam.sdk.coders.Coder}, this
- * transform writes encoded representations of this association.
- */
-class CacheWrite<RequestT, ResponseT>
- extends PTransform<PCollection<KV<RequestT, ResponseT>>, Result<RequestT,
ResponseT>> {
-
- private static final TupleTag<ApiIOError> FAILURE_TAG = new
TupleTag<ApiIOError>() {};
-
- // TODO(damondouglas): remove suppress warnings after configuration is used.
- @SuppressWarnings({"unused"})
- private final Configuration<RequestT, ResponseT> configuration;
-
- private CacheWrite(Configuration<RequestT, ResponseT> configuration) {
- this.configuration = configuration;
- }
-
- /** Configuration details for {@link CacheWrite}. */
- @AutoValue
- abstract static class Configuration<RequestT, ResponseT> {
-
- static <RequestT, ResponseT> Builder<RequestT, ResponseT> builder() {
- return new AutoValue_CacheWrite_Configuration.Builder<>();
- }
-
- abstract Builder<RequestT, ResponseT> toBuilder();
-
- @AutoValue.Builder
- abstract static class Builder<RequestT, ResponseT> {
-
- abstract Configuration<RequestT, ResponseT> build();
- }
- }
-
- @Override
- public Result<RequestT, ResponseT> expand(PCollection<KV<RequestT,
ResponseT>> input) {
- return Result.of(
- new TupleTag<KV<RequestT, ResponseT>>() {},
PCollectionTuple.empty(input.getPipeline()));
- }
-
- /** The {@link Result} of writing a request/response {@link KV} {@link
PCollection}. */
- static class Result<RequestT, ResponseT> implements POutput {
-
- static <RequestT, ResponseT> Result<RequestT, ResponseT> of(
- TupleTag<KV<RequestT, ResponseT>> responseTag, PCollectionTuple pct) {
- return new Result<>(responseTag, pct);
- }
-
- private final Pipeline pipeline;
- private final TupleTag<KV<RequestT, ResponseT>> responseTag;
- private final PCollection<KV<RequestT, ResponseT>> responses;
- private final PCollection<ApiIOError> failures;
-
- private Result(TupleTag<KV<RequestT, ResponseT>> responseTag,
PCollectionTuple pct) {
- this.pipeline = pct.getPipeline();
- this.responseTag = responseTag;
- this.responses = pct.get(responseTag);
- this.failures = pct.get(FAILURE_TAG);
- }
-
- public PCollection<KV<RequestT, ResponseT>> getResponses() {
- return responses;
- }
-
- public PCollection<ApiIOError> getFailures() {
- return failures;
- }
-
- @Override
- public Pipeline getPipeline() {
- return this.pipeline;
- }
-
- @Override
- public Map<TupleTag<?>, PValue> expand() {
- return ImmutableMap.of(
- responseTag, responses,
- FAILURE_TAG, failures);
- }
-
- @Override
- public void finishSpecifyingOutput(
- String transformName, PInput input, PTransform<?, ?> transform) {}
- }
-}
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
index 52181af534e..d52ca971ca4 100644
---
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
+++
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
@@ -100,7 +100,11 @@ class Call<RequestT, ResponseT>
.build());
}
- private static final TupleTag<ApiIOError> FAILURE_TAG = new
TupleTag<ApiIOError>() {};
+ // TupleTags need to be instantiated for each Call instance. We cannot use a
shared
+ // static instance that is shared for multiple PCollectionTuples when Call is
+ // instantiated multiple times as it is reused throughout code in this
library.
+ private final TupleTag<ResponseT> responseTag = new TupleTag<ResponseT>() {};
+ private final TupleTag<ApiIOError> failureTag = new TupleTag<ApiIOError>()
{};
private final Configuration<RequestT, ResponseT> configuration;
@@ -128,27 +132,30 @@ class Call<RequestT, ResponseT>
@Override
public @NonNull Result<ResponseT> expand(PCollection<RequestT> input) {
- TupleTag<ResponseT> responseTag = new TupleTag<ResponseT>() {};
PCollectionTuple pct =
input.apply(
CallFn.class.getSimpleName(),
- ParDo.of(new CallFn<>(responseTag, configuration))
- .withOutputTags(responseTag, TupleTagList.of(FAILURE_TAG)));
+ ParDo.of(new CallFn<>(responseTag, failureTag, configuration))
+ .withOutputTags(responseTag, TupleTagList.of(failureTag)));
- return Result.of(configuration.getResponseCoder(), responseTag, pct);
+ return Result.of(configuration.getResponseCoder(), responseTag,
failureTag, pct);
}
private static class CallFn<RequestT, ResponseT> extends DoFn<RequestT,
ResponseT> {
private final TupleTag<ResponseT> responseTag;
+ private final TupleTag<ApiIOError> failureTag;
private final CallerWithTimeout<RequestT, ResponseT> caller;
private final SetupTeardownWithTimeout setupTeardown;
private transient @MonotonicNonNull ExecutorService executor;
private CallFn(
- TupleTag<ResponseT> responseTag, Configuration<RequestT, ResponseT>
configuration) {
+ TupleTag<ResponseT> responseTag,
+ TupleTag<ApiIOError> failureTag,
+ Configuration<RequestT, ResponseT> configuration) {
this.responseTag = responseTag;
+ this.failureTag = failureTag;
this.caller = new CallerWithTimeout<>(configuration.getTimeout(),
configuration.getCaller());
this.setupTeardown =
new SetupTeardownWithTimeout(
@@ -194,7 +201,7 @@ class Call<RequestT, ResponseT>
ResponseT response = this.caller.call(request);
receiver.get(responseTag).output(response);
} catch (UserCodeExecutionException e) {
- receiver.get(FAILURE_TAG).output(ApiIOError.of(e, request));
+ receiver.get(failureTag).output(ApiIOError.of(e, request));
}
}
}
@@ -269,21 +276,29 @@ class Call<RequestT, ResponseT>
static class Result<ResponseT> implements POutput {
static <ResponseT> Result<ResponseT> of(
- Coder<ResponseT> responseTCoder, TupleTag<ResponseT> responseTag,
PCollectionTuple pct) {
- return new Result<>(responseTCoder, responseTag, pct);
+ Coder<ResponseT> responseTCoder,
+ TupleTag<ResponseT> responseTag,
+ TupleTag<ApiIOError> failureTag,
+ PCollectionTuple pct) {
+ return new Result<>(responseTCoder, responseTag, pct, failureTag);
}
private final Pipeline pipeline;
private final TupleTag<ResponseT> responseTag;
+ private final TupleTag<ApiIOError> failureTag;
private final PCollection<ResponseT> responses;
private final PCollection<ApiIOError> failures;
private Result(
- Coder<ResponseT> responseTCoder, TupleTag<ResponseT> responseTag,
PCollectionTuple pct) {
+ Coder<ResponseT> responseTCoder,
+ TupleTag<ResponseT> responseTag,
+ PCollectionTuple pct,
+ TupleTag<ApiIOError> failureTag) {
this.pipeline = pct.getPipeline();
this.responseTag = responseTag;
+ this.failureTag = failureTag;
this.responses = pct.get(responseTag).setCoder(responseTCoder);
- this.failures = pct.get(FAILURE_TAG);
+ this.failures = pct.get(this.failureTag);
}
public PCollection<ResponseT> getResponses() {
@@ -303,7 +318,7 @@ class Call<RequestT, ResponseT>
public @NonNull Map<TupleTag<?>, PValue> expand() {
return ImmutableMap.of(
responseTag, responses,
- FAILURE_TAG, failures);
+ failureTag, failures);
}
@Override
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java
new file mode 100644
index 00000000000..d2e538cf7cf
--- /dev/null
+++
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Quota.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import java.io.Serializable;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * A data class that expresses a quota. Web API providers typically define a
quota as the number of
+ * requests per time interval.
+ */
+public class Quota implements Serializable {
+ private final long numRequests;
+ private final @NonNull Duration interval;
+
+ public Quota(long numRequests, @NonNull Duration interval) {
+ this.numRequests = numRequests;
+ this.interval = interval;
+ }
+
+ /** The number of allowed requests. */
+ public long getNumRequests() {
+ return numRequests;
+ }
+
+ /** The duration context within which to allow requests. */
+ public @NonNull Duration getInterval() {
+ return interval;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Quota quota = (Quota) o;
+ return numRequests == quota.numRequests && Objects.equal(interval,
quota.interval);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(numRequests, interval);
+ }
+
+ @Override
+ public String toString() {
+ return "Quota{" + "numRequests=" + numRequests + ", interval=" + interval
+ '}';
+ }
+}
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java
index a87f5c191e4..a347a185241 100644
---
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java
+++
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/RedisClient.java
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.transforms.DoFn;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.exceptions.JedisException;
@@ -61,6 +62,15 @@ class RedisClient implements SetupTeardown {
}
}
+ /** Get a byte array associated with a byte array key. Returns null if key
does not exist. */
+ byte @Nullable [] getBytes(byte[] key) throws UserCodeExecutionException {
+ try {
+ return getSafeClient().get(key);
+ } catch (JedisException e) {
+ throw new UserCodeExecutionException(e);
+ }
+ }
+
/**
* Get the long value stored by the key. Yields zero when key does not
exist, keeping consistency
* with Redis convention. Consider using {@link #exists} to query key
existence.
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleDequeue.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleDequeue.java
deleted file mode 100644
index 085b13b5e11..00000000000
---
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleDequeue.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import com.google.auto.value.AutoValue;
-import java.util.Map;
-import org.apache.beam.io.requestresponse.ThrottleDequeue.Result;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.joda.time.Instant;
-
-/**
- * {@link ThrottleDequeue} dequeues {@link RequestT} elements at a fixed rate
yielding a {@link
- * Result} containing the dequeued {@link RequestT} {@link PCollection} and a
{@link ApiIOError}
- * {@link PCollection} of any errors.
- */
-class ThrottleDequeue<RequestT> extends PTransform<PCollection<Instant>,
Result<RequestT>> {
-
- private static final TupleTag<ApiIOError> FAILURE_TAG = new
TupleTag<ApiIOError>() {};
-
- // TODO(damondouglas): remove suppress warnings after instance utilized.
- @SuppressWarnings({"unused"})
- private final Configuration<RequestT> configuration;
-
- private ThrottleDequeue(Configuration<RequestT> configuration) {
- this.configuration = configuration;
- }
-
- @Override
- public Result<RequestT> expand(PCollection<Instant> input) {
- // TODO(damondouglas): expand in a future PR.
- return new Result<>(new TupleTag<RequestT>() {},
PCollectionTuple.empty(input.getPipeline()));
- }
-
- @AutoValue
- abstract static class Configuration<RequestT> {
-
- @AutoValue.Builder
- abstract static class Builder<RequestT> {
- abstract Configuration<RequestT> build();
- }
- }
-
- /** The {@link Result} of dequeuing {@link RequestT}s. */
- static class Result<RequestT> implements POutput {
-
- static <RequestT> Result<RequestT> of(TupleTag<RequestT> requestsTag,
PCollectionTuple pct) {
- return new Result<>(requestsTag, pct);
- }
-
- private final Pipeline pipeline;
- private final TupleTag<RequestT> requestsTag;
- private final PCollection<RequestT> requests;
- private final PCollection<ApiIOError> failures;
-
- private Result(TupleTag<RequestT> requestsTag, PCollectionTuple pct) {
- this.pipeline = pct.getPipeline();
- this.requestsTag = requestsTag;
- this.requests = pct.get(requestsTag);
- this.failures = pct.get(FAILURE_TAG);
- }
-
- @Override
- public Pipeline getPipeline() {
- return pipeline;
- }
-
- @Override
- public Map<TupleTag<?>, PValue> expand() {
- return ImmutableMap.of(
- requestsTag, requests,
- FAILURE_TAG, failures);
- }
-
- @Override
- public void finishSpecifyingOutput(
- String transformName, PInput input, PTransform<?, ?> transform) {}
- }
-}
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleEnqueue.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleEnqueue.java
deleted file mode 100644
index 505ef86be48..00000000000
---
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleEnqueue.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import com.google.auto.value.AutoValue;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * {@link ThrottleEnqueue} enqueues {@link RequestT} elements yielding an
{@link ApiIOError} {@link
- * PCollection} of any enqueue errors.
- */
-class ThrottleEnqueue<RequestT> extends PTransform<PCollection<RequestT>,
PCollection<ApiIOError>> {
-
- @SuppressWarnings({"unused"})
- private final Configuration<RequestT> configuration;
-
- private ThrottleEnqueue(Configuration<RequestT> configuration) {
- this.configuration = configuration;
- }
-
- /** Configuration details for {@link ThrottleEnqueue}. */
- @AutoValue
- abstract static class Configuration<RequestT> {
-
- static <RequestT> Builder<RequestT> builder() {
- return new AutoValue_ThrottleEnqueue_Configuration.Builder<>();
- }
-
- abstract Builder<RequestT> toBuilder();
-
- @AutoValue.Builder
- abstract static class Builder<RequestT> {
-
- abstract Configuration<RequestT> build();
- }
- }
-
- @Override
- public PCollection<ApiIOError> expand(PCollection<RequestT> input) {
- // TODO(damondouglas): expand in a future PR.
- return
input.getPipeline().apply(Create.empty(TypeDescriptor.of(ApiIOError.class)));
- }
-}
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleRefreshQuota.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleRefreshQuota.java
deleted file mode 100644
index 57e57528db4..00000000000
---
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleRefreshQuota.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.io.requestresponse;
-
-import com.google.auto.value.AutoValue;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.joda.time.Instant;
-
-/**
- * {@link ThrottleRefreshQuota} refreshes a quota per {@link Instant}
processing events emitting any
- * errors into an {@link ApiIOError} {@link PCollection}.
- */
-class ThrottleRefreshQuota extends PTransform<PCollection<Instant>,
PCollection<ApiIOError>> {
-
- // TODO: remove suppress warnings after configuration utilized.
- @SuppressWarnings({"unused"})
- private final Configuration configuration;
-
- private ThrottleRefreshQuota(Configuration configuration) {
- this.configuration = configuration;
- }
-
- @Override
- public PCollection<ApiIOError> expand(PCollection<Instant> input) {
- // TODO(damondouglas): expand in a later PR.
- return
input.getPipeline().apply(Create.empty(TypeDescriptor.of(ApiIOError.class)));
- }
-
- @AutoValue
- abstract static class Configuration {
-
- @AutoValue.Builder
- abstract static class Builder {
- abstract Configuration build();
- }
- }
-}
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java
new file mode 100644
index 00000000000..dffc034770a
--- /dev/null
+++
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteSource;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Throttles a {@link T} {@link PCollection} using an external resource.
+ *
+ * <p>{@link ThrottleWithExternalResource} makes use of {@link
PeriodicImpulse} as it needs to
+ * coordinate three {@link PTransform}s concurrently. Usage of {@link
ThrottleWithExternalResource}
+ * should consider the impact of {@link PeriodicImpulse} on the pipeline.
+ *
+ * <p>Usage of {@link ThrottleWithExternalResource} is completely optional and
serves as one of many
+ * methods by {@link RequestResponseIO} to protect against API overuse. Usage
should not depend on
+ * {@link ThrottleWithExternalResource} alone to achieve API overuse
prevention for several reasons.
+ * The underlying external resource may not scale at all or as fast as a Beam
Runner. The external
+ * resource itself may be an API with its own quota that {@link
ThrottleWithExternalResource} does
+ * not consider.
+ *
+ * <p>{@link ThrottleWithExternalResource} makes use of several {@link
Caller}s that work together
+ * to achieve its aim of throttling a {@link T} {@link PCollection}. A {@link
RefresherT} is a
+ * {@link Caller} that takes an {@link Instant} and refreshes a shared {@link
Quota}. An {@link
+ * EnqueuerT} enqueues a {@link T} element while a {@link DequeuerT} dequeues
said element when the
+ * {@link ReporterT} reports that the stored {@link Quota#getNumRequests} is
>0. Finally, a {@link
+ * DecrementerT} decrements from the shared {@link Quota} value, additionally
reporting the value
+ * after performing the action.
+ *
+ * <p>{@link ThrottleWithExternalResource} instantiates and applies two {@link
Call} {@link
+ * PTransform}s using the aforementioned {@link Caller}s {@link RefresherT}
and {@link EnqueuerT}.
+ * {@link ThrottleWithExternalResource} calls {@link ReporterT}, {@link
DequeuerT}, {@link
+ * DecrementerT} within its {@link DoFn}, emitting the dequeued {@link T} when
the {@link ReporterT}
+ * reports a value >0. As an additional safety check, the DoFn checks whether
the {@link Quota}
+ * value after {@link DecrementerT}'s action is <0, signaling that multiple workers are
+ * attempting the same decrement too fast, and therefore exits the DoFn, allowing for the
+ * next refresh.
+ *
+ * <p>{@link ThrottleWithExternalResource} flattens errors emitted from {@link
EnqueuerT}, {@link
+ * RefresherT}, and its own {@link DoFn} into a single {@link ApiIOError}
{@link PCollection} that
+ * is encapsulated, with a {@link T} {@link PCollection} output into a {@link
Call.Result}.
+ */
+class ThrottleWithExternalResource<
+ T,
+ ReporterT extends Caller<String, Long> & SetupTeardown,
+ EnqueuerT extends Caller<T, Void> & SetupTeardown,
+ DequeuerT extends Caller<Instant, T> & SetupTeardown,
+ DecrementerT extends Caller<Instant, Long> & SetupTeardown,
+ RefresherT extends Caller<Instant, Void> & SetupTeardown>
+ extends PTransform<PCollection<T>, Call.Result<T>> {
+
+ /**
+  * Instantiate a {@link ThrottleWithExternalResource} using a {@link RedisClient}.
+  *
+  * <p><a href="https://redis.io">Redis</a> is designed for multiple workloads, simultaneously
+  * reading and writing to a shared instance. See <a
+  * href="https://redis.io/docs/get-started/faq/">Redis FAQ</a> for more information on important
+  * considerations when using Redis as {@link ThrottleWithExternalResource}'s external resource.
+  *
+  * @param uri the location of the Redis instance, passed to every {@link RedisClient} created here
+  * @param quotaIdentifier the key holding the shared quota value; must not be empty
+  * @param queueKey the key under which {@code T} elements are enqueued and dequeued
+  * @param quota the {@link Quota} that the refresher writes to {@code quotaIdentifier}
+  * @param coder encodes and decodes queued {@code T} elements; must be deterministic
+  * @throws Coder.NonDeterministicException if {@code coder} fails {@link Coder#verifyDeterministic}
+  */
+ static <T>
+     ThrottleWithExternalResource<
+             T,
+             RedisReporter,
+             RedisEnqueuer<T>,
+             RedisDequeuer<T>,
+             RedisDecrementer,
+             RedisRefresher>
+         usingRedis(URI uri, String quotaIdentifier, String queueKey, Quota quota, Coder<T> coder)
+             throws Coder.NonDeterministicException {
+   return new ThrottleWithExternalResource<
+       T, RedisReporter, RedisEnqueuer<T>, RedisDequeuer<T>, RedisDecrementer, RedisRefresher>(
+       quota,
+       quotaIdentifier,
+       coder,
+       new RedisReporter(uri),
+       new RedisEnqueuer<>(uri, queueKey, coder),
+       new RedisDequeuer<>(uri, coder, queueKey),
+       // The decrementer must act on the quota key: it is the key the refresher sets and the
+       // reporter reads. The queue key stores the enqueued elements, not a counter.
+       new RedisDecrementer(uri, quotaIdentifier),
+       new RedisRefresher(uri, quota, quotaIdentifier));
+ }
+
+ private static final Duration THROTTLE_INTERVAL =
Duration.standardSeconds(1L);
+
+ private final Quota quota;
+ private final String quotaIdentifier;
+ private final Coder<T> coder;
+ private final ReporterT reporterT;
+ private final EnqueuerT enqueuerT;
+ private final DequeuerT dequeuerT;
+ private final DecrementerT decrementerT;
+ private final RefresherT refresherT;
+
+ ThrottleWithExternalResource(
+ Quota quota,
+ String quotaIdentifier,
+ Coder<T> coder,
+ ReporterT reporterT,
+ EnqueuerT enqueuerT,
+ DequeuerT dequeuerT,
+ DecrementerT decrementerT,
+ RefresherT refresherT)
+ throws Coder.NonDeterministicException {
+ this.quotaIdentifier = quotaIdentifier;
+ this.reporterT = reporterT;
+ coder.verifyDeterministic();
+ checkArgument(!quotaIdentifier.isEmpty());
+ this.quota = quota;
+ this.coder = coder;
+ this.enqueuerT = enqueuerT;
+ this.dequeuerT = dequeuerT;
+ this.decrementerT = decrementerT;
+ this.refresherT = refresherT;
+ }
+
+ @Override
+ public Call.Result<T> expand(PCollection<T> input) {
+ Pipeline pipeline = input.getPipeline();
+
+ // Refresh known quota to control the throttle rate.
+ Call.Result<Void> refreshResult =
+ pipeline
+ .apply("quota impulse",
PeriodicImpulse.create().withInterval(quota.getInterval()))
+ .apply("quota refresh", getRefresher());
+
+ // Enqueue T elements.
+ Call.Result<Void> enqueuResult = input.apply("enqueue", getEnqueuer());
+
+ TupleTag<T> outputTag = new TupleTag<T>() {};
+ TupleTag<ApiIOError> failureTag = new TupleTag<ApiIOError>() {};
+
+ // Perform Throttle.
+ PCollectionTuple pct =
+ pipeline
+ .apply("throttle impulse",
PeriodicImpulse.create().withInterval(THROTTLE_INTERVAL))
+ .apply(
+ "throttle fn",
+ ParDo.of(
+ new ThrottleFn(
+ quotaIdentifier,
+ dequeuerT,
+ decrementerT,
+ reporterT,
+ outputTag,
+ failureTag))
+ .withOutputTags(outputTag, TupleTagList.of(failureTag)));
+
+ PCollection<ApiIOError> errors =
+ PCollectionList.of(refreshResult.getFailures())
+ .and(enqueuResult.getFailures())
+ .and(pct.get(failureTag))
+ .apply("errors flatten", Flatten.pCollections());
+
+ TupleTag<T> resultOutputTag = new TupleTag<T>() {};
+ TupleTag<ApiIOError> resultFailureTag = new TupleTag<ApiIOError>() {};
+
+ return Call.Result.<T>of(
+ coder,
+ resultOutputTag,
+ resultFailureTag,
+ PCollectionTuple.of(resultOutputTag,
pct.get(outputTag)).and(resultFailureTag, errors));
+ }
+
+ private Call<Instant, Void> getRefresher() {
+ return Call.ofCallerAndSetupTeardown(refresherT, VoidCoder.of());
+ }
+
+ private Call<T, Void> getEnqueuer() {
+ return Call.ofCallerAndSetupTeardown(enqueuerT, VoidCoder.of());
+ }
+
+ private class ThrottleFn extends DoFn<Instant, T> {
+ private final String quotaIdentifier;
+ private final DequeuerT dequeuerT;
+ private final DecrementerT decrementerT;
+ private final ReporterT reporterT;
+ private final TupleTag<T> outputTag;
+ private final TupleTag<ApiIOError> failureTag;
+
+ private ThrottleFn(
+ String quotaIdentifier,
+ DequeuerT dequeuerT,
+ DecrementerT decrementerT,
+ ReporterT reporterT,
+ TupleTag<T> outputTag,
+ TupleTag<ApiIOError> failureTag) {
+ this.quotaIdentifier = quotaIdentifier;
+ this.dequeuerT = dequeuerT;
+ this.decrementerT = decrementerT;
+ this.reporterT = reporterT;
+ this.outputTag = outputTag;
+ this.failureTag = failureTag;
+ }
+
+ @ProcessElement
+ public void process(@Element Instant instant, MultiOutputReceiver
receiver) {
+ // Check for available quota.
+ try {
+ if (reporterT.call(quotaIdentifier) <= 0L) {
+ return;
+ }
+
+ // Decrement the quota.
+ Long quotaAfterDecrement = decrementerT.call(instant);
+
+ // As an additional protection we check what the quota is after
decrementing. A value
+ // < 0 signals that multiple simultaneous workers have attempted to
decrement too quickly.
+ // We don't bother adding the quota back to prevent additional workers
from doing the same
+ // and just wait for the next refresh, exiting the DoFn.
+ if (quotaAfterDecrement < 0) {
+ return;
+ }
+
+ // Dequeue an element if quota available. An error here would not
result in loss of data
+ // as no element would successfully dequeue from the external resource
but instead throw.
+ T element = dequeuerT.call(instant);
+
+ // Finally, emit the element.
+ receiver.get(outputTag).output(element);
+
+ } catch (UserCodeExecutionException e) {
+ receiver
+ .get(failureTag)
+ .output(
+ ApiIOError.builder()
+ // no request to emit as part of the error.
+ .setRequestAsJsonString("")
+ .setMessage(Optional.ofNullable(e.getMessage()).orElse(""))
+ .setObservedTimestamp(Instant.now())
+ .setStackTrace(Throwables.getStackTraceAsString(e))
+ .build());
+ }
+ }
+
+ @Setup
+ public void setup() throws UserCodeExecutionException {
+ enqueuerT.setup();
+ dequeuerT.setup();
+ decrementerT.setup();
+ reporterT.setup();
+ }
+
+ @Teardown
+ public void teardown() throws UserCodeExecutionException {
+ List<String> messages = new ArrayList<>();
+ String format = "%s encountered error during teardown: %s";
+ try {
+ enqueuerT.teardown();
+ } catch (UserCodeExecutionException e) {
+ messages.add(String.format(format, "enqueuerT", e));
+ }
+ try {
+ dequeuerT.teardown();
+ } catch (UserCodeExecutionException e) {
+ messages.add(String.format(format, "dequeuerT", e));
+ }
+ try {
+ decrementerT.teardown();
+ } catch (UserCodeExecutionException e) {
+ messages.add(String.format(format, "decrementerT", e));
+ }
+ try {
+ reporterT.teardown();
+ } catch (UserCodeExecutionException e) {
+ messages.add(String.format(format, "reporterT", e));
+ }
+
+ if (!messages.isEmpty()) {
+ throw new UserCodeExecutionException(String.join("; ", messages));
+ }
+ }
+ }
+
+ private static class RedisReporter extends RedisSetupTeardown implements
Caller<String, Long> {
+ private RedisReporter(URI uri) {
+ super(new RedisClient(uri));
+ }
+
+ @Override
+ public Long call(String request) throws UserCodeExecutionException {
+ return client.getLong(request);
+ }
+ }
+
+ private static class RedisEnqueuer<T> extends RedisSetupTeardown implements
Caller<T, Void> {
+ private final String key;
+ private final Coder<T> coder;
+
+ private RedisEnqueuer(URI uri, String key, Coder<T> coder) {
+ super(new RedisClient(uri));
+ this.key = key;
+ this.coder = coder;
+ }
+
+ @Override
+ public Void call(T request) throws UserCodeExecutionException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ coder.encode(request, baos);
+ } catch (IOException e) {
+ throw new UserCodeExecutionException(e);
+ }
+ client.rpush(key, baos.toByteArray());
+ return null;
+ }
+ }
+
+ private static class RedisDequeuer<T> extends RedisSetupTeardown implements
Caller<Instant, T> {
+
+ private final Coder<T> coder;
+ private final String key;
+
+ private RedisDequeuer(URI uri, Coder<T> coder, String key) {
+ super(new RedisClient(uri));
+ this.coder = coder;
+ this.key = key;
+ }
+
+ @Override
+ public T call(Instant request) throws UserCodeExecutionException {
+ byte[] bytes = client.lpop(key);
+ try {
+ return
checkStateNotNull(coder.decode(ByteSource.wrap(bytes).openStream()));
+
+ } catch (IOException e) {
+ throw new UserCodeExecutionException(e);
+ }
+ }
+ }
+
+ private static class RedisDecrementer extends RedisSetupTeardown
+ implements Caller<Instant, Long> {
+
+ private final String key;
+
+ private RedisDecrementer(URI uri, String key) {
+ super(new RedisClient(uri));
+ this.key = key;
+ }
+
+ @Override
+ public Long call(Instant request) throws UserCodeExecutionException {
+ return client.decr(key);
+ }
+ }
+
+ private static class RedisRefresher extends RedisSetupTeardown implements
Caller<Instant, Void> {
+ private final Quota quota;
+ private final String key;
+
+ private RedisRefresher(URI uri, Quota quota, String key) {
+ super(new RedisClient(uri));
+ this.quota = quota;
+ this.key = key;
+ }
+
+ @Override
+ public Void call(Instant request) throws UserCodeExecutionException {
+ client.setex(key, quota.getNumRequests(), quota.getInterval());
+ return null;
+ }
+ }
+
+ private abstract static class RedisSetupTeardown implements SetupTeardown {
+ protected final RedisClient client;
+
+ private RedisSetupTeardown(RedisClient client) {
+ this.client = client;
+ }
+
+ @Override
+ public void setup() throws UserCodeExecutionException {
+ client.setup();
+ }
+
+ @Override
+ public void teardown() throws UserCodeExecutionException {
+ client.teardown();
+ }
+ }
+}
diff --git
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheIT.java
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheIT.java
new file mode 100644
index 00000000000..95497e6013a
--- /dev/null
+++
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheIT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import java.net.URI;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.io.requestresponse.CallTest.Request;
+import org.apache.beam.io.requestresponse.CallTest.Response;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/** Integration tests for {@link Cache}. */
+@RunWith(JUnit4.class)
+public class CacheIT {
+ @Rule public TestPipeline writePipeline = TestPipeline.create();
+
+ @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+ private static final String CONTAINER_IMAGE_NAME = "redis:5.0.3-alpine";
+ private static final Integer PORT = 6379;
+
+ @Rule
+ public GenericContainer<?> redis =
+ new
GenericContainer<>(DockerImageName.parse(CONTAINER_IMAGE_NAME)).withExposedPorts(PORT);
+
+ @Rule
+ public RedisExternalResourcesRule externalClients =
+ new RedisExternalResourcesRule(
+ () -> {
+ redis.start();
+ return URI.create(
+ String.format("redis://%s:%d", redis.getHost(),
redis.getFirstMappedPort()));
+ });
+
+ @Test
+ public void givenRequestResponsesCached_writeThenReadYieldsMatches()
+ throws NonDeterministicException {
+ List<KV<Request, Response>> toWrite =
+ ImmutableList.of(
+ KV.of(new Request("a"), new Response("a")),
+ KV.of(new Request("b"), new Response("b")),
+ KV.of(new Request("c"), new Response("c")));
+ List<Request> toRead = ImmutableList.of(new Request("a"), new
Request("b"), new Request("c"));
+ writeThenReadThenPAssert(toWrite, toRead, toWrite);
+ }
+
+ @Test
+ public void givenNoMatchingRequestResponsePairs_yieldsKVsWithNullValues()
+ throws NonDeterministicException {
+ List<KV<Request, Response>> toWrite =
+ ImmutableList.of(
+ KV.of(new Request("a"), new Response("a")),
+ KV.of(new Request("b"), new Response("b")),
+ KV.of(new Request("c"), new Response("c")));
+ List<Request> toRead = ImmutableList.of(new Request("d"), new
Request("e"), new Request("f"));
+ List<KV<Request, Response>> expected =
+ toRead.stream()
+ .<KV<Request, Response>>map(request -> KV.of(request, null))
+ .collect(Collectors.toList());
+ writeThenReadThenPAssert(toWrite, toRead, expected);
+ }
+
+ private void writeThenReadThenPAssert(
+ List<KV<Request, Response>> toWrite,
+ List<Request> toRead,
+ List<KV<Request, Response>> expected)
+ throws NonDeterministicException {
+ PCollection<KV<Request, Response>> toWritePCol =
writePipeline.apply(Create.of(toWrite));
+ toWritePCol.apply(
+ Cache.writeUsingRedis(
+ Duration.standardHours(1L),
+ externalClients.getActualClient(),
+ CallTest.DETERMINISTIC_REQUEST_CODER,
+ CallTest.DETERMINISTIC_RESPONSE_CODER));
+
+ PCollection<Request> requests =
+
readPipeline.apply(Create.of(toRead)).setCoder(CallTest.DETERMINISTIC_REQUEST_CODER);
+
+ Call.Result<KV<Request, Response>> gotKVsResult =
+ requests.apply(
+ Cache.readUsingRedis(
+ externalClients.getActualClient(),
+ CallTest.DETERMINISTIC_REQUEST_CODER,
+ CallTest.DETERMINISTIC_RESPONSE_CODER));
+
+ PAssert.that(gotKVsResult.getFailures()).empty();
+ PAssert.that(gotKVsResult.getResponses()).containsInAnyOrder(expected);
+
+ writePipeline.run().waitUntilFinish();
+ readPipeline.run();
+ }
+}
diff --git
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheTest.java
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheTest.java
new file mode 100644
index 00000000000..fcb7862e991
--- /dev/null
+++
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CacheTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertThrows;
+
+import java.net.URI;
+import org.apache.beam.io.requestresponse.CallTest.Request;
+import org.apache.beam.io.requestresponse.CallTest.Response;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Cache}. */
+@RunWith(JUnit4.class)
+public class CacheTest {
+ @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  /**
+   * Checks that {@link Cache#readUsingRedis} throws {@link NonDeterministicException} when either
+   * the request coder or the response coder is non-deterministic, and does not throw when both are
+   * deterministic.
+   */
+  @Test
+  public void givenNonDeterministicCoder_readUsingRedis_throwsError()
+      throws Coder.NonDeterministicException {
+    URI redisUri = URI.create("redis://localhost:6379");
+
+    // Case 1: only the request coder is non-deterministic.
+    assertThrows(
+        NonDeterministicException.class,
+        () ->
+            Cache.readUsingRedis(
+                new RedisClient(redisUri),
+                CallTest.NON_DETERMINISTIC_REQUEST_CODER,
+                CallTest.DETERMINISTIC_RESPONSE_CODER));
+
+    // Case 2: only the response coder is non-deterministic.
+    assertThrows(
+        NonDeterministicException.class,
+        () ->
+            Cache.readUsingRedis(
+                new RedisClient(redisUri),
+                CallTest.DETERMINISTIC_REQUEST_CODER,
+                CallTest.NON_DETERMINISTIC_RESPONSE_CODER));
+
+    // Case 3: both coders deterministic; an exception here fails the test.
+    Cache.readUsingRedis(
+        new RedisClient(redisUri),
+        CallTest.DETERMINISTIC_REQUEST_CODER,
+        CallTest.DETERMINISTIC_RESPONSE_CODER);
+  }
+
+  /**
+   * Checks that {@link Cache#writeUsingRedis} throws {@link NonDeterministicException} when either
+   * the request coder or the response coder is non-deterministic, and does not throw when both are
+   * deterministic.
+   */
+  @Test
+  public void givenNonDeterministicCoder_writeUsingRedis_throwsError()
+      throws Coder.NonDeterministicException {
+    Duration ttl = Duration.standardSeconds(1L);
+    URI redisUri = URI.create("redis://localhost:6379");
+
+    // Case 1: only the request coder is non-deterministic.
+    assertThrows(
+        NonDeterministicException.class,
+        () ->
+            Cache.writeUsingRedis(
+                ttl,
+                new RedisClient(redisUri),
+                CallTest.NON_DETERMINISTIC_REQUEST_CODER,
+                CallTest.DETERMINISTIC_RESPONSE_CODER));
+
+    // Case 2: only the response coder is non-deterministic.
+    assertThrows(
+        NonDeterministicException.class,
+        () ->
+            Cache.writeUsingRedis(
+                ttl,
+                new RedisClient(redisUri),
+                CallTest.DETERMINISTIC_REQUEST_CODER,
+                CallTest.NON_DETERMINISTIC_RESPONSE_CODER));
+
+    // Case 3: both coders deterministic; an exception here fails the test.
+    Cache.writeUsingRedis(
+        ttl,
+        new RedisClient(redisUri),
+        CallTest.DETERMINISTIC_REQUEST_CODER,
+        CallTest.DETERMINISTIC_RESPONSE_CODER);
+  }
+
+  /**
+   * Builds a pipeline whose cache read and cache write both target {@code redis://1.2.3.4:6379}
+   * and expects running it to fail with an {@link UncheckedExecutionException} whose cause message
+   * names that host.
+   */
+  @Test
+  public void givenWrongRedisURI_throwsError() throws NonDeterministicException {
+    URI wrongUri = URI.create("redis://1.2.3.4:6379");
+    Duration ttl = Duration.standardSeconds(1L);
+
+    // Read branch.
+    PCollection<Request> lookups =
+        pipeline
+            .apply("create requests", Create.of(new Request("")))
+            .setCoder(CallTest.DETERMINISTIC_REQUEST_CODER);
+    lookups.apply(
+        "readUsingRedis",
+        Cache.readUsingRedis(
+            new RedisClient(wrongUri),
+            CallTest.DETERMINISTIC_REQUEST_CODER,
+            CallTest.DETERMINISTIC_RESPONSE_CODER));
+
+    // Write branch.
+    PCollection<KV<Request, Response>> pairs =
+        pipeline.apply("create kvs", Create.of(KV.of(new Request(""), new Response(""))));
+    pairs.apply(
+        "writeUsingRedis",
+        Cache.writeUsingRedis(
+            ttl,
+            new RedisClient(wrongUri),
+            CallTest.DETERMINISTIC_REQUEST_CODER,
+            CallTest.DETERMINISTIC_RESPONSE_CODER));
+
+    UncheckedExecutionException thrown =
+        assertThrows(UncheckedExecutionException.class, pipeline::run);
+    String causeMessage = thrown.getCause().getMessage();
+    assertThat(causeMessage, containsString("Failed to connect to host: redis://1.2.3.4:6379"));
+  }
+}
diff --git
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java
index 18574b00978..1566d172529 100644
---
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java
+++
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/CallTest.java
@@ -22,9 +22,17 @@ import static
org.apache.beam.sdk.values.TypeDescriptors.strings;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
import org.apache.beam.io.requestresponse.Call.Result;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
@@ -36,6 +44,8 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException;
import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.jetbrains.annotations.NotNull;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
@@ -47,13 +57,23 @@ import org.junit.runners.JUnit4;
public class CallTest {
@Rule public TestPipeline pipeline = TestPipeline.create();
- private static final SerializableCoder<@NonNull Response> RESPONSE_CODER =
+ static final SerializableCoder<@NonNull Request>
NON_DETERMINISTIC_REQUEST_CODER =
+ SerializableCoder.of(Request.class);
+
+ static final Coder<@NonNull Request> DETERMINISTIC_REQUEST_CODER =
+ new DeterministicRequestCoder();
+
+ static final SerializableCoder<@NonNull Response>
NON_DETERMINISTIC_RESPONSE_CODER =
SerializableCoder.of(Response.class);
+ static final Coder<@NonNull Response> DETERMINISTIC_RESPONSE_CODER =
+ new DeterministicResponseCoder();
+
@Test
public void givenCallerNotSerializable_throwsError() {
assertThrows(
- IllegalArgumentException.class, () -> Call.of(new
UnSerializableCaller(), RESPONSE_CODER));
+ IllegalArgumentException.class,
+ () -> Call.of(new UnSerializableCaller(),
NON_DETERMINISTIC_RESPONSE_CODER));
}
@Test
@@ -62,7 +82,7 @@ public class CallTest {
IllegalArgumentException.class,
() ->
Call.ofCallerAndSetupTeardown(
- new UnSerializableCallerWithSetupTeardown(), RESPONSE_CODER));
+ new UnSerializableCallerWithSetupTeardown(),
NON_DETERMINISTIC_RESPONSE_CODER));
}
@Test
@@ -70,7 +90,10 @@ public class CallTest {
Result<Response> result =
pipeline
.apply(Create.of(new Request("a")))
- .apply(Call.of(new CallerThrowsUserCodeExecutionException(),
RESPONSE_CODER));
+ .apply(
+ Call.of(
+ new CallerThrowsUserCodeExecutionException(),
+ NON_DETERMINISTIC_RESPONSE_CODER));
PCollection<ApiIOError> failures = result.getFailures();
PAssert.thatSingleton(countStackTracesOf(failures,
UserCodeExecutionException.class))
@@ -87,7 +110,7 @@ public class CallTest {
Result<Response> result =
pipeline
.apply(Create.of(new Request("a")))
- .apply(Call.of(new CallerInvokesQuotaException(), RESPONSE_CODER));
+ .apply(Call.of(new CallerInvokesQuotaException(),
NON_DETERMINISTIC_RESPONSE_CODER));
PCollection<ApiIOError> failures = result.getFailures();
PAssert.thatSingleton(countStackTracesOf(failures,
UserCodeExecutionException.class))
@@ -105,7 +128,9 @@ public class CallTest {
Result<Response> result =
pipeline
.apply(Create.of(new Request("a")))
- .apply(Call.of(new CallerExceedsTimeout(timeout),
RESPONSE_CODER).withTimeout(timeout));
+ .apply(
+ Call.of(new CallerExceedsTimeout(timeout),
NON_DETERMINISTIC_RESPONSE_CODER)
+ .withTimeout(timeout));
PCollection<ApiIOError> failures = result.getFailures();
PAssert.thatSingleton(countStackTracesOf(failures,
UserCodeExecutionException.class))
@@ -122,7 +147,7 @@ public class CallTest {
Result<Response> result =
pipeline
.apply(Create.of(new Request("a")))
- .apply(Call.of(new CallerThrowsTimeout(), RESPONSE_CODER));
+ .apply(Call.of(new CallerThrowsTimeout(),
NON_DETERMINISTIC_RESPONSE_CODER));
PCollection<ApiIOError> failures = result.getFailures();
PAssert.thatSingleton(countStackTracesOf(failures,
UserCodeExecutionException.class))
@@ -139,7 +164,7 @@ public class CallTest {
pipeline
.apply(Create.of(new Request("")))
.apply(
- Call.of(new ValidCaller(), RESPONSE_CODER)
+ Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
.withSetupTeardown(new
SetupThrowsUserCodeExecutionException()));
assertPipelineThrows(UserCodeExecutionException.class, pipeline);
@@ -150,7 +175,7 @@ public class CallTest {
pipeline
.apply(Create.of(new Request("")))
.apply(
- Call.of(new ValidCaller(), RESPONSE_CODER)
+ Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
.withSetupTeardown(new SetupThrowsUserCodeQuotaException()));
assertPipelineThrows(UserCodeQuotaException.class, pipeline);
@@ -163,7 +188,7 @@ public class CallTest {
pipeline
.apply(Create.of(new Request("")))
.apply(
- Call.of(new ValidCaller(), RESPONSE_CODER)
+ Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
.withSetupTeardown(new SetupExceedsTimeout(timeout))
.withTimeout(timeout));
@@ -175,7 +200,7 @@ public class CallTest {
pipeline
.apply(Create.of(new Request("")))
.apply(
- Call.of(new ValidCaller(), RESPONSE_CODER)
+ Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
.withSetupTeardown(new SetupThrowsUserCodeTimeoutException()));
assertPipelineThrows(UserCodeTimeoutException.class, pipeline);
@@ -186,7 +211,7 @@ public class CallTest {
pipeline
.apply(Create.of(new Request("")))
.apply(
- Call.of(new ValidCaller(), RESPONSE_CODER)
+ Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
.withSetupTeardown(new
TeardownThrowsUserCodeExecutionException()));
// Exceptions thrown during teardown do not populate with the cause
@@ -198,7 +223,7 @@ public class CallTest {
pipeline
.apply(Create.of(new Request("")))
.apply(
- Call.of(new ValidCaller(), RESPONSE_CODER)
+ Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
.withSetupTeardown(new
TeardownThrowsUserCodeQuotaException()));
// Exceptions thrown during teardown do not populate with the cause
@@ -211,7 +236,7 @@ public class CallTest {
pipeline
.apply(Create.of(new Request("")))
.apply(
- Call.of(new ValidCaller(), RESPONSE_CODER)
+ Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
.withTimeout(timeout)
.withSetupTeardown(new TeardownExceedsTimeout(timeout)));
@@ -224,7 +249,7 @@ public class CallTest {
pipeline
.apply(Create.of(new Request("")))
.apply(
- Call.of(new ValidCaller(), RESPONSE_CODER)
+ Call.of(new ValidCaller(), NON_DETERMINISTIC_RESPONSE_CODER)
.withSetupTeardown(new
TeardownThrowsUserCodeTimeoutException()));
// Exceptions thrown during teardown do not populate with the cause
@@ -236,7 +261,7 @@ public class CallTest {
Result<Response> result =
pipeline
.apply(Create.of(new Request("a")))
- .apply(Call.of(new ValidCaller(), RESPONSE_CODER));
+ .apply(Call.of(new ValidCaller(),
NON_DETERMINISTIC_RESPONSE_CODER));
PAssert.thatSingleton(result.getFailures().apply(Count.globally())).isEqualTo(0L);
PAssert.that(result.getResponses()).containsInAnyOrder(new Response("a"));
@@ -275,7 +300,7 @@ public class CallTest {
private static class UnSerializable {}
- private static class Request implements Serializable {
+ static class Request implements Serializable {
final String id;
@@ -305,7 +330,7 @@ public class CallTest {
}
}
- private static class Response implements Serializable {
+ static class Response implements Serializable {
final String id;
Response(String id) {
@@ -490,4 +515,55 @@ public class CallTest {
} catch (InterruptedException ignored) {
}
}
+
+  /**
+   * A {@link Coder} for {@link Request} that encodes only the {@code id} field with {@link
+   * StringUtf8Coder} and delegates its determinism check to that coder.
+   */
+  private static class DeterministicRequestCoder extends CustomCoder<@NonNull Request> {
+    private static final Coder<String> ID_CODER = StringUtf8Coder.of();
+
+    /** Writes {@code value.id} to {@code outStream} after passing {@code value} through a null check. */
+    @Override
+    public void encode(Request value, @NotNull OutputStream outStream)
+        throws CoderException, IOException {
+      Request request = checkStateNotNull(value);
+      ID_CODER.encode(request.id, outStream);
+    }
+
+    /** Reads an id from {@code inStream} and wraps it in a new {@link Request}. */
+    @Override
+    public @NonNull Request decode(@NotNull InputStream inStream)
+        throws CoderException, IOException {
+      return new Request(ID_CODER.decode(inStream));
+    }
+
+    /** Passes the determinism check exactly when the id coder does. */
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      ID_CODER.verifyDeterministic();
+    }
+  }
+
+  /**
+   * A {@link Coder} for {@link Response} that encodes only the {@code id} field, using a {@link
+   * NullableCoder} over {@link StringUtf8Coder} so that a {@code null} response can be encoded.
+   *
+   * <p>Encoding and decoding are symmetric: a {@code null} response is written as a null id, and a
+   * null id is read back as a {@code null} response.
+   */
+  private static class DeterministicResponseCoder extends CustomCoder<Response> {
+    private static final NullableCoder<String> ID_CODER = NullableCoder.of(StringUtf8Coder.of());
+
+    /** Writes {@code value.id} to {@code outStream}, or a null id when {@code value} is null. */
+    @Override
+    public void encode(@Nullable Response value, @NotNull OutputStream outStream)
+        throws CoderException, IOException {
+      String id = (value == null) ? null : value.id;
+      ID_CODER.encode(id, outStream);
+    }
+
+    /**
+     * Reads an id from {@code inStream} and wraps it in a new {@link Response}.
+     *
+     * @return the decoded response, or {@code null} when the stored id is null or the id coder
+     *     rejects the input with a {@link CoderException}
+     */
+    @Override
+    public Response decode(@NotNull InputStream inStream) throws CoderException, IOException {
+      String id;
+      try {
+        id = ID_CODER.decode(inStream);
+      } catch (CoderException ignored) {
+        // Best effort: input the id coder cannot decode is reported as an absent response
+        // rather than as an error.
+        return null;
+      }
+      // Mirror encode(): a null id stands for a null response, so it is not wrapped.
+      return (id == null) ? null : new Response(id);
+    }
+
+    /** Passes the determinism check exactly when the id coder does. */
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      ID_CODER.verifyDeterministic();
+    }
+  }
}
diff --git
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownTestIT.java
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownIT.java
similarity index 91%
rename from
sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownTestIT.java
rename to
sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownIT.java
index 14b6e9e6433..c10b7ee1609 100644
---
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownTestIT.java
+++
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoGRPCCallerWithSetupTeardownIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.io.requestresponse;
+import static
org.apache.beam.io.requestresponse.EchoITOptions.GRPC_ENDPOINT_ADDRESS_NAME;
import static
org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.junit.Assert.assertEquals;
@@ -27,6 +28,7 @@ import java.net.URI;
import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest;
import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse;
import org.apache.beam.testinfra.mockapis.echo.v1.EchoServiceGrpc;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.junit.AfterClass;
@@ -41,7 +43,7 @@ import org.junit.runners.JUnit4;
* running integration tests.
*/
@RunWith(JUnit4.class)
-public class EchoGRPCCallerWithSetupTeardownTestIT {
+public class EchoGRPCCallerWithSetupTeardownIT {
private static @MonotonicNonNull EchoITOptions options;
private static @MonotonicNonNull EchoGRPCCallerWithSetupTeardown client;
@@ -50,11 +52,15 @@ public class EchoGRPCCallerWithSetupTeardownTestIT {
@BeforeClass
public static void setUp() throws UserCodeExecutionException {
options = readIOTestPipelineOptions(EchoITOptions.class);
- if (options.getgRPCEndpointAddress().isEmpty()) {
+ if (Strings.isNullOrEmpty(options.getGrpcEndpointAddress())) {
throw new RuntimeException(
- "--gRPCEndpointAddress is missing. See " + EchoITOptions.class +
"for details.");
+ "--"
+ + GRPC_ENDPOINT_ADDRESS_NAME
+ + " is missing. See "
+ + EchoITOptions.class
+ + "for details.");
}
- client =
EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getgRPCEndpointAddress()));
+ client =
EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getGrpcEndpointAddress()));
checkStateNotNull(client).setup();
EchoRequest request = createShouldExceedQuotaRequest();
diff --git
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerTestIT.java
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerIT.java
similarity index 87%
rename from
sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerTestIT.java
rename to
sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerIT.java
index fa0cb937811..10b92b2610d 100644
---
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerTestIT.java
+++
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoHTTPCallerIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.io.requestresponse;
+import static
org.apache.beam.io.requestresponse.EchoITOptions.HTTP_ENDPOINT_ADDRESS_NAME;
import static
org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.junit.Assert.assertEquals;
@@ -28,6 +29,7 @@ import java.net.URI;
import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest;
import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse;
import org.apache.beam.testinfra.mockapis.echo.v1.EchoServiceGrpc;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.junit.BeforeClass;
@@ -36,12 +38,12 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
- * Tests for {@link EchoHTTPCallerTestIT} on a deployed {@link
EchoServiceGrpc} instance's HTTP
- * handler. See {@link EchoITOptions} for details on the required parameters
and how to provide
- * these for running integration tests.
+ * Tests for {@link EchoHTTPCaller} on a deployed {@link EchoServiceGrpc}
instance's HTTP handler.
+ * See {@link EchoITOptions} for details on the required parameters and how to
provide these for
+ * running integration tests.
*/
@RunWith(JUnit4.class)
-public class EchoHTTPCallerTestIT {
+public class EchoHTTPCallerIT {
private static @MonotonicNonNull EchoITOptions options;
private static @MonotonicNonNull EchoHTTPCaller client;
@@ -50,9 +52,13 @@ public class EchoHTTPCallerTestIT {
@BeforeClass
public static void setUp() throws UserCodeExecutionException {
options = readIOTestPipelineOptions(EchoITOptions.class);
- if (options.getHttpEndpointAddress().isEmpty()) {
+ if (Strings.isNullOrEmpty(options.getHttpEndpointAddress())) {
throw new RuntimeException(
- "--httpEndpointAddress is missing. See " + EchoITOptions.class +
"for details.");
+ "--"
+ + HTTP_ENDPOINT_ADDRESS_NAME
+ + " is missing. See "
+ + EchoITOptions.class
+ + "for details.");
}
client = EchoHTTPCaller.of(URI.create(options.getHttpEndpointAddress()));
diff --git
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoITOptions.java
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoITOptions.java
index a32f7a78e82..dabec750892 100644
---
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoITOptions.java
+++
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoITOptions.java
@@ -37,10 +37,13 @@ import
org.apache.beam.testinfra.mockapis.echo.v1.EchoServiceGrpc;
* </pre>
*/
public interface EchoITOptions extends PipelineOptions {
+ String GRPC_ENDPOINT_ADDRESS_NAME = "grpcEndpointAddress";
+ String HTTP_ENDPOINT_ADDRESS_NAME = "httpEndpointAddress";
+
@Description("The gRPC address of the Echo API endpoint, typically of the
form <host>:<port>.")
- String getgRPCEndpointAddress();
+ String getGrpcEndpointAddress();
- void setgRPCEndpointAddress(String value);
+ void setGrpcEndpointAddress(String value);
@Description("The HTTP address of the Echo API endpoint; must being with
http(s)://")
String getHttpEndpointAddress();
diff --git
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoRequestCoder.java
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoRequestCoder.java
new file mode 100644
index 00000000000..75cd49904cf
--- /dev/null
+++
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/EchoRequestCoder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * A {@link CustomCoder} for {@link EchoRequest} that serializes with the message's own {@code
+ * writeTo} and {@code parseFrom} methods.
+ */
+class EchoRequestCoder extends CustomCoder<@NonNull EchoRequest> {
+
+  /** Serializes {@code value} onto {@code outStream}. */
+  @Override
+  public void encode(@NonNull EchoRequest value, @NonNull OutputStream outStream)
+      throws CoderException, IOException {
+    // NOTE(review): writeTo presumably emits no length prefix -- confirm this coder is only used
+    // where the message is the sole content of the stream.
+    value.writeTo(outStream);
+  }
+
+  /** Parses an {@link EchoRequest} from {@code inStream}. */
+  @Override
+  public @NonNull EchoRequest decode(@NonNull InputStream inStream)
+      throws CoderException, IOException {
+    EchoRequest parsed = EchoRequest.parseFrom(inStream);
+    return parsed;
+  }
+
+  /** Performs no checks and never throws, so this coder always reports itself as deterministic. */
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    // No checks are performed.
+  }
+}
diff --git
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientTestIT.java
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientIT.java
similarity index 89%
rename from
sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientTestIT.java
rename to
sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientIT.java
index 1fbb320a5f2..939515836bf 100644
---
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientTestIT.java
+++
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RedisClientIT.java
@@ -20,6 +20,8 @@ package org.apache.beam.io.requestresponse;
import static
org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.allPrimitiveDataTypes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -48,7 +50,7 @@ import org.testcontainers.utility.DockerImageName;
/** Integration tests for {@link RedisClient}. */
@RunWith(JUnit4.class)
-public class RedisClientTestIT {
+public class RedisClientIT {
private static final String CONTAINER_IMAGE_NAME = "redis:5.0.3-alpine";
private static final Integer PORT = 6379;
@@ -206,4 +208,24 @@ public class RedisClientTestIT {
public void givenKeyNotExists_getLong_yieldsZero() throws
UserCodeExecutionException {
assertEquals(0L,
externalClients.getActualClient().getLong(UUID.randomUUID().toString()));
}
+
+  /** Looks up a freshly generated random key and expects {@code getBytes} to return null. */
+  @Test
+  public void givenKeyNotExists_getBytes_yieldsNull() throws UserCodeExecutionException {
+    byte[] unusedKey = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
+    byte[] found = externalClients.getActualClient().getBytes(unusedKey);
+    assertNull(found);
+  }
+
+  /**
+   * Stores a random value under a random key through the validating client, then expects {@code
+   * getBytes} on the actual client to return the same value.
+   */
+  @Test
+  public void givenKeyExists_getBytes_yieldsValue() throws UserCodeExecutionException {
+    byte[] key = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
+    String storedValue = UUID.randomUUID().toString();
+    externalClients.getValidatingClient().set(key, storedValue.getBytes(StandardCharsets.UTF_8));
+
+    byte[] fetched = externalClients.getActualClient().getBytes(key);
+
+    assertNotNull(fetched);
+    assertEquals(storedValue, new String(fetched, StandardCharsets.UTF_8));
+  }
}
diff --git
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java
new file mode 100644
index 00000000000..24db38f926e
--- /dev/null
+++
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceIT.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static
org.apache.beam.io.requestresponse.EchoITOptions.GRPC_ENDPOINT_ADDRESS_NAME;
+import static
org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+
+import com.google.protobuf.ByteString;
+import java.net.URI;
+import java.util.UUID;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Distinct;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest;
+import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Integration tests for {@link ThrottleWithExternalResource}. See {@link
EchoITOptions} for details
+ * on the required parameters and how to provide these for running integration
tests.
+ */
+public class ThrottleWithExternalResourceIT {
+
+ @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  // Id set on every EchoRequest built by createRequest(); also passed as the second argument to
+  // ThrottleWithExternalResource.usingRedis and expected as the id of every request and response.
+  // NOTE(review): the name mentions "10-per-1s" while QUOTA below is constructed with 1L --
+  // presumably the name refers to a quota configured on the Echo service side; confirm.
+  private static final String QUOTA_ID =
+      "echo-ThrottleWithExternalResourceTestIT-10-per-1s-quota";
+  // Quota handed to ThrottleWithExternalResource.usingRedis in the test.
+  private static final Quota QUOTA = new Quota(1L, Duration.standardSeconds(1L));
+  // Payload attached to every request built by createRequest().
+  private static final ByteString PAYLOAD = ByteString.copyFromUtf8("payload");
+  // Both assigned in setUp().
+  private static @MonotonicNonNull EchoITOptions options;
+  private static @MonotonicNonNull EchoGRPCCallerWithSetupTeardown client;
+  // Image name and exposed port used for the Redis container rule.
+  private static final String CONTAINER_IMAGE_NAME = "redis:5.0.3-alpine";
+  private static final Integer PORT = 6379;
+  // Coder passed to the throttle for EchoRequest elements.
+  private static final EchoRequestCoder REQUEST_CODER = new EchoRequestCoder();
+  // Coder passed to Call.ofCallerAndSetupTeardown for EchoResponse elements.
+  private static final Coder<EchoResponse> RESPONSE_CODER =
+      SerializableCoder.of(TypeDescriptor.of(EchoResponse.class));
+
+ @Rule
+ public GenericContainer<?> redis =
+ new
GenericContainer<>(DockerImageName.parse(CONTAINER_IMAGE_NAME)).withExposedPorts(PORT);
+
+  /**
+   * Reads {@link EchoITOptions}, creates and sets up the gRPC client, and issues one request up
+   * front.
+   *
+   * @throws RuntimeException if the gRPC endpoint address option is null or empty, or if the
+   *     up-front request fails with a {@link UserCodeQuotaException}
+   * @throws UserCodeExecutionException if client setup or the up-front request fails for any other
+   *     reason
+   */
+  @BeforeClass
+  public static void setUp() throws UserCodeExecutionException {
+    options = readIOTestPipelineOptions(EchoITOptions.class);
+    if (Strings.isNullOrEmpty(options.getGrpcEndpointAddress())) {
+      throw new RuntimeException(
+          "--"
+              + GRPC_ENDPOINT_ADDRESS_NAME
+              + " is missing. See "
+              + EchoITOptions.class
+              // Leading space keeps the class name and "for" from running together.
+              + " for details.");
+    }
+    client = EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getGrpcEndpointAddress()));
+    checkStateNotNull(client).setup();
+
+    // Probe call: a quota error at this point is reported with advice to wait and retry; any
+    // other UserCodeExecutionException propagates unchanged.
+    try {
+      client.call(createRequest());
+    } catch (UserCodeQuotaException e) {
+      throw new RuntimeException(
+          String.format(
+              "The quota: %s is set to refresh on an interval. Unless there are failures in this test, wait for a few seconds before running the test again.",
+              QUOTA_ID),
+          e);
+    }
+  }
+
+  /** Tears down the gRPC client assigned in {@link #setUp()}. */
+  @AfterClass
+  public static void tearDown() throws UserCodeExecutionException {
+    EchoGRPCCallerWithSetupTeardown initializedClient = checkStateNotNull(client);
+    initializedClient.teardown();
+  }
+
+  /**
+   * Throttles a periodic request stream using Redis, then calls the Echo service with the
+   * throttled requests. Asserts that neither step reports failures and that request and response
+   * ids equal {@code QUOTA_ID}.
+   */
+  @Test
+  public void givenThrottleUsingRedis_preventsQuotaErrors() throws NonDeterministicException {
+    String redisAddress =
+        String.format("redis://%s:%d", redis.getHost(), redis.getFirstMappedPort());
+    URI uri = URI.create(redisAddress);
+    Duration windowSize = Duration.standardSeconds(1L);
+    // run() must not block; the job is awaited with a timeout at the end of the test.
+    pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
+
+    Call.Result<EchoRequest> throttleResult =
+        createRequestStream()
+            .apply(
+                "throttle",
+                ThrottleWithExternalResource.usingRedis(
+                    uri, QUOTA_ID, UUID.randomUUID().toString(), QUOTA, REQUEST_CODER));
+
+    // The throttle reports no failures and the per-window element count is not zero.
+    PAssert.that(throttleResult.getFailures()).empty();
+    PCollection<Long> throttledCounts =
+        throttleResult
+            .getResponses()
+            .apply("window throttled", Window.into(FixedWindows.of(windowSize)))
+            .apply(
+                "count throttled",
+                Combine.globally(Count.<EchoRequest>combineFn()).withoutDefaults());
+    PAssert.thatSingleton(throttledCounts).notEqualTo(0L);
+
+    // Throttled requests still carry QUOTA_ID as their id.
+    PCollection<String> throttledIds =
+        throttleResult
+            .getResponses()
+            .apply(
+                "window throttled before extraction", Window.into(FixedWindows.of(windowSize)))
+            .apply(
+                "extract request id",
+                MapElements.into(strings()).via(request -> checkStateNotNull(request).getId()))
+            .apply("distinct", Distinct.create());
+    PAssert.that(throttledIds).containsInAnyOrder(QUOTA_ID);
+
+    // Feed the throttled requests to the Echo service.
+    Call.Result<EchoResponse> echoResult =
+        throttleResult
+            .getResponses()
+            .apply("call", Call.ofCallerAndSetupTeardown(client, RESPONSE_CODER));
+
+    // The call step reports no failures.
+    PAssert.that(echoResult.getFailures()).empty();
+
+    // Responses carry QUOTA_ID as their id.
+    PCollection<String> responseIds =
+        echoResult
+            .getResponses()
+            .apply(
+                "window responses before extraction", Window.into(FixedWindows.of(windowSize)))
+            .apply(
+                "extract response id",
+                MapElements.into(strings()).via(response -> checkStateNotNull(response).getId()));
+    PAssert.that(responseIds).containsInAnyOrder(QUOTA_ID);
+
+    PipelineResult job = pipeline.run();
+    job.waitUntilFinish(Duration.standardSeconds(3L));
+  }
+
+  /** Builds an {@link EchoRequest} whose id is {@code QUOTA_ID} and whose payload is {@code PAYLOAD}. */
+  private static EchoRequest createRequest() {
+    EchoRequest.Builder builder = EchoRequest.newBuilder();
+    builder.setId(QUOTA_ID);
+    builder.setPayload(PAYLOAD);
+    return builder.build();
+  }
+
+  /**
+   * Applies a {@link PeriodicImpulse} with a 10 ms interval to the test pipeline and maps every
+   * impulse to a request built by {@link #createRequest()}.
+   */
+  private PCollection<EchoRequest> createRequestStream() {
+    Duration interval = Duration.millis(10L);
+    TypeDescriptor<EchoRequest> requestType = TypeDescriptor.of(EchoRequest.class);
+    return pipeline
+        .apply("impulse", PeriodicImpulse.create().withInterval(interval))
+        .apply("requests", MapElements.into(requestType).via(tick -> createRequest()));
+  }
+}
diff --git
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceTest.java
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceTest.java
new file mode 100644
index 00000000000..591ba923201
--- /dev/null
+++
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResourceTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertThrows;
+
+import java.net.URI;
+import org.apache.beam.io.requestresponse.CallTest.Request;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link ThrottleWithExternalResource}, exercising its {@code usingRedis} factory
+ * method.
+ */
+@RunWith(JUnit4.class)
+public class ThrottleWithExternalResourceTest {
+  private static final String QUOTA_IDENTIFIER = "quota";
+  private static final String QUEUE_KEY = "queue";
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  /**
+   * Checks that {@code usingRedis} throws {@link NonDeterministicException} when handed {@link
+   * CallTest#NON_DETERMINISTIC_REQUEST_CODER}, and completes without throwing when handed {@link
+   * CallTest#DETERMINISTIC_REQUEST_CODER} with otherwise identical arguments.
+   */
+  @Test
+  public void givenNonDeterministicCoder_usingRedis_throwsError() throws NonDeterministicException {
+    URI redisUri = URI.create("redis://localhost:6379");
+    Quota quota = new Quota(10L, Duration.standardSeconds(1L));
+
+    assertThrows(
+        NonDeterministicException.class,
+        () ->
+            ThrottleWithExternalResource.usingRedis(
+                redisUri,
+                QUOTA_IDENTIFIER,
+                QUEUE_KEY,
+                quota,
+                CallTest.NON_DETERMINISTIC_REQUEST_CODER));
+
+    // Positive control: the same call with a deterministic coder must not throw.
+    ThrottleWithExternalResource.usingRedis(
+        redisUri, QUOTA_IDENTIFIER, QUEUE_KEY, quota, CallTest.DETERMINISTIC_REQUEST_CODER);
+  }
+
+  /**
+   * Checks that running a pipeline throttled against {@code redis://1.2.3.4:6379} fails with an
+   * {@link UncheckedExecutionException} whose cause reports the connection failure for that URI.
+   */
+  @Test
+  public void givenWrongRedisURI_throwsError() throws NonDeterministicException {
+    // NOTE(review): presumably no Redis instance is reachable at this address - confirm.
+    URI redisUri = URI.create("redis://1.2.3.4:6379");
+    Quota quota = new Quota(10L, Duration.standardSeconds(1L));
+    PCollection<Request> input =
+        pipeline.apply(Create.of(new Request(""))).setCoder(CallTest.DETERMINISTIC_REQUEST_CODER);
+    input.apply(
+        ThrottleWithExternalResource.usingRedis(
+            redisUri, QUOTA_IDENTIFIER, QUEUE_KEY, quota, input.getCoder()));
+
+    UncheckedExecutionException thrown =
+        assertThrows(UncheckedExecutionException.class, pipeline::run);
+    String causeMessage = thrown.getCause().getMessage();
+    assertThat(causeMessage, containsString("Failed to connect to host: redis://1.2.3.4:6379"));
+  }
+}