This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b566480cec6 fix: have a timeout on the wait when closing 
BigtableIO#Writer (#29548)
b566480cec6 is described below

commit b566480cec6d413efa0d62711b3f3f9ca7a0f6fd
Author: Mattie Fu <[email protected]>
AuthorDate: Wed Dec 6 12:02:41 2023 -0500

    fix: have a timeout on the wait when closing BigtableIO#Writer (#29548)
    
    * fix: have a timeout on the wait when closing BigtableIO#Writer
    
    * refactor
    
    * update code
    
    * address comments
    
    * address comments and fix hang
    
    * put service.close in final block
---
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java       | 81 ++++++++++++++++------
 .../beam/sdk/io/gcp/bigtable/BigtableService.java  |  9 +--
 .../sdk/io/gcp/bigtable/BigtableServiceImpl.java   | 63 ++++++++++-------
 .../sdk/io/gcp/bigtable/BigtableWriteOptions.java  |  5 ++
 .../beam/sdk/io/gcp/bigtable/BigtableIOTest.java   | 11 ++-
 .../io/gcp/bigtable/BigtableServiceImplTest.java   | 15 ++--
 6 files changed, 119 insertions(+), 65 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index ad978e95016..82c2d314248 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -60,6 +60,7 @@ import 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.estimator.CoderSizeEsti
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -180,6 +181,27 @@ import org.slf4j.LoggerFactory;
  *         .withBatchElements(100)); // every batch will have 100 elements
  * }</pre>
  *
+ * <p>Configure timeout for writes:
+ *
+ * <pre>{@code
+ * // Let each attempt run for 1 second, retry if the attempt failed.
+ * // Give up after the request is retried for 60 seconds.
+ * Duration attemptTimeout = Duration.millis(1000);
+ * Duration operationTimeout = Duration.millis(60 * 1000);
+ * data.apply("write",
+ *     BigtableIO.write()
+ *         .withProjectId("project")
+ *         .withInstanceId("instance")
+ *         .withTableId("table")
+ *         .withAttemptTimeout(attemptTimeout)
+ *         .withOperationTimeout(operationTimeout));
+ * }</pre>
+ *
+ * <p>You can also limit the wait time in the finish bundle step by setting the
+ * bigtable_writer_wait_timeout_ms experimental flag when you run the 
pipeline. For example,
+ * --experiments=bigtable_writer_wait_timeout_ms=60000 will limit the wait 
time in finish bundle to
+ * be 60 seconds.
+ *
  * <p>Optionally, BigtableIO.write() may be configured to emit {@link 
BigtableWriteResult} elements
  * after each group of inputs is written to Bigtable. These can be used to 
