This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 104c10b3ee5 [RRIO] Create Caller and SetupTeardown interfaces (#28905)
104c10b3ee5 is described below
commit 104c10b3ee536a9a3ea52b4dbf62d86b669da5d9
Author: Damon <[email protected]>
AuthorDate: Wed Oct 11 12:39:38 2023 -0700
[RRIO] Create Caller and SetupTeardown interfaces (#28905)
* Create test Caller and SetupTeardown interfaces
* Update Javadoc
* Defer Call transform to future PR
* Rename package to requestresponseio
* Add username to TODO
---
sdks/java/io/rrio/build.gradle | 7 +-
.../apache/beam/io/requestresponseio/Caller.java | 27 +++++
.../beam/io/requestresponseio/SetupTeardown.java | 34 ++++++
.../UserCodeExecutionException.java | 38 ++++++
.../requestresponseio/UserCodeQuotaException.java | 42 +++++++
.../UserCodeTimeoutException.java | 39 ++++++
.../beam/io/requestresponseio/package-info.java | 20 ++++
.../java/org/apache/beam/io/rrio/CallerTest.java | 126 ++++++++++++++++++++
.../org/apache/beam/io/rrio/SetupTeardownTest.java | 132 +++++++++++++++++++++
9 files changed, 462 insertions(+), 3 deletions(-)
diff --git a/sdks/java/io/rrio/build.gradle b/sdks/java/io/rrio/build.gradle
index d65df370e0c..32fbd9d22e3 100644
--- a/sdks/java/io/rrio/build.gradle
+++ b/sdks/java/io/rrio/build.gradle
@@ -25,9 +25,10 @@ description = "Apache Beam :: SDKS :: Java :: IO ::
RequestResponseIO (RRIO)"
ext.summary = "Support to read from and write to Web APIs"
dependencies {
- implementation project(path: ":sdks:java:core", configuration: "shadow")
- implementation library.java.joda_time
- implementation library.java.vendored_guava_32_1_2_jre
+ // TODO(damondouglas): revert to implementation after project is more
fully developed
+ permitUnusedDeclared project(path: ":sdks:java:core", configuration:
"shadow")
+ permitUnusedDeclared library.java.joda_time
+ permitUnusedDeclared library.java.vendored_guava_32_1_2_jre
testImplementation project(path: ":sdks:java:core", configuration:
"shadowTest")
testImplementation library.java.junit
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/Caller.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/Caller.java
new file mode 100644
index 00000000000..32b514c43a1
--- /dev/null
+++
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/Caller.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+import java.io.Serializable;
+
+/** {@link Caller} interfaces user custom code intended for API calls. */
+public interface Caller<RequestT, ResponseT> extends Serializable {
+
+ /** Calls a Web API with the {@link RequestT} and returns a {@link
ResponseT}. */
+ ResponseT call(RequestT request) throws UserCodeExecutionException;
+}
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/SetupTeardown.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/SetupTeardown.java
new file mode 100644
index 00000000000..2bdc8113d98
--- /dev/null
+++
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/SetupTeardown.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+import java.io.Serializable;
+
+/**
+ * Provided by user and called within {@link
org.apache.beam.sdk.transforms.DoFn.Setup} and @{link
+ * org.apache.beam.sdk.transforms.DoFn.Teardown} lifecycle methods of {@link
Call}'s {@link
+ * org.apache.beam.sdk.transforms.DoFn}.
+ */
+public interface SetupTeardown extends Serializable {
+
+ /** Called during the {@link org.apache.beam.sdk.transforms.DoFn}'s setup
lifecycle method. */
+ void setup() throws UserCodeExecutionException;
+
+ /** Called during the {@link org.apache.beam.sdk.transforms.DoFn}'s teardown
lifecycle method. */
+ void teardown() throws UserCodeExecutionException;
+}
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeExecutionException.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeExecutionException.java
new file mode 100644
index 00000000000..3a4c002f52e
--- /dev/null
+++
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeExecutionException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+/** Base {@link Exception} for signaling errors in user custom code. */
+public class UserCodeExecutionException extends Exception {
+ public UserCodeExecutionException(String message) {
+ super(message);
+ }
+
+ public UserCodeExecutionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public UserCodeExecutionException(Throwable cause) {
+ super(cause);
+ }
+
+ public UserCodeExecutionException(
+ String message, Throwable cause, boolean enableSuppression, boolean
writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeQuotaException.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeQuotaException.java
new file mode 100644
index 00000000000..f16f078927f
--- /dev/null
+++
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeQuotaException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+/**
+ * Extends {@link UserCodeQuotaException} to allow the user custom code to
specifically signal a
+ * Quota or API overuse related error.
+ */
+public class UserCodeQuotaException extends UserCodeExecutionException {
+
+ public UserCodeQuotaException(String message) {
+ super(message);
+ }
+
+ public UserCodeQuotaException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public UserCodeQuotaException(Throwable cause) {
+ super(cause);
+ }
+
+ public UserCodeQuotaException(
+ String message, Throwable cause, boolean enableSuppression, boolean
writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeTimeoutException.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeTimeoutException.java
new file mode 100644
index 00000000000..22b06744985
--- /dev/null
+++
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeTimeoutException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+/** An extension of {@link UserCodeQuotaException} to specifically signal a
user code timeout. */
+public class UserCodeTimeoutException extends UserCodeExecutionException {
+
+ public UserCodeTimeoutException(String message) {
+ super(message);
+ }
+
+ public UserCodeTimeoutException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public UserCodeTimeoutException(Throwable cause) {
+ super(cause);
+ }
+
+ public UserCodeTimeoutException(
+ String message, Throwable cause, boolean enableSuppression, boolean
writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/package-info.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/package-info.java
new file mode 100644
index 00000000000..cd9c11c13f8
--- /dev/null
+++
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Package provides Beam I/O transform support for safely reading from and
writing to Web APIs. */
+package org.apache.beam.io.requestresponseio;
diff --git
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/CallerTest.java
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/CallerTest.java
new file mode 100644
index 00000000000..5258573f428
--- /dev/null
+++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/CallerTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.rrio;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.apache.beam.io.requestresponseio.Caller;
+import org.apache.beam.io.requestresponseio.UserCodeExecutionException;
+import org.apache.beam.io.requestresponseio.UserCodeQuotaException;
+import org.apache.beam.io.requestresponseio.UserCodeTimeoutException;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Caller}. */
+@RunWith(JUnit4.class)
+public class CallerTest {
+
+ @Rule public TestPipeline pipeline = TestPipeline.create();
+
+ @Test
+ public void canSerializeImplementingClasses() {
+ SerializableUtils.serializeToByteArray(new CallerImpl());
+ }
+
+ @Test
+ public void canSerializeWhenUsedInDoFn() {
+ pipeline
+ .apply(Create.of(Instant.now()))
+ .apply(ParDo.of(new CallerUsingDoFn<>(new CallerImpl())))
+ .setCoder(StringUtf8Coder.of());
+
+ pipeline.run();
+ }
+
+ @Test
+ public void canSignalQuotaException() {
+ pipeline
+ .apply(Create.of(1))
+ .apply(ParDo.of(new CallerUsingDoFn<>(new
CallerThrowsQuotaException())))
+ .setCoder(VarIntCoder.of());
+
+ PipelineExecutionException executionException =
+ assertThrows(PipelineExecutionException.class, pipeline::run);
+ assertEquals(UserCodeQuotaException.class,
executionException.getCause().getClass());
+ }
+
+ @Test
+ public void canSignalTimeoutException() {
+ pipeline
+ .apply(Create.of(1))
+ .apply(ParDo.of(new CallerUsingDoFn<>(new
CallerThrowsTimeoutException())))
+ .setCoder(VarIntCoder.of());
+
+ PipelineExecutionException executionException =
+ assertThrows(PipelineExecutionException.class, pipeline::run);
+ assertEquals(UserCodeTimeoutException.class,
executionException.getCause().getClass());
+ }
+
+ private static class CallerUsingDoFn<RequestT, ResponseT> extends
DoFn<RequestT, ResponseT> {
+ private final Caller<RequestT, ResponseT> caller;
+
+ private CallerUsingDoFn(Caller<RequestT, ResponseT> caller) {
+ this.caller = caller;
+ }
+
+ @ProcessElement
+ public void process(@Element RequestT request, OutputReceiver<ResponseT>
receiver)
+ throws UserCodeExecutionException {
+ RequestT safeRequest = checkStateNotNull(request);
+ ResponseT response = caller.call(safeRequest);
+ receiver.output(response);
+ }
+ }
+
+ private static class CallerImpl implements Caller<Instant, String> {
+
+ @Override
+ public String call(Instant request) throws UserCodeExecutionException {
+ return request.toString();
+ }
+ }
+
+ private static class CallerThrowsQuotaException implements Caller<Integer,
Integer> {
+
+ @Override
+ public Integer call(Integer request) throws UserCodeExecutionException {
+ throw new UserCodeQuotaException("quota");
+ }
+ }
+
+ private static class CallerThrowsTimeoutException implements Caller<Integer,
Integer> {
+
+ @Override
+ public Integer call(Integer request) throws UserCodeExecutionException {
+ throw new UserCodeTimeoutException("timeout");
+ }
+ }
+}
diff --git
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/SetupTeardownTest.java
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/SetupTeardownTest.java
new file mode 100644
index 00000000000..a8c5c45ede5
--- /dev/null
+++
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/SetupTeardownTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.rrio;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.apache.beam.io.requestresponseio.SetupTeardown;
+import org.apache.beam.io.requestresponseio.UserCodeExecutionException;
+import org.apache.beam.io.requestresponseio.UserCodeQuotaException;
+import org.apache.beam.io.requestresponseio.UserCodeTimeoutException;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.UserCodeException;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class SetupTeardownTest {
+ @Rule public TestPipeline pipeline = TestPipeline.create();
+
+ @Test
+ public void canSerializeImplementingClasses() {
+ SerializableUtils.serializeToByteArray(new SetupTeardownImpl());
+ }
+
+ @Test
+ public void canSerializeWhenUsedInDoFn() {
+ pipeline
+ .apply(Create.of(1))
+ .apply(ParDo.of(new SetupTeardownUsingDoFn(new SetupTeardownImpl())))
+ .setCoder(VarIntCoder.of());
+
+ pipeline.run();
+ }
+
+ @Test
+ public void canSignalQuotaException() {
+ pipeline
+ .apply(Create.of(1))
+ .apply(ParDo.of(new SetupTeardownUsingDoFn(new
ThrowsQuotaException())))
+ .setCoder(VarIntCoder.of());
+
+ UncheckedExecutionException exception =
+ assertThrows(UncheckedExecutionException.class, pipeline::run);
+ UserCodeException userCodeException = (UserCodeException)
exception.getCause();
+ assertEquals(UserCodeQuotaException.class,
userCodeException.getCause().getClass());
+ }
+
+ @Test
+ public void canSignalTimeoutException() {
+ pipeline
+ .apply(Create.of(1))
+ .apply(ParDo.of(new SetupTeardownUsingDoFn(new
ThrowsTimeoutException())))
+ .setCoder(VarIntCoder.of());
+
+ UncheckedExecutionException exception =
+ assertThrows(UncheckedExecutionException.class, pipeline::run);
+ UserCodeException userCodeException = (UserCodeException)
exception.getCause();
+ assertEquals(UserCodeTimeoutException.class,
userCodeException.getCause().getClass());
+ }
+
+ private static class SetupTeardownUsingDoFn extends DoFn<Integer, Integer> {
+ private final SetupTeardown setupTeardown;
+
+ private SetupTeardownUsingDoFn(SetupTeardown setupTeardown) {
+ this.setupTeardown = setupTeardown;
+ }
+
+ @Setup
+ public void setup() throws UserCodeExecutionException {
+ setupTeardown.setup();
+ }
+
+ @Teardown
+ public void teardown() throws UserCodeExecutionException {
+ setupTeardown.teardown();
+ }
+
+ @ProcessElement
+ public void process() {}
+ }
+
+ private static class SetupTeardownImpl implements SetupTeardown {
+ @Override
+ public void setup() throws UserCodeExecutionException {}
+
+ @Override
+ public void teardown() throws UserCodeExecutionException {}
+ }
+
+ private static class ThrowsQuotaException implements SetupTeardown {
+
+ @Override
+ public void setup() throws UserCodeExecutionException {
+ throw new UserCodeQuotaException("quota");
+ }
+
+ @Override
+ public void teardown() throws UserCodeExecutionException {}
+ }
+
+ private static class ThrowsTimeoutException implements SetupTeardown {
+
+ @Override
+ public void setup() throws UserCodeExecutionException {
+ throw new UserCodeTimeoutException("timeout");
+ }
+
+ @Override
+ public void teardown() throws UserCodeExecutionException {}
+ }
+}