This is an automated email from the ASF dual-hosted git repository.

vterentev pushed a commit to branch fix-sqlbigquery
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0e65fa4c5d7fb55b83177ac555d70e2f336cf9eb
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Fri Jul 11 18:42:32 2025 +0400

    Revert #35445
---
 .../worker/DataflowBatchWorkerHarness.java         |  2 ++
 .../StreamingEngineComputationConfigFetcher.java   |  3 +++
 .../windmill/client/AbstractWindmillStream.java    | 11 +++++++++-
 .../windmill/client/grpc/GrpcWindmillServer.java   |  7 +++++--
 .../java/org/apache/beam/sdk/util/BackOff.java     | 21 +++++++++++--------
 .../org/apache/beam/sdk/util/BackOffUtils.java     | 12 +++++++----
 .../io/aws2/kinesis/RateLimitPolicyFactory.java    | 24 ++++++++++++++++------
 .../sdk/io/aws2/kinesis/ShardListingUtils.java     |  7 ++++---
 .../beam/sdk/io/gcp/datastore/V1TestUtil.java      |  2 +-
 .../apache/beam/sdk/io/influxdb/InfluxDbIOIT.java  |  2 +-
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java   |  3 ++-
 .../org/apache/beam/io/requestresponse/Call.java   |  3 +++
 .../apache/beam/io/requestresponse/Repeater.java   |  9 ++++++--
 .../io/requestresponse/RequestResponseIOTest.java  |  5 +++--
 14 files changed, 79 insertions(+), 32 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
index 0afae3ef2da..a1c93cdc578 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
@@ -133,6 +133,8 @@ public class DataflowBatchWorkerHarness {
           }
           // Sleeping a while if there is a problem with the work, then go on 
with the next work.
         } while (success || BackOffUtils.next(sleeper, backOff));
+      } catch (IOException e) { // Failure of BackOff.
+        LOG.error("Already tried several attempts at working on tasks. 
Aborting.", e);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         LOG.error("Interrupted during thread execution or sleep.", e);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
index 73f2afc0422..5235679f912 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
@@ -146,6 +146,9 @@ public final class StreamingEngineComputationConfigFetcher 
implements Computatio
           if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
             return Optional.empty();
           }
+        } catch (IOException ioe) {
+          LOG.warn("Error backing off, will not retry: ", ioe);
+          return Optional.empty();
         } catch (InterruptedException ie) {
           Thread.currentThread().interrupt();
           return Optional.empty();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
index cf46e1f984d..67ef4210d70 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.worker.windmill.client;
 
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -255,6 +256,8 @@ public abstract class AbstractWindmillStream<RequestT, 
ResponseT> implements Win
           // Shutdown the stream to clean up any dangling resources and 
pending requests.
           shutdown();
           break;
+        } catch (IOException ioe) {
+          // Keep trying to create the stream.
         }
       }
     }
@@ -450,7 +453,11 @@ public abstract class AbstractWindmillStream<RequestT, 
ResponseT> implements Win
 
     @Override
     public void onNext(ResponseT response) {
-      backoff.reset();
+      try {
+        backoff.reset();
+      } catch (IOException e) {
+        // Ignore.
+      }
       debugMetrics.recordResponse();
       handler.streamDebugMetrics.recordResponse();
       handler.onResponse(response);
@@ -492,6 +499,8 @@ public abstract class AbstractWindmillStream<RequestT, 
ResponseT> implements Win
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return;
+      } catch (IOException e) {
+        // Ignore.
       }
     }
     recordStreamRestart(status);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index 1cd2d2f5315..62a8f53e7ef 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -22,6 +22,7 @@ import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs
 import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.localhostChannel;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -290,8 +291,10 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
           if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
             throw new WindmillRpcException(e);
           }