then trigger user code
  * after writes have completed. See {@link 
org.apache.beam.sdk.transforms.Wait} for details on the
@@ -1118,6 +1140,8 @@ public class BigtableIO {
       extends PTransform<
           PCollection<KV<ByteString, Iterable<Mutation>>>, 
PCollection<BigtableWriteResult>> {
 
+    private static final String BIGTABLE_WRITER_WAIT_TIMEOUT_MS = 
"bigtable_writer_wait_timeout_ms";
+
     private final BigtableConfig bigtableConfig;
     private final BigtableWriteOptions bigtableWriteOptions;
 
@@ -1138,8 +1162,23 @@ public class BigtableIO {
       bigtableConfig.validate();
       bigtableWriteOptions.validate();
 
+      // Get experimental flag and set on BigtableWriteOptions
+      PipelineOptions pipelineOptions = input.getPipeline().getOptions();
+      String closeWaitTimeoutStr =
+          ExperimentalOptions.getExperimentValue(pipelineOptions, 
BIGTABLE_WRITER_WAIT_TIMEOUT_MS);
+      Duration closeWaitTimeout = null;
+      if (closeWaitTimeoutStr != null) {
+        long closeWaitTimeoutMs = Long.parseLong(closeWaitTimeoutStr);
+        checkState(closeWaitTimeoutMs > 0, "Close wait timeout must be 
positive");
+        closeWaitTimeout = Duration.millis(closeWaitTimeoutMs);
+      }
+
       return input.apply(
-          ParDo.of(new BigtableWriterFn(factory, bigtableConfig, 
bigtableWriteOptions)));
+          ParDo.of(
+              new BigtableWriterFn(
+                  factory,
+                  bigtableConfig,
+                  
bigtableWriteOptions.toBuilder().setCloseWaitTimeout(closeWaitTimeout).build())));
     }
 
     @Override
@@ -1195,6 +1234,7 @@ public class BigtableIO {
       this.writeOptions = writeOptions;
       this.failures = new ConcurrentLinkedQueue<>();
       this.id = factory.newId();
+      LOG.debug("Created Bigtable Write Fn with writeOptions {} ", 
writeOptions);
     }
 
     @StartBundle
@@ -1205,7 +1245,7 @@ public class BigtableIO {
       if (bigtableWriter == null) {
         serviceEntry =
             factory.getServiceForWriting(id, config, writeOptions, 
c.getPipelineOptions());
-        bigtableWriter = 
serviceEntry.getService().openForWriting(writeOptions.getTableId().get());
+        bigtableWriter = 
serviceEntry.getService().openForWriting(writeOptions);
       }
     }
 
@@ -1227,27 +1267,26 @@ public class BigtableIO {
 
     @FinishBundle
     public void finishBundle(FinishBundleContext c) throws Exception {
-      bigtableWriter.flush();
-      checkForFailures();
-      LOG.debug("Wrote {} records", recordsWritten);
+      try {
+        if (bigtableWriter != null) {
+          bigtableWriter.close();
+          bigtableWriter = null;
+        }
 
-      for (Map.Entry<BoundedWindow, Long> entry : seenWindows.entrySet()) {
-        c.output(
-            BigtableWriteResult.create(entry.getValue()),
-            entry.getKey().maxTimestamp(),
-            entry.getKey());
-      }
-    }
+        checkForFailures();
+        LOG.debug("Wrote {} records", recordsWritten);
 
-    @Teardown
-    public void tearDown() throws Exception {
-      if (bigtableWriter != null) {
-        bigtableWriter.close();
-        bigtableWriter = null;
-      }
-      if (serviceEntry != null) {
-        serviceEntry.close();
-        serviceEntry = null;
+        for (Map.Entry<BoundedWindow, Long> entry : seenWindows.entrySet()) {
+          c.output(
+              BigtableWriteResult.create(entry.getValue()),
+              entry.getKey().maxTimestamp(),
+              entry.getKey());
+        }
+      } finally {
+        if (serviceEntry != null) {
+          serviceEntry.close();
+          serviceEntry = null;
+        }
       }
     }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
index c0f88331bd6..b529d6530ef 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -45,13 +45,6 @@ interface BigtableService extends Serializable {
     CompletionStage<MutateRowResponse> writeRecord(KV<ByteString, 
Iterable<Mutation>> record)
         throws IOException;
 
-    /**
-     * Flushes the writer.
-     *
-     * @throws IOException if any writes did not succeed
-     */
-    void flush() throws IOException;
-
     /**
      * Closes the writer.
      *
@@ -88,7 +81,7 @@ interface BigtableService extends Serializable {
   Reader createReader(BigtableSource source) throws IOException;
 
   /** Returns a {@link Writer} that will write to the specified table. */
-  Writer openForWriting(String tableId) throws IOException;
+  Writer openForWriting(BigtableWriteOptions writeOptions) throws IOException;
 
   /**
    * Returns a set of row keys sampled from the underlying table. These 
contain information about
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 6e98b7e7866..344229b383f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -19,7 +19,9 @@ package org.apache.beam.sdk.io.gcp.bigtable;
 
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors.directExecutor;
 
+import com.google.api.core.ApiFuture;
 import com.google.api.gax.batching.Batcher;
+import com.google.api.gax.batching.BatchingException;
 import com.google.api.gax.grpc.GrpcCallContext;
 import com.google.api.gax.retrying.RetrySettings;
 import com.google.api.gax.rpc.ResponseObserver;
@@ -61,6 +63,7 @@ import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
 import org.apache.beam.runners.core.metrics.ServiceCallMetric;
@@ -106,8 +109,8 @@ class BigtableServiceImpl implements BigtableService {
     RetrySettings retry = 
settings.getStubSettings().readRowsSettings().getRetrySettings();
     this.readAttemptTimeout = 
Duration.millis(retry.getInitialRpcTimeout().toMillis());
     this.readOperationTimeout = 
Duration.millis(retry.getTotalTimeout().toMillis());
-    LOG.info("Started Bigtable service with settings {}", settings);
     this.client = BigtableDataClient.create(settings);
+    LOG.info("Started Bigtable service with settings {}", settings);
   }
 
   private final BigtableDataClient client;
@@ -119,8 +122,13 @@ class BigtableServiceImpl implements BigtableService {
   private final Duration readOperationTimeout;
 
   @Override
-  public BigtableWriterImpl openForWriting(String tableId) {
-    return new BigtableWriterImpl(client, projectId, instanceId, tableId);
+  public BigtableWriterImpl openForWriting(BigtableWriteOptions writeOptions) {
+    return new BigtableWriterImpl(
+        client,
+        projectId,
+        instanceId,
+        writeOptions.getTableId().get(),
+        writeOptions.getCloseWaitTimeout());
   }
 
   @VisibleForTesting
@@ -470,50 +478,55 @@ class BigtableServiceImpl implements BigtableService {
     private String projectId;
     private String instanceId;
     private String tableId;
+    private Duration closeWaitTimeout;
 
     private Distribution bulkSize = Metrics.distribution("BigTable-" + 
tableId, "batchSize");
     private Distribution latency = Metrics.distribution("BigTable-" + tableId, 
"batchLatencyMs");
 
     BigtableWriterImpl(
-        BigtableDataClient client, String projectId, String instanceId, String 
tableId) {
+        BigtableDataClient client,
+        String projectId,
+        String instanceId,
+        String tableId,
+        Duration closeWaitTimeout) {
       this.projectId = projectId;
       this.instanceId = instanceId;
       this.tableId = tableId;
+      this.closeWaitTimeout = closeWaitTimeout;
       this.bulkMutation = client.newBulkMutationBatcher(tableId);
     }
 
-    @Override
-    public void flush() throws IOException {
-      if (bulkMutation != null) {
-        try {
-          stopwatch.start();
-          bulkMutation.flush();
-          bulkSize.update(outstandingMutations);
-          outstandingMutations = 0;
-          stopwatch.stop();
-          latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS));
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          // We fail since flush() operation was interrupted.
-          throw new IOException(e);
-        }
-      }
-    }
-
     @Override
     public void close() throws IOException {
       if (bulkMutation != null) {
         try {
           stopwatch.start();
-          bulkMutation.flush();
-          bulkMutation.close();
+          // closeAsync will send any remaining elements in the batch.
+          // If the experimental close wait timeout flag is set,
+          // set a timeout waiting for the future.
+          ApiFuture<Void> future = bulkMutation.closeAsync();
+          if (Duration.ZERO.isShorterThan(closeWaitTimeout)) {
+            future.get(closeWaitTimeout.getMillis(), TimeUnit.MILLISECONDS);
+          } else {
+            future.get();
+          }
           bulkSize.update(outstandingMutations);
           outstandingMutations = 0;
           stopwatch.stop();
           latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS));
+        } catch (BatchingException e) {
+          // Ignore batching failures because element failures are tracked as 
is in
+          // BigtableIOWriteFn.
+          // TODO: Bigtable client already tracks BatchingExceptions, use 
BatchingExceptions
+          // instead of tracking them separately in BigtableIOWriteFn.
+        } catch (TimeoutException e) {
+          // We fail because future.get() timed out
+          throw new IOException("BulkMutation took too long to close", e);
+        } catch (ExecutionException e) {
+          throw new IOException("Failed to close batch", e.getCause());
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
-          // We fail since flush() operation was interrupted.
+          // We fail since close() operation was interrupted.
           throw new IOException(e);
         }
         bulkMutation = null;
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
index 05e0e915f12..a63cc575809 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
@@ -60,6 +60,9 @@ abstract class BigtableWriteOptions implements Serializable {
   /** Returns true if batch write flow control is enabled. Otherwise return 
false. */
   abstract @Nullable Boolean getFlowControl();
 
+  /** Returns the time to wait when closing the writer. */
+  abstract @Nullable Duration getCloseWaitTimeout();
+
   abstract Builder toBuilder();
 
   static Builder builder() {
@@ -87,6 +90,8 @@ abstract class BigtableWriteOptions implements Serializable {
 
     abstract Builder setFlowControl(boolean enableFlowControl);
 
+    abstract Builder setCloseWaitTimeout(Duration timeout);
+
     abstract BigtableWriteOptions build();
   }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 714dc9f8619..6cda518eb12 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -1695,8 +1695,8 @@ public class BigtableIOTest {
     }
 
     @Override
-    public FakeBigtableWriter openForWriting(String tableId) {
-      return new FakeBigtableWriter(tableId);
+    public FakeBigtableWriter openForWriting(BigtableWriteOptions 
writeOptions) {
+      return new FakeBigtableWriter(writeOptions.getTableId().get());
     }
 
     @Override
@@ -1748,8 +1748,8 @@ public class BigtableIOTest {
     }
 
     @Override
-    public FailureBigtableWriter openForWriting(String tableId) {
-      return new FailureBigtableWriter(tableId, this, failureOptions);
+    public FailureBigtableWriter openForWriting(BigtableWriteOptions 
writeOptions) {
+      return new FailureBigtableWriter(writeOptions.getTableId().get(), this, 
failureOptions);
     }
 
     @Override
@@ -1929,9 +1929,6 @@ public class BigtableIOTest {
       return 
CompletableFuture.completedFuture(MutateRowResponse.getDefaultInstance());
     }
 
-    @Override
-    public void flush() {}
-
     @Override
     public void close() {}
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
index f239a68462d..3e2e803bb33 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
@@ -747,18 +747,26 @@ public class BigtableServiceImplTest {
    * @throws InterruptedException
    */
   @Test
-  public void testWrite() throws IOException, InterruptedException {
+  public void testWrite() throws IOException {
     
doReturn(mockBatcher).when(mockBigtableDataClient).newBulkMutationBatcher(any());
+    SettableApiFuture<Void> fakeFuture = SettableApiFuture.create();
+    when(mockBatcher.closeAsync()).thenReturn(fakeFuture);
     ArgumentCaptor<RowMutationEntry> captor = 
ArgumentCaptor.forClass(RowMutationEntry.class);
     ApiFuture<Void> fakeResponse = SettableApiFuture.create();
-    
when(mockBatcher.add(any(RowMutationEntry.class))).thenReturn(fakeResponse);
+    when(mockBatcher.add(any(RowMutationEntry.class)))
+        .thenAnswer(
+            invocation -> {
+              fakeFuture.set(null);
+              return fakeResponse;
+            });
 
     BigtableService.Writer underTest =
         new BigtableServiceImpl.BigtableWriterImpl(
             mockBigtableDataClient,
             bigtableDataSettings.getProjectId(),
             bigtableDataSettings.getInstanceId(),
-            TABLE_ID);
+            TABLE_ID,
+            Duration.millis(60000));
 
     ByteString key = ByteString.copyFromUtf8("key");
     Mutation mutation =
@@ -780,7 +788,6 @@ public class BigtableServiceImplTest {
     assertEquals(
         "Family", 
captor.getValue().toProto().getMutations(0).getSetCell().getFamilyName());
     underTest.close();
-    verify(mockBatcher, times(1)).flush();
   }
 
   private void verifyMetricWasSet(String method, String status, long count) {

Reply via email to