Abacn commented on code in PR #29490:
URL: https://github.com/apache/beam/pull/29490#discussion_r1409877629


##########
sdks/java/io/rrio/build.gradle:
##########
@@ -37,6 +37,8 @@ dependencies {
     implementation library.java.jackson_core
     implementation library.java.jackson_databind
     implementation "redis.clients:jedis:$jedisVersion"
+    implementation platform(library.java.google_cloud_platform_libraries_bom)

Review Comment:
   It now depends on google_cloud_platform_libraries_bom because it uses 
`com.google.api.client.util.BackOff` and `ExponentialBackOff`. This use alone 
does not necessarily need to introduce the whole gcp_bom dependency. Consider 
use beam's `org.apache.beam.sdk.util.BackOff`
   
   In general I understand RRIO as generic for API calls and should not need to 
depend on GCP



##########
sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.ExponentialBackOff;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+/** Repeats a method invocation when it encounters an error. */
+public class Repeater<InputT, OutputT> {
+
+  /**
+   * {@link Set} of {@link UserCodeExecutionException}s that warrant 
repeating. A public modifier is
+   * applied to communicate to users of this class which {@link 
UserCodeExecutionException}s
+   * constitute warrant repeat execution.
+   */
+  public static final Set<Class<? extends UserCodeExecutionException>> 
REPEATABLE_ERROR_TYPES =
+      ImmutableSet.of(
+          UserCodeRemoteSystemException.class,
+          UserCodeTimeoutException.class,
+          UserCodeQuotaException.class);
+
+  /** Instantiates a {@link Repeater}. */
+  public static <InputT, OutputT> Repeater<InputT, OutputT> of(
+      ThrowableFunction<InputT, OutputT> throwableFunction, Sleeper sleeper, 
Integer limit) {

Review Comment:
   `of()` with three undocumented parameters, no idea how to assemble this 
public class. Consider use a builder and better document it.



##########
sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.ExponentialBackOff;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+/** Repeats a method invocation when it encounters an error. */
+public class Repeater<InputT, OutputT> {
+
+  /**
+   * {@link Set} of {@link UserCodeExecutionException}s that warrant 
repeating. A public modifier is
+   * applied to communicate to users of this class which {@link 
UserCodeExecutionException}s
+   * constitute warrant repeat execution.
+   */
+  public static final Set<Class<? extends UserCodeExecutionException>> 
REPEATABLE_ERROR_TYPES =
+      ImmutableSet.of(
+          UserCodeRemoteSystemException.class,
+          UserCodeTimeoutException.class,
+          UserCodeQuotaException.class);
+
+  /** Instantiates a {@link Repeater}. */
+  public static <InputT, OutputT> Repeater<InputT, OutputT> of(
+      ThrowableFunction<InputT, OutputT> throwableFunction, Sleeper sleeper, 
Integer limit) {
+    return new Repeater<>(throwableFunction, sleeper, limit);
+  }
+
+  private final ThrowableFunction<InputT, OutputT> throwableFunction;
+
+  private final Sleeper sleeper;
+  private final int limit;
+
+  private Repeater(
+      ThrowableFunction<InputT, OutputT> throwableFunction, Sleeper sleeper, 
int limit) {
+    this.throwableFunction = throwableFunction;
+    this.sleeper = sleeper;
+    this.limit = limit;
+  }
+
+  /**
+   * Applies the {@link InputT} to the {@link ThrowableFunction}. If the 
function throws an
+   * exception that {@link #REPEATABLE_ERROR_TYPES} contains, repeats the 
invocation up to the
+   * limit, otherwise throws the last exception.
+   */
+  public OutputT apply(InputT input) throws UserCodeExecutionException, 
InterruptedException {
+    @MonotonicNonNull UserCodeExecutionException lastException = null;
+    for (int numAttempts = 0; numAttempts < limit; numAttempts++) {
+      try {
+        return throwableFunction.apply(input);
+      } catch (UserCodeExecutionException e) {
+        if (!REPEATABLE_ERROR_TYPES.contains(e.getClass())) {
+          throw e;
+        }
+        lastException = e;
+        sleeper.sleep();
+      }
+    }
+    throw checkStateNotNull(lastException);
+  }
+
+  /**
+   * A {@link FunctionalInterface} for executing a {@link 
UserCodeExecutionException} throwable
+   * function.
+   */
+  @FunctionalInterface
+  public interface ThrowableFunction<InputT, OutputT> {
+    /** Returns the result of invoking this function on the given input. */
+    OutputT apply(InputT input) throws UserCodeExecutionException;
+  }
+
+  /** Interfaces implementation details for pausing an execution. */
+  public interface Sleeper {
+
+    /** Pauses the execution. */
+    void sleep() throws InterruptedException;

Review Comment:
   Should we use `org.apache.beam.sdk.util.Sleeper`? Or, if the next comment is 
considered, one could refactor to take a Backoff instance instead of sleeper.



##########
sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.ExponentialBackOff;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+/** Repeats a method invocation when it encounters an error. */
+public class Repeater<InputT, OutputT> {
+
+  /**
+   * {@link Set} of {@link UserCodeExecutionException}s that warrant 
repeating. A public modifier is
+   * applied to communicate to users of this class which {@link 
UserCodeExecutionException}s
+   * constitute warrant repeat execution.
+   */
+  public static final Set<Class<? extends UserCodeExecutionException>> 
REPEATABLE_ERROR_TYPES =
+      ImmutableSet.of(
+          UserCodeRemoteSystemException.class,
+          UserCodeTimeoutException.class,
+          UserCodeQuotaException.class);
+
+  /** Instantiates a {@link Repeater}. */
+  public static <InputT, OutputT> Repeater<InputT, OutputT> of(
+      ThrowableFunction<InputT, OutputT> throwableFunction, Sleeper sleeper, 
Integer limit) {
+    return new Repeater<>(throwableFunction, sleeper, limit);
+  }
+
+  private final ThrowableFunction<InputT, OutputT> throwableFunction;
+
+  private final Sleeper sleeper;
+  private final int limit;
+
+  private Repeater(
+      ThrowableFunction<InputT, OutputT> throwableFunction, Sleeper sleeper, 
int limit) {
+    this.throwableFunction = throwableFunction;
+    this.sleeper = sleeper;
+    this.limit = limit;
+  }
+
+  /**
+   * Applies the {@link InputT} to the {@link ThrowableFunction}. If the 
function throws an
+   * exception that {@link #REPEATABLE_ERROR_TYPES} contains, repeats the 
invocation up to the
+   * limit, otherwise throws the last exception.
+   */
+  public OutputT apply(InputT input) throws UserCodeExecutionException, 
InterruptedException {
+    @MonotonicNonNull UserCodeExecutionException lastException = null;
+    for (int numAttempts = 0; numAttempts < limit; numAttempts++) {
+      try {
+        return throwableFunction.apply(input);
+      } catch (UserCodeExecutionException e) {
+        if (!REPEATABLE_ERROR_TYPES.contains(e.getClass())) {
+          throw e;
+        }
+        lastException = e;
+        sleeper.sleep();
+      }
+    }
+    throw checkStateNotNull(lastException);
+  }
+
+  /**
+   * A {@link FunctionalInterface} for executing a {@link 
UserCodeExecutionException} throwable
+   * function.
+   */
+  @FunctionalInterface
+  public interface ThrowableFunction<InputT, OutputT> {
+    /** Returns the result of invoking this function on the given input. */
+    OutputT apply(InputT input) throws UserCodeExecutionException;
+  }
+
+  /** Interfaces implementation details for pausing an execution. */
+  public interface Sleeper {
+
+    /** Pauses the execution. */
+    void sleep() throws InterruptedException;
+  }
+
+  /**
+   * A {@link Sleeper} implementation that uses a {@link BackOff} to determine 
how long to pause
+   * execution.
+   */
+  public static class DefaultSleeper implements Sleeper {

Review Comment:
   Similarly, if we use `org.apache.beam.sdk.util.FluentBackoff` then no need 
to implement the DefaultSleeper and no need for gcp_bom dependency



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to