-        } catch (InterruptedException i) {
-          Thread.currentThread().interrupt();
+        } catch (IOException | InterruptedException i) {
+          if (i instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+          }
           WindmillRpcException rpcException = new WindmillRpcException(e);
           rpcException.addSuppressed(i);
           throw rpcException;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java
index 130dc15952b..a3ea30bbb81 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java
@@ -17,17 +17,20 @@
  */
 package org.apache.beam.sdk.util;
 
-import org.apache.beam.sdk.annotations.Internal;
+import java.io.IOException;
 
-/** Back-off policy when retrying an operation. */
-@Internal
+/**
+ * Back-off policy when retrying an operation.
+ *
+ * <p><b>Note</b>: This interface is copied from Google API client library to 
avoid its dependency.
+ */
 public interface BackOff {
 
   /** Indicates that no more retries should be made for use in {@link 
#nextBackOffMillis()}. */
   long STOP = -1L;
 
   /** Reset to initial state. */
-  void reset();
+  void reset() throws IOException;
 
   /**
    * Gets the number of milliseconds to wait before retrying the operation or 
{@link #STOP} to
@@ -44,7 +47,7 @@ public interface BackOff {
    * }
    * </pre>
    */
-  long nextBackOffMillis();
+  long nextBackOffMillis() throws IOException;
 
   /**
    * Fixed back-off policy whose back-off time is always zero, meaning that 
the operation is retried
@@ -54,10 +57,10 @@ public interface BackOff {
       new BackOff() {
 
         @Override
-        public void reset() {}
+        public void reset() throws IOException {}
 
         @Override
-        public long nextBackOffMillis() {
+        public long nextBackOffMillis() throws IOException {
           return 0;
         }
       };
@@ -70,10 +73,10 @@ public interface BackOff {
       new BackOff() {
 
         @Override
-        public void reset() {}
+        public void reset() throws IOException {}
 
         @Override
-        public long nextBackOffMillis() {
+        public long nextBackOffMillis() throws IOException {
           return STOP;
         }
       };
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java
index 1bce9d24c46..975cc744d0c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.sdk.util;
 
-import org.apache.beam.sdk.annotations.Internal;
+import java.io.IOException;
 
-/** Utilities for {@link BackOff}. */
-@Internal
+/**
+ * Utilities for {@link BackOff}.
+ *
+ * <p><b>Note</b>: This is copied from Google API client library to avoid its 
dependency.
+ */
 public final class BackOffUtils {
 
   /**
@@ -36,7 +39,8 @@ public final class BackOffUtils {
    *     BackOff#nextBackOffMillis()} did not return {@link BackOff#STOP}
    * @throws InterruptedException if any thread has interrupted the current 
thread
    */
-  public static boolean next(Sleeper sleeper, BackOff backOff) throws 
InterruptedException {
+  public static boolean next(Sleeper sleeper, BackOff backOff)
+      throws InterruptedException, IOException {
     long backOffTime = backOff.nextBackOffMillis();
     if (backOffTime == BackOff.STOP) {
       return false;
diff --git 
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java
 
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java
index 830dfef17f6..34d9562187e 100644
--- 
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java
+++ 
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.aws2.kinesis;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 import java.util.function.Supplier;
@@ -26,6 +27,8 @@ import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.Sleeper;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implement this interface to create a {@code RateLimitPolicy}. Used to 
create a rate limiter for
@@ -88,6 +91,7 @@ public interface RateLimitPolicyFactory extends Serializable {
    * response is empty or if the consumer is throttled by AWS.
    */
   class DefaultRateLimiter implements RateLimitPolicy {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultRateLimiter.class);
     private final Sleeper sleeper;
     private final BackOff emptySuccess;
     private final BackOff throttled;
@@ -118,17 +122,25 @@ public interface RateLimitPolicyFactory extends 
Serializable {
 
     @Override
     public void onSuccess(List<KinesisRecord> records) throws 
InterruptedException {
-      if (records.isEmpty()) {
-        BackOffUtils.next(sleeper, emptySuccess);
-      } else {
-        emptySuccess.reset();
+      try {
+        if (records.isEmpty()) {
+          BackOffUtils.next(sleeper, emptySuccess);
+        } else {
+          emptySuccess.reset();
+        }
+        throttled.reset();
+      } catch (IOException e) {
+        LOG.warn("Error applying onSuccess rate limit policy", e);
       }
-      throttled.reset();
     }
 
     @Override
     public void onThrottle(KinesisClientThrottledException e) throws 
InterruptedException {
-      BackOffUtils.next(sleeper, throttled);
+      try {
+        BackOffUtils.next(sleeper, throttled);
+      } catch (IOException ioe) {
+        LOG.warn("Error applying onThrottle rate limit policy", e);
+      }
     }
   }
 }
diff --git 
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardListingUtils.java
 
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardListingUtils.java
index c064b44a484..cb5f609b9ec 100644
--- 
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardListingUtils.java
+++ 
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardListingUtils.java
@@ -60,7 +60,7 @@ class ShardListingUtils {
 
   static ShardFilter buildShardFilterForStartingPoint(
       KinesisClient kinesisClient, String streamName, StartingPoint 
startingPoint)
-      throws InterruptedException {
+      throws IOException, InterruptedException {
     InitialPositionInStream position = startingPoint.getPosition();
     switch (position) {
       case LATEST:
@@ -78,7 +78,7 @@ class ShardListingUtils {
 
   private static ShardFilter buildShardFilterForTimestamp(
       KinesisClient kinesisClient, String streamName, Instant 
startingPointTimestamp)
-      throws InterruptedException {
+      throws IOException, InterruptedException {
     StreamDescriptionSummary streamDescription = 
describeStreamSummary(kinesisClient, streamName);
 
     Instant streamCreationTimestamp = 
TimeUtil.toJoda(streamDescription.streamCreationTimestamp());
@@ -103,7 +103,8 @@ class ShardListingUtils {
   }
 
   private static StreamDescriptionSummary describeStreamSummary(
-      KinesisClient kinesisClient, final String streamName) throws 
InterruptedException {
+      KinesisClient kinesisClient, final String streamName)
+      throws IOException, InterruptedException {
     // DescribeStreamSummary has limits that can be hit fairly easily if we 
are attempting
     // to configure multiple KinesisIO inputs in the same account. Retry up to
     // DESCRIBE_STREAM_SUMMARY_MAX_ATTEMPTS times if we end up hitting that 
limit.
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
index dbe689c0575..ae044ae2926 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
@@ -293,7 +293,7 @@ class V1TestUtil {
     }
 
     // commit the list of entities to datastore
-    private void flushBatch() throws DatastoreException, InterruptedException {
+    private void flushBatch() throws DatastoreException, IOException, 
InterruptedException {
       LOG.info("Writing batch of {} entities", entities.size());
       Sleeper sleeper = Sleeper.DEFAULT;
       BackOff backoff =
diff --git 
a/sdks/java/io/influxdb/src/test/java/org/apache/beam/sdk/io/influxdb/InfluxDbIOIT.java
 
b/sdks/java/io/influxdb/src/test/java/org/apache/beam/sdk/io/influxdb/InfluxDbIOIT.java
index 2d0513f0de2..da725ab541b 100644
--- 
a/sdks/java/io/influxdb/src/test/java/org/apache/beam/sdk/io/influxdb/InfluxDbIOIT.java
+++ 
b/sdks/java/io/influxdb/src/test/java/org/apache/beam/sdk/io/influxdb/InfluxDbIOIT.java
@@ -95,7 +95,7 @@ public class InfluxDbIOIT {
   }
 
   @Before
-  public void initTest() throws InterruptedException {
+  public void initTest() throws IOException, InterruptedException {
     BackOff backOff = FluentBackoff.DEFAULT.withMaxRetries(4).backoff();
     Query createQuery = new Query(String.format("CREATE DATABASE %s", 
options.getDatabaseName()));
     try (InfluxDB connection =
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 2c0ad23c563..abc0af81f56 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -26,6 +26,7 @@ import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 
 import com.google.auto.value.AutoValue;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
 import java.io.Serializable;
 import java.net.URLClassLoader;
 import java.sql.Connection;
@@ -2843,7 +2844,7 @@ public class JdbcIO {
     }
 
     private void executeBatch(ProcessContext context, Iterable<T> records)
-        throws SQLException, InterruptedException {
+        throws SQLException, IOException, InterruptedException {
       Long startTimeNs = System.nanoTime();
       Sleeper sleeper = Sleeper.DEFAULT;
       BackOff backoff = checkStateNotNull(retryBackOff).backoff();
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
index ab73946534c..f9c1a23e64f 100644
--- 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java
@@ -21,6 +21,7 @@ import static 
org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 
 import com.google.auto.value.AutoValue;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Optional;
 import java.util.concurrent.Callable;
@@ -352,6 +353,8 @@ class Call<RequestT, ResponseT> extends 
PTransform<PCollection<RequestT>, Result
           incIfPresent(sleeperCounter);
           sleeper.sleep(backOff.nextBackOffMillis());
         } catch (InterruptedException ignored) {
+        } catch (IOException e) {
+          throw new RuntimeException(e);
         }
       }
     }
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java
index e9a7666d2a1..69364d85887 100644
--- 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java
@@ -21,6 +21,7 @@ import static 
org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 
 import com.google.auto.value.AutoValue;
+import java.io.IOException;
 import java.util.Optional;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.util.BackOff;
@@ -118,8 +119,12 @@ abstract class Repeater<InputT, OutputT> {
         latestError = Optional.of(e);
       } catch (InterruptedException ignored) {
       }
-      incIfPresent(getBackoffCounter());
-      waitFor = getBackOff().nextBackOffMillis();
+      try {
+        incIfPresent(getBackoffCounter());
+        waitFor = getBackOff().nextBackOffMillis();
+      } catch (IOException e) {
+        throw new UserCodeExecutionException(e);
+      }
     }
     throw latestError.orElse(
         new UserCodeExecutionException("failed to process for input: " + 
input));
diff --git 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java
 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java
index cd0b29bab66..4cbadf23733 100644
--- 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java
+++ 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 
 import com.google.auto.value.AutoValue;
+import java.io.IOException;
 import java.util.List;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.Coder;
@@ -471,10 +472,10 @@ public class RequestResponseIOTest {
     public BackOff get() {
       return new BackOff() {
         @Override
-        public void reset() {}
+        public void reset() throws IOException {}
 
         @Override
-        public long nextBackOffMillis() {
+        public long nextBackOffMillis() throws IOException {
           counter.inc();
           return 0;
         }

Reply via email to