This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b566480cec6 fix: have a timeout on the wait when closing
BigtableIO#Writer (#29548)
b566480cec6 is described below
commit b566480cec6d413efa0d62711b3f3f9ca7a0f6fd
Author: Mattie Fu <[email protected]>
AuthorDate: Wed Dec 6 12:02:41 2023 -0500
fix: have a timeout on the wait when closing BigtableIO#Writer (#29548)
* fix: have a timeout on the wait when closing BigtableIO#Writer
* refactor
* update code
* address comments
* address comments and fix hang
* put service.close in finally block
---
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 81 ++++++++++++++++------
.../beam/sdk/io/gcp/bigtable/BigtableService.java | 9 +--
.../sdk/io/gcp/bigtable/BigtableServiceImpl.java | 63 ++++++++++-------
.../sdk/io/gcp/bigtable/BigtableWriteOptions.java | 5 ++
.../beam/sdk/io/gcp/bigtable/BigtableIOTest.java | 11 ++-
.../io/gcp/bigtable/BigtableServiceImplTest.java | 15 ++--
6 files changed, 119 insertions(+), 65 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index ad978e95016..82c2d314248 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -60,6 +60,7 @@ import
org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEsti
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
+import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
@@ -180,6 +181,27 @@ import org.slf4j.LoggerFactory;
* .withBatchElements(100)); // every batch will have 100 elements
* }</pre>
*
+ * <p>Configure timeout for writes:
+ *
+ * <pre>{@code
+ * // Let each attempt run for 1 second, retry if the attempt failed.
+ * // Give up after the request is retried for 60 seconds.
+ * Duration attemptTimeout = Duration.millis(1000);
+ * Duration operationTimeout = Duration.millis(60 * 1000);
+ * data.apply("write",
+ * BigtableIO.write()
+ * .withProjectId("project")
+ * .withInstanceId("instance")
+ * .withTableId("table")
+ * .withAttemptTimeout(attemptTimeout)
+ * .withOperationTimeout(operationTimeout));
+ * }</pre>
+ *
+ * <p>You can also limit the wait time in the finish bundle step by setting the
+ * bigtable_writer_wait_timeout_ms experimental flag when you run the
pipeline. For example,
+ * --experiments=bigtable_writer_wait_timeout_ms=60000 will limit the wait
time in finish bundle to
be 60 seconds.
+ *
* <p>Optionally, BigtableIO.write() may be configured to emit {@link
BigtableWriteResult} elements
* after each group of inputs is written to Bigtable. These can be used to
then trigger user code
* after writes have completed. See {@link
org.apache.beam.sdk.transforms.Wait} for details on the
@@ -1118,6 +1140,8 @@ public class BigtableIO {
extends PTransform<
PCollection<KV<ByteString, Iterable<Mutation>>>,
PCollection<BigtableWriteResult>> {
+ private static final String BIGTABLE_WRITER_WAIT_TIMEOUT_MS =
"bigtable_writer_wait_timeout_ms";
+
private final BigtableConfig bigtableConfig;
private final BigtableWriteOptions bigtableWriteOptions;
@@ -1138,8 +1162,23 @@ public class BigtableIO {
bigtableConfig.validate();
bigtableWriteOptions.validate();
+ // Get experimental flag and set on BigtableWriteOptions
+ PipelineOptions pipelineOptions = input.getPipeline().getOptions();
+ String closeWaitTimeoutStr =
+ ExperimentalOptions.getExperimentValue(pipelineOptions,
BIGTABLE_WRITER_WAIT_TIMEOUT_MS);
+ Duration closeWaitTimeout = null;
+ if (closeWaitTimeoutStr != null) {
+ long closeWaitTimeoutMs = Long.parseLong(closeWaitTimeoutStr);
+ checkState(closeWaitTimeoutMs > 0, "Close wait timeout must be
positive");
+ closeWaitTimeout = Duration.millis(closeWaitTimeoutMs);
+ }
+
return input.apply(
- ParDo.of(new BigtableWriterFn(factory, bigtableConfig,
bigtableWriteOptions)));
+ ParDo.of(
+ new BigtableWriterFn(
+ factory,
+ bigtableConfig,
+
bigtableWriteOptions.toBuilder().setCloseWaitTimeout(closeWaitTimeout).build())));
}
@Override
@@ -1195,6 +1234,7 @@ public class BigtableIO {
this.writeOptions = writeOptions;
this.failures = new ConcurrentLinkedQueue<>();
this.id = factory.newId();
+ LOG.debug("Created Bigtable Write Fn with writeOptions {} ",
writeOptions);
}
@StartBundle
@@ -1205,7 +1245,7 @@ public class BigtableIO {
if (bigtableWriter == null) {
serviceEntry =
factory.getServiceForWriting(id, config, writeOptions,
c.getPipelineOptions());
- bigtableWriter =
serviceEntry.getService().openForWriting(writeOptions.getTableId().get());
+ bigtableWriter =
serviceEntry.getService().openForWriting(writeOptions);
}
}
@@ -1227,27 +1267,26 @@ public class BigtableIO {
@FinishBundle
public void finishBundle(FinishBundleContext c) throws Exception {
- bigtableWriter.flush();
- checkForFailures();
- LOG.debug("Wrote {} records", recordsWritten);
+ try {
+ if (bigtableWriter != null) {
+ bigtableWriter.close();
+ bigtableWriter = null;
+ }
- for (Map.Entry<BoundedWindow, Long> entry : seenWindows.entrySet()) {
- c.output(
- BigtableWriteResult.create(entry.getValue()),
- entry.getKey().maxTimestamp(),
- entry.getKey());
- }
- }
+ checkForFailures();
+ LOG.debug("Wrote {} records", recordsWritten);
- @Teardown
- public void tearDown() throws Exception {
- if (bigtableWriter != null) {
- bigtableWriter.close();
- bigtableWriter = null;
- }
- if (serviceEntry != null) {
- serviceEntry.close();
- serviceEntry = null;
+ for (Map.Entry<BoundedWindow, Long> entry : seenWindows.entrySet()) {
+ c.output(
+ BigtableWriteResult.create(entry.getValue()),
+ entry.getKey().maxTimestamp(),
+ entry.getKey());
+ }
+ } finally {
+ if (serviceEntry != null) {
+ serviceEntry.close();
+ serviceEntry = null;
+ }
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
index c0f88331bd6..b529d6530ef 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -45,13 +45,6 @@ interface BigtableService extends Serializable {
CompletionStage<MutateRowResponse> writeRecord(KV<ByteString,
Iterable<Mutation>> record)
throws IOException;
- /**
- * Flushes the writer.
- *
- * @throws IOException if any writes did not succeed
- */
- void flush() throws IOException;
-
/**
* Closes the writer.
*
@@ -88,7 +81,7 @@ interface BigtableService extends Serializable {
Reader createReader(BigtableSource source) throws IOException;
/** Returns a {@link Writer} that will write to the specified table. */
- Writer openForWriting(String tableId) throws IOException;
+ Writer openForWriting(BigtableWriteOptions writeOptions) throws IOException;
/**
* Returns a set of row keys sampled from the underlying table. These
contain information about
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 6e98b7e7866..344229b383f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -19,7 +19,9 @@ package org.apache.beam.sdk.io.gcp.bigtable;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.Batcher;
+import com.google.api.gax.batching.BatchingException;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ResponseObserver;
@@ -61,6 +63,7 @@ import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
@@ -106,8 +109,8 @@ class BigtableServiceImpl implements BigtableService {
RetrySettings retry =
settings.getStubSettings().readRowsSettings().getRetrySettings();
this.readAttemptTimeout =
Duration.millis(retry.getInitialRpcTimeout().toMillis());
this.readOperationTimeout =
Duration.millis(retry.getTotalTimeout().toMillis());
- LOG.info("Started Bigtable service with settings {}", settings);
this.client = BigtableDataClient.create(settings);
+ LOG.info("Started Bigtable service with settings {}", settings);
}
private final BigtableDataClient client;
@@ -119,8 +122,13 @@ class BigtableServiceImpl implements BigtableService {
private final Duration readOperationTimeout;
@Override
- public BigtableWriterImpl openForWriting(String tableId) {
- return new BigtableWriterImpl(client, projectId, instanceId, tableId);
+ public BigtableWriterImpl openForWriting(BigtableWriteOptions writeOptions) {
+ return new BigtableWriterImpl(
+ client,
+ projectId,
+ instanceId,
+ writeOptions.getTableId().get(),
+ writeOptions.getCloseWaitTimeout());
}
@VisibleForTesting
@@ -470,50 +478,55 @@ class BigtableServiceImpl implements BigtableService {
private String projectId;
private String instanceId;
private String tableId;
+ private Duration closeWaitTimeout;
private Distribution bulkSize = Metrics.distribution("BigTable-" +
tableId, "batchSize");
private Distribution latency = Metrics.distribution("BigTable-" + tableId,
"batchLatencyMs");
BigtableWriterImpl(
- BigtableDataClient client, String projectId, String instanceId, String
tableId) {
+ BigtableDataClient client,
+ String projectId,
+ String instanceId,
+ String tableId,
+ Duration closeWaitTimeout) {
this.projectId = projectId;
this.instanceId = instanceId;
this.tableId = tableId;
+ this.closeWaitTimeout = closeWaitTimeout;
this.bulkMutation = client.newBulkMutationBatcher(tableId);
}
- @Override
- public void flush() throws IOException {
- if (bulkMutation != null) {
- try {
- stopwatch.start();
- bulkMutation.flush();
- bulkSize.update(outstandingMutations);
- outstandingMutations = 0;
- stopwatch.stop();
- latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- // We fail since flush() operation was interrupted.
- throw new IOException(e);
- }
- }
- }
-
@Override
public void close() throws IOException {
if (bulkMutation != null) {
try {
stopwatch.start();
- bulkMutation.flush();
- bulkMutation.close();
+ // closeAsync will send any remaining elements in the batch.
+ // If the experimental close wait timeout flag is set,
+ // set a timeout waiting for the future.
+ ApiFuture<Void> future = bulkMutation.closeAsync();
+ if (Duration.ZERO.isShorterThan(closeWaitTimeout)) {
+ future.get(closeWaitTimeout.getMillis(), TimeUnit.MILLISECONDS);
+ } else {
+ future.get();
+ }
bulkSize.update(outstandingMutations);
outstandingMutations = 0;
stopwatch.stop();
latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ } catch (BatchingException e) {
+ // Ignore batching failures because element failures are tracked as
is in
+ // BigtableIOWriteFn.
+ // TODO: Bigtable client already tracks BatchingExceptions, use
BatchingExceptions
+ // instead of tracking them separately in BigtableIOWriteFn.
+ } catch (TimeoutException e) {
+ // We fail because future.get() timed out
+ throw new IOException("BulkMutation took too long to close", e);
+ } catch (ExecutionException e) {
+ throw new IOException("Failed to close batch", e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- // We fail since flush() operation was interrupted.
+ // We fail since close() operation was interrupted.
throw new IOException(e);
}
bulkMutation = null;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
index 05e0e915f12..a63cc575809 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
@@ -60,6 +60,9 @@ abstract class BigtableWriteOptions implements Serializable {
/** Returns true if batch write flow control is enabled. Otherwise return
false. */
abstract @Nullable Boolean getFlowControl();
+ /** Returns the time to wait when closing the writer. */
+ abstract @Nullable Duration getCloseWaitTimeout();
+
abstract Builder toBuilder();
static Builder builder() {
@@ -87,6 +90,8 @@ abstract class BigtableWriteOptions implements Serializable {
abstract Builder setFlowControl(boolean enableFlowControl);
+ abstract Builder setCloseWaitTimeout(Duration timeout);
+
abstract BigtableWriteOptions build();
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 714dc9f8619..6cda518eb12 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -1695,8 +1695,8 @@ public class BigtableIOTest {
}
@Override
- public FakeBigtableWriter openForWriting(String tableId) {
- return new FakeBigtableWriter(tableId);
+ public FakeBigtableWriter openForWriting(BigtableWriteOptions
writeOptions) {
+ return new FakeBigtableWriter(writeOptions.getTableId().get());
}
@Override
@@ -1748,8 +1748,8 @@ public class BigtableIOTest {
}
@Override
- public FailureBigtableWriter openForWriting(String tableId) {
- return new FailureBigtableWriter(tableId, this, failureOptions);
+ public FailureBigtableWriter openForWriting(BigtableWriteOptions
writeOptions) {
+ return new FailureBigtableWriter(writeOptions.getTableId().get(), this,
failureOptions);
}
@Override
@@ -1929,9 +1929,6 @@ public class BigtableIOTest {
return
CompletableFuture.completedFuture(MutateRowResponse.getDefaultInstance());
}
- @Override
- public void flush() {}
-
@Override
public void close() {}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
index f239a68462d..3e2e803bb33 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
@@ -747,18 +747,26 @@ public class BigtableServiceImplTest {
* @throws InterruptedException
*/
@Test
- public void testWrite() throws IOException, InterruptedException {
+ public void testWrite() throws IOException {
doReturn(mockBatcher).when(mockBigtableDataClient).newBulkMutationBatcher(any());
+ SettableApiFuture<Void> fakeFuture = SettableApiFuture.create();
+ when(mockBatcher.closeAsync()).thenReturn(fakeFuture);
ArgumentCaptor<RowMutationEntry> captor =
ArgumentCaptor.forClass(RowMutationEntry.class);
ApiFuture<Void> fakeResponse = SettableApiFuture.create();
-
when(mockBatcher.add(any(RowMutationEntry.class))).thenReturn(fakeResponse);
+ when(mockBatcher.add(any(RowMutationEntry.class)))
+ .thenAnswer(
+ invocation -> {
+ fakeFuture.set(null);
+ return fakeResponse;
+ });
BigtableService.Writer underTest =
new BigtableServiceImpl.BigtableWriterImpl(
mockBigtableDataClient,
bigtableDataSettings.getProjectId(),
bigtableDataSettings.getInstanceId(),
- TABLE_ID);
+ TABLE_ID,
+ Duration.millis(60000));
ByteString key = ByteString.copyFromUtf8("key");
Mutation mutation =
@@ -780,7 +788,6 @@ public class BigtableServiceImplTest {
assertEquals(
"Family",
captor.getValue().toProto().getMutations(0).getSetCell().getFamilyName());
underTest.close();
- verify(mockBatcher, times(1)).flush();
}
private void verifyMetricWasSet(String method, String status, long count) {