This is an automated email from the ASF dual-hosted git repository. vterentev pushed a commit to branch fix-sqlbigquery in repository https://gitbox.apache.org/repos/asf/beam.git
commit 0e65fa4c5d7fb55b83177ac555d70e2f336cf9eb Author: Vitaly Terentyev <[email protected]> AuthorDate: Fri Jul 11 18:42:32 2025 +0400 Revert #35445 --- .../worker/DataflowBatchWorkerHarness.java | 2 ++ .../StreamingEngineComputationConfigFetcher.java | 3 +++ .../windmill/client/AbstractWindmillStream.java | 11 +++++++++- .../windmill/client/grpc/GrpcWindmillServer.java | 7 +++++-- .../java/org/apache/beam/sdk/util/BackOff.java | 21 +++++++++++-------- .../org/apache/beam/sdk/util/BackOffUtils.java | 12 +++++++---- .../io/aws2/kinesis/RateLimitPolicyFactory.java | 24 ++++++++++++++++------ .../sdk/io/aws2/kinesis/ShardListingUtils.java | 7 ++++--- .../beam/sdk/io/gcp/datastore/V1TestUtil.java | 2 +- .../apache/beam/sdk/io/influxdb/InfluxDbIOIT.java | 2 +- .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 3 ++- .../org/apache/beam/io/requestresponse/Call.java | 3 +++ .../apache/beam/io/requestresponse/Repeater.java | 9 ++++++-- .../io/requestresponse/RequestResponseIOTest.java | 5 +++-- 14 files changed, 79 insertions(+), 32 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java index 0afae3ef2da..a1c93cdc578 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java @@ -133,6 +133,8 @@ public class DataflowBatchWorkerHarness { } // Sleeping a while if there is a problem with the work, then go on with the next work. } while (success || BackOffUtils.next(sleeper, backOff)); + } catch (IOException e) { // Failure of BackOff. + LOG.error("Already tried several attempts at working on tasks. Aborting.", e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.error("Interrupted during thread execution or sleep.", e); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java index 73f2afc0422..5235679f912 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java @@ -146,6 +146,9 @@ public final class StreamingEngineComputationConfigFetcher implements Computatio if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) { return Optional.empty(); } + } catch (IOException ioe) { + LOG.warn("Error backing off, will not retry: ", ioe); + return Optional.empty(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); return Optional.empty(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java index cf46e1f984d..67ef4210d70 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.worker.windmill.client; import com.google.errorprone.annotations.CanIgnoreReturnValue; +import java.io.IOException; import java.io.PrintWriter; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -255,6 +256,8 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win // Shutdown the stream to clean up any dangling resources and pending requests. shutdown(); break; + } catch (IOException ioe) { + // Keep trying to create the stream. } } } @@ -450,7 +453,11 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win @Override public void onNext(ResponseT response) { - backoff.reset(); + try { + backoff.reset(); + } catch (IOException e) { + // Ignore. + } debugMetrics.recordResponse(); handler.streamDebugMetrics.recordResponse(); handler.onResponse(response); @@ -492,6 +499,8 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; + } catch (IOException e) { + // Ignore. } } recordStreamRestart(status); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java index 1cd2d2f5315..62a8f53e7ef 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java @@ -22,6 +22,7 @@ import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.localhostChannel; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; import java.util.HashSet; @@ -290,8 +291,10 @@ public final class GrpcWindmillServer extends WindmillServerStub { if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) { throw new WindmillRpcException(e); } - } catch (InterruptedException i) { - Thread.currentThread().interrupt(); + } catch (IOException | InterruptedException i) { + if (i instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } WindmillRpcException rpcException = new WindmillRpcException(e); rpcException.addSuppressed(i); throw rpcException; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java index 130dc15952b..a3ea30bbb81 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java @@ -17,17 +17,20 @@ */ package org.apache.beam.sdk.util; -import org.apache.beam.sdk.annotations.Internal; +import java.io.IOException; -/** Back-off policy when retrying an operation. */ -@Internal +/** + * Back-off policy when retrying an operation. + * + * <p><b>Note</b>: This interface is copied from Google API client library to avoid its dependency. + */ public interface BackOff { /** Indicates that no more retries should be made for use in {@link #nextBackOffMillis()}. */ long STOP = -1L; /** Reset to initial state. */ - void reset(); + void reset() throws IOException; /** * Gets the number of milliseconds to wait before retrying the operation or {@link #STOP} to @@ -44,7 +47,7 @@ public interface BackOff { * } * </pre> */ - long nextBackOffMillis(); + long nextBackOffMillis() throws IOException; /** * Fixed back-off policy whose back-off time is always zero, meaning that the operation is retried @@ -54,10 +57,10 @@ public interface BackOff { new BackOff() { @Override - public void reset() {} + public void reset() throws IOException {} @Override - public long nextBackOffMillis() { + public long nextBackOffMillis() throws IOException { return 0; } }; @@ -70,10 +73,10 @@ public interface BackOff { new BackOff() { @Override - public void reset() {} + public void reset() throws IOException {} @Override - public long nextBackOffMillis() { + public long nextBackOffMillis() throws IOException { return STOP; } }; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java index 1bce9d24c46..975cc744d0c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java @@ -17,10 +17,13 @@ */ package org.apache.beam.sdk.util; -import org.apache.beam.sdk.annotations.Internal; +import java.io.IOException; -/** Utilities for {@link BackOff}. */ -@Internal +/** + * Utilities for {@link BackOff}. + * + * <p><b>Note</b>: This is copied from Google API client library to avoid its dependency. + */ public final class BackOffUtils { /** @@ -36,7 +39,8 @@ public final class BackOffUtils { * BackOff#nextBackOffMillis()} did not return {@link BackOff#STOP} * @throws InterruptedException if any thread has interrupted the current thread */ - public static boolean next(Sleeper sleeper, BackOff backOff) throws InterruptedException { + public static boolean next(Sleeper sleeper, BackOff backOff) + throws InterruptedException, IOException { long backOffTime = backOff.nextBackOffMillis(); if (backOffTime == BackOff.STOP) { return false; diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java index 830dfef17f6..34d9562187e 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.aws2.kinesis; +import java.io.IOException; import java.io.Serializable; import java.util.List; import java.util.function.Supplier; @@ -26,6 +27,8 @@ import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implement this interface to create a {@code RateLimitPolicy}. Used to create a rate limiter for @@ -88,6 +91,7 @@ public interface RateLimitPolicyFactory extends Serializable { * response is empty or if the consumer is throttled by AWS. */ class DefaultRateLimiter implements RateLimitPolicy { + private static final Logger LOG = LoggerFactory.getLogger(DefaultRateLimiter.class); private final Sleeper sleeper; private final BackOff emptySuccess; private final BackOff throttled; @@ -118,17 +122,25 @@ public interface RateLimitPolicyFactory extends Serializable { @Override public void onSuccess(List<KinesisRecord> records) throws InterruptedException { - if (records.isEmpty()) { - BackOffUtils.next(sleeper, emptySuccess); - } else { - emptySuccess.reset(); + try { + if (records.isEmpty()) { + BackOffUtils.next(sleeper, emptySuccess); + } else { + emptySuccess.reset(); + } + throttled.reset(); + } catch (IOException e) { + LOG.warn("Error applying onSuccess rate limit policy", e); } - throttled.reset(); } @Override public void onThrottle(KinesisClientThrottledException e) throws InterruptedException { - BackOffUtils.next(sleeper, throttled); + try { + BackOffUtils.next(sleeper, throttled); + } catch (IOException ioe) { + LOG.warn("Error applying onThrottle rate limit policy", e); + } } } } diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardListingUtils.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardListingUtils.java index c064b44a484..cb5f609b9ec 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardListingUtils.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardListingUtils.java @@ -60,7 +60,7 @@ class ShardListingUtils { static ShardFilter buildShardFilterForStartingPoint( KinesisClient kinesisClient, String streamName, StartingPoint startingPoint) - throws InterruptedException { + throws IOException, InterruptedException { InitialPositionInStream position = startingPoint.getPosition(); switch (position) { case LATEST: @@ -78,7 +78,7 @@ class ShardListingUtils { private static ShardFilter buildShardFilterForTimestamp( KinesisClient kinesisClient, String streamName, Instant startingPointTimestamp) - throws InterruptedException { + throws IOException, InterruptedException { StreamDescriptionSummary streamDescription = describeStreamSummary(kinesisClient, streamName); Instant streamCreationTimestamp = TimeUtil.toJoda(streamDescription.streamCreationTimestamp()); @@ -103,7 +103,8 @@ class ShardListingUtils { } private static StreamDescriptionSummary describeStreamSummary( - KinesisClient kinesisClient, final String streamName) throws InterruptedException { + KinesisClient kinesisClient, final String streamName) + throws IOException, InterruptedException { // DescribeStreamSummary has limits that can be hit fairly easily if we are attempting // to configure multiple KinesisIO inputs in the same account. Retry up to // DESCRIBE_STREAM_SUMMARY_MAX_ATTEMPTS times if we end up hitting that limit. diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java index dbe689c0575..ae044ae2926 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java @@ -293,7 +293,7 @@ class V1TestUtil { } // commit the list of entities to datastore - private void flushBatch() throws DatastoreException, InterruptedException { + private void flushBatch() throws DatastoreException, IOException, InterruptedException { LOG.info("Writing batch of {} entities", entities.size()); Sleeper sleeper = Sleeper.DEFAULT; BackOff backoff = diff --git a/sdks/java/io/influxdb/src/test/java/org/apache/beam/sdk/io/influxdb/InfluxDbIOIT.java b/sdks/java/io/influxdb/src/test/java/org/apache/beam/sdk/io/influxdb/InfluxDbIOIT.java index 2d0513f0de2..da725ab541b 100644 --- a/sdks/java/io/influxdb/src/test/java/org/apache/beam/sdk/io/influxdb/InfluxDbIOIT.java +++ b/sdks/java/io/influxdb/src/test/java/org/apache/beam/sdk/io/influxdb/InfluxDbIOIT.java @@ -95,7 +95,7 @@ public class InfluxDbIOIT { } @Before - public void initTest() throws InterruptedException { + public void initTest() throws IOException, InterruptedException { BackOff backOff = FluentBackoff.DEFAULT.withMaxRetries(4).backoff(); Query createQuery = new Query(String.format("CREATE DATABASE %s", options.getDatabaseName())); try (InfluxDB connection = diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index 2c0ad23c563..abc0af81f56 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -26,6 +26,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr import com.google.auto.value.AutoValue; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.IOException; import java.io.Serializable; import java.net.URLClassLoader; import java.sql.Connection; @@ -2843,7 +2844,7 @@ public class JdbcIO { } private void executeBatch(ProcessContext context, Iterable<T> records) - throws SQLException, InterruptedException { + throws SQLException, IOException, InterruptedException { Long startTimeNs = System.nanoTime(); Sleeper sleeper = Sleeper.DEFAULT; BackOff backoff = checkStateNotNull(retryBackOff).backoff(); diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java index ab73946534c..f9c1a23e64f 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Call.java @@ -21,6 +21,7 @@ import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; +import java.io.IOException; import java.io.Serializable; import java.util.Optional; import java.util.concurrent.Callable; @@ -352,6 +353,8 @@ class Call<RequestT, ResponseT> extends PTransform<PCollection<RequestT>, Result incIfPresent(sleeperCounter); sleeper.sleep(backOff.nextBackOffMillis()); } catch (InterruptedException ignored) { + } catch (IOException e) { + throw new RuntimeException(e); } } } diff --git a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java index e9a7666d2a1..69364d85887 100644 --- a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java +++ b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Repeater.java @@ -21,6 +21,7 @@ import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; +import java.io.IOException; import java.util.Optional; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.util.BackOff; @@ -118,8 +119,12 @@ abstract class Repeater<InputT, OutputT> { latestError = Optional.of(e); } catch (InterruptedException ignored) { } - incIfPresent(getBackoffCounter()); - waitFor = getBackOff().nextBackOffMillis(); + try { + incIfPresent(getBackoffCounter()); + waitFor = getBackOff().nextBackOffMillis(); + } catch (IOException e) { + throw new UserCodeExecutionException(e); + } } throw latestError.orElse( new UserCodeExecutionException("failed to process for input: " + input)); diff --git a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java index cd0b29bab66..4cbadf23733 100644 --- a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java +++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponse/RequestResponseIOTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import com.google.auto.value.AutoValue; +import java.io.IOException; import java.util.List; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.Coder; @@ -471,10 +472,10 @@ public class RequestResponseIOTest { public BackOff get() { return new BackOff() { @Override - public void reset() {} + public void reset() throws IOException {} @Override - public long nextBackOffMillis() { + public long nextBackOffMillis() throws IOException { counter.inc(); return 0; }
