igorbernstein2 commented on code in PR #29548:
URL: https://github.com/apache/beam/pull/29548#discussion_r1414221675


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -1138,8 +1162,35 @@ public PCollection<BigtableWriteResult> expand(
       bigtableConfig.validate();
       bigtableWriteOptions.validate();
 
+      // Get experimental flag and set on BigtableWriteOptions
+      PipelineOptions pipelineOptions = input.getPipeline().getOptions();
+      String closeWaitTimeoutStr =
+          ExperimentalOptions.getExperimentValue(pipelineOptions, 
BIGTABLE_WRITER_WAIT_TIMEOUT_MS);
+      Duration closeWaitTimeout = Duration.ZERO;
+      if (closeWaitTimeoutStr != null) {
+        try {
+          long closeWaitTimeoutMs = Long.parseLong(closeWaitTimeoutStr);
+          if (closeWaitTimeoutMs < 0) {
+            LOG.warn(
+                "Invalid close wait timeout {}, will not set a wait timeout on 
close",
+                closeWaitTimeoutMs);
+          } else {
+            closeWaitTimeout = Duration.millis(closeWaitTimeoutMs);
+          }
+        } catch (NumberFormatException e) {
+          LOG.warn(

Review Comment:
   this should just throw



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -1195,6 +1246,7 @@ private static class BigtableWriterFn
       this.writeOptions = writeOptions;
       this.failures = new ConcurrentLinkedQueue<>();
       this.id = factory.newId();
+      LOG.info("Created Bigtable Write Fn with writeOptions {} ", 
writeOptions);

Review Comment:
   I'm not sure if this should be info. WriteOptions is an internal class, so 
logging it by default is not useful to end users. I would drop this to debug



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -119,8 +121,15 @@ class BigtableServiceImpl implements BigtableService {
   private final Duration readOperationTimeout;
 
   @Override
-  public BigtableWriterImpl openForWriting(String tableId) {
-    return new BigtableWriterImpl(client, projectId, instanceId, tableId);
+  public BigtableWriterImpl openForWriting(BigtableWriteOptions writeOptions) {
+    return new BigtableWriterImpl(
+        client,
+        projectId,
+        instanceId,
+        writeOptions.getTableId().get(),
+        writeOptions.getCloseWaitTimeout() != null

Review Comment:
   There seems to be multiple ways to express that close wait timeout should be 
disabled: null & 0. I think you should pick one and stick to it



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -1138,8 +1162,35 @@ public PCollection<BigtableWriteResult> expand(
       bigtableConfig.validate();
       bigtableWriteOptions.validate();
 
+      // Get experimental flag and set on BigtableWriteOptions
+      PipelineOptions pipelineOptions = input.getPipeline().getOptions();
+      String closeWaitTimeoutStr =
+          ExperimentalOptions.getExperimentValue(pipelineOptions, 
BIGTABLE_WRITER_WAIT_TIMEOUT_MS);
+      Duration closeWaitTimeout = Duration.ZERO;
+      if (closeWaitTimeoutStr != null) {
+        try {
+          long closeWaitTimeoutMs = Long.parseLong(closeWaitTimeoutStr);
+          if (closeWaitTimeoutMs < 0) {
+            LOG.warn(
+                "Invalid close wait timeout {}, will not set a wait timeout on 
close",
+                closeWaitTimeoutMs);

Review Comment:
   I think this should just throw



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -470,50 +479,50 @@ static class BigtableWriterImpl implements Writer {
     private String projectId;
     private String instanceId;
     private String tableId;
+    private Duration closeWaitTimeout;
 
     private Distribution bulkSize = Metrics.distribution("BigTable-" + 
tableId, "batchSize");
     private Distribution latency = Metrics.distribution("BigTable-" + tableId, 
"batchLatencyMs");
 
     BigtableWriterImpl(
-        BigtableDataClient client, String projectId, String instanceId, String 
tableId) {
+        BigtableDataClient client,
+        String projectId,
+        String instanceId,
+        String tableId,
+        Duration closeWaitTimeout) {
       this.projectId = projectId;
       this.instanceId = instanceId;
       this.tableId = tableId;
       this.bulkMutation = client.newBulkMutationBatcher(tableId);
-    }
-
-    @Override
-    public void flush() throws IOException {
-      if (bulkMutation != null) {
-        try {
-          stopwatch.start();
-          bulkMutation.flush();
-          bulkSize.update(outstandingMutations);
-          outstandingMutations = 0;
-          stopwatch.stop();
-          latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS));
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          // We fail since flush() operation was interrupted.
-          throw new IOException(e);
-        }
-      }
+      this.closeWaitTimeout = closeWaitTimeout;
     }
 
     @Override
     public void close() throws IOException {
       if (bulkMutation != null) {
         try {
           stopwatch.start();
-          bulkMutation.flush();
-          bulkMutation.close();
+          // closeAsync will send any remaining elements in the batch.
+          // If the experimental close wait timeout flag is set,
+          // set a timeout waiting for the future.
+          ApiFuture<Void> future = bulkMutation.closeAsync();
+          if (closeWaitTimeout.isLongerThan(Duration.ZERO)) {
+            future.get(closeWaitTimeout.getMillis(), TimeUnit.MILLISECONDS);
+          } else {
+            future.get();
+          }
           bulkSize.update(outstandingMutations);
           outstandingMutations = 0;
           stopwatch.stop();
           latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS));
+        } catch (TimeoutException | ExecutionException e) {
+          // We fail because future.get() timed out
+          String errorMsg = "BulkMutation took too long to close";

Review Comment:
   Also I would unwrap the ExecutionException cause



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -470,50 +479,50 @@ static class BigtableWriterImpl implements Writer {
     private String projectId;
     private String instanceId;
     private String tableId;
+    private Duration closeWaitTimeout;
 
     private Distribution bulkSize = Metrics.distribution("BigTable-" + 
tableId, "batchSize");
     private Distribution latency = Metrics.distribution("BigTable-" + tableId, 
"batchLatencyMs");
 
     BigtableWriterImpl(
-        BigtableDataClient client, String projectId, String instanceId, String 
tableId) {
+        BigtableDataClient client,
+        String projectId,
+        String instanceId,
+        String tableId,
+        Duration closeWaitTimeout) {
       this.projectId = projectId;
       this.instanceId = instanceId;
       this.tableId = tableId;
       this.bulkMutation = client.newBulkMutationBatcher(tableId);
-    }
-
-    @Override
-    public void flush() throws IOException {
-      if (bulkMutation != null) {
-        try {
-          stopwatch.start();
-          bulkMutation.flush();
-          bulkSize.update(outstandingMutations);
-          outstandingMutations = 0;
-          stopwatch.stop();
-          latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS));
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          // We fail since flush() operation was interrupted.
-          throw new IOException(e);
-        }
-      }
+      this.closeWaitTimeout = closeWaitTimeout;
     }
 
     @Override
     public void close() throws IOException {
       if (bulkMutation != null) {
         try {
           stopwatch.start();
-          bulkMutation.flush();
-          bulkMutation.close();
+          // closeAsync will send any remaining elements in the batch.
+          // If the experimental close wait timeout flag is set,
+          // set a timeout waiting for the future.
+          ApiFuture<Void> future = bulkMutation.closeAsync();
+          if (closeWaitTimeout.isLongerThan(Duration.ZERO)) {
+            future.get(closeWaitTimeout.getMillis(), TimeUnit.MILLISECONDS);
+          } else {
+            future.get();
+          }
           bulkSize.update(outstandingMutations);
           outstandingMutations = 0;
           stopwatch.stop();
           latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS));
+        } catch (TimeoutException | ExecutionException e) {
+          // We fail because future.get() timed out
+          String errorMsg = "BulkMutation took too long to close";

Review Comment:
   This is not true: ExecutionException will be thrown when the future 
operation failed, i.e., if an element failed, I think it will be wrapped in an 
ExecutionException



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -470,50 +479,50 @@ static class BigtableWriterImpl implements Writer {
     private String projectId;
     private String instanceId;
     private String tableId;
+    private Duration closeWaitTimeout;
 
     private Distribution bulkSize = Metrics.distribution("BigTable-" + 
tableId, "batchSize");
     private Distribution latency = Metrics.distribution("BigTable-" + tableId, 
"batchLatencyMs");
 
     BigtableWriterImpl(
-        BigtableDataClient client, String projectId, String instanceId, String 
tableId) {
+        BigtableDataClient client,
+        String projectId,
+        String instanceId,
+        String tableId,
+        Duration closeWaitTimeout) {
       this.projectId = projectId;
       this.instanceId = instanceId;
       this.tableId = tableId;
       this.bulkMutation = client.newBulkMutationBatcher(tableId);
-    }
-
-    @Override
-    public void flush() throws IOException {
-      if (bulkMutation != null) {
-        try {
-          stopwatch.start();
-          bulkMutation.flush();
-          bulkSize.update(outstandingMutations);
-          outstandingMutations = 0;
-          stopwatch.stop();
-          latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS));
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          // We fail since flush() operation was interrupted.
-          throw new IOException(e);
-        }
-      }
+      this.closeWaitTimeout = closeWaitTimeout;
     }
 
     @Override
     public void close() throws IOException {
       if (bulkMutation != null) {
         try {
           stopwatch.start();
-          bulkMutation.flush();
-          bulkMutation.close();
+          // closeAsync will send any remaining elements in the batch.
+          // If the experimental close wait timeout flag is set,
+          // set a timeout waiting for the future.
+          ApiFuture<Void> future = bulkMutation.closeAsync();
+          if (closeWaitTimeout.isLongerThan(Duration.ZERO)) {
+            future.get(closeWaitTimeout.getMillis(), TimeUnit.MILLISECONDS);
+          } else {
+            future.get();
+          }
           bulkSize.update(outstandingMutations);
           outstandingMutations = 0;
           stopwatch.stop();
           latency.update(stopwatch.elapsed(TimeUnit.MILLISECONDS));
+        } catch (TimeoutException | ExecutionException e) {
+          // We fail because future.get() timed out
+          String errorMsg = "BulkMutation took too long to close";
+          LOG.warn(errorMsg, e);
+          throw new IOException(errorMsg, e);

Review Comment:
   why log & throw? I think the exception bubbling up should be enough?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to