This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch revert-30172-old_code
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2005a53766a1adbda75c6d03c08461391f05bbbc
Author: tvalentyn <[email protected]>
AuthorDate: Tue May 28 10:51:44 2024 -0700

    Revert "Clean up stale code in BigtableService (#30172)"
    
    This reverts commit 5c30b1d6db27fff299fc28249f9f195d54651abd.
---
 .../beam/sdk/io/gcp/bigtable/BigtableService.java  |  7 ++
 .../sdk/io/gcp/bigtable/BigtableServiceImpl.java   | 78 ++++++++++++++++++++--
 .../beam/sdk/io/gcp/bigtable/BigtableIOTest.java   | 10 +++
 .../io/gcp/bigtable/BigtableServiceImplTest.java   | 43 +++++++++++-
 4 files changed, 131 insertions(+), 7 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
index 1e3839b5df4..3a3de5622cd 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -30,6 +30,7 @@ import java.util.NoSuchElementException;
 import java.util.concurrent.CompletionStage;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
 import org.apache.beam.sdk.values.KV;
+import org.joda.time.Duration;
 
 /** An interface for real or fake implementations of Cloud Bigtable. */
 interface BigtableService extends Serializable {
@@ -75,6 +76,12 @@ interface BigtableService extends Serializable {
      * current row because the last such call was unsuccessful.
      */
     Row getCurrentRow() throws NoSuchElementException;
+
+    // Workaround for ReadRows requests which requires to pass the timeouts in
+    // ApiContext. Can be removed later once it's fixed in Veneer.
+    Duration getAttemptTimeout();
+
+    Duration getOperationTimeout();
   }
 
   /** Returns a {@link Reader} that will read from the specified source. */
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index dad3370dae6..d6208be1bf9 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -23,6 +23,7 @@ import com.google.api.core.ApiFuture;
 import com.google.api.gax.batching.Batcher;
 import com.google.api.gax.batching.BatchingException;
 import com.google.api.gax.grpc.GrpcCallContext;
+import com.google.api.gax.retrying.RetrySettings;
 import com.google.api.gax.rpc.ApiException;
 import com.google.api.gax.rpc.ResponseObserver;
 import com.google.api.gax.rpc.StreamController;
@@ -47,6 +48,8 @@ import com.google.cloud.bigtable.data.v2.models.RowAdapter;
 import com.google.cloud.bigtable.data.v2.models.RowMutation;
 import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
 import com.google.protobuf.ByteString;
+import io.grpc.CallOptions;
+import io.grpc.Deadline;
 import io.grpc.StatusRuntimeException;
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -105,6 +108,9 @@ class BigtableServiceImpl implements BigtableService {
   BigtableServiceImpl(BigtableDataSettings settings) throws IOException {
     this.projectId = settings.getProjectId();
     this.instanceId = settings.getInstanceId();
+    RetrySettings retry = 
settings.getStubSettings().readRowsSettings().getRetrySettings();
+    this.readAttemptTimeout = 
Duration.millis(retry.getInitialRpcTimeout().toMillis());
+    this.readOperationTimeout = 
Duration.millis(retry.getTotalTimeout().toMillis());
     this.client = BigtableDataClient.create(settings);
     LOG.info("Started Bigtable service with settings {}", settings);
   }
@@ -113,6 +119,10 @@ class BigtableServiceImpl implements BigtableService {
   private final String projectId;
   private final String instanceId;
 
+  private final Duration readAttemptTimeout;
+
+  private final Duration readOperationTimeout;
+
   @Override
   public BigtableWriterImpl openForWriting(BigtableWriteOptions writeOptions) {
     return new BigtableWriterImpl(
@@ -135,6 +145,9 @@ class BigtableServiceImpl implements BigtableService {
     private final RowFilter rowFilter;
     private Iterator<Row> results;
 
+    private final Duration attemptTimeout;
+    private final Duration operationTimeout;
+
     private Row currentRow;
 
     @VisibleForTesting
@@ -144,13 +157,18 @@ class BigtableServiceImpl implements BigtableService {
         String instanceId,
         String tableId,
         List<ByteKeyRange> ranges,
-        @Nullable RowFilter rowFilter) {
+        @Nullable RowFilter rowFilter,
+        Duration attemptTimeout,
+        Duration operationTimeout) {
       this.client = client;
       this.projectId = projectId;
       this.instanceId = instanceId;
       this.tableId = tableId;
       this.ranges = ranges;
       this.rowFilter = rowFilter;
+
+      this.attemptTimeout = attemptTimeout;
+      this.operationTimeout = operationTimeout;
     }
 
     @Override
@@ -171,7 +189,7 @@ class BigtableServiceImpl implements BigtableService {
         results =
             client
                 .readRowsCallable(new BigtableRowProtoAdapter())
-                .call(query, GrpcCallContext.createDefault())
+                .call(query, createScanCallContext(attemptTimeout, 
operationTimeout))
                 .iterator();
         serviceCallMetric.call("ok");
       } catch (StatusRuntimeException e) {
@@ -197,6 +215,16 @@ class BigtableServiceImpl implements BigtableService {
       }
       return currentRow;
     }
+
+    @Override
+    public Duration getAttemptTimeout() {
+      return attemptTimeout;
+    }
+
+    @Override
+    public Duration getOperationTimeout() {
+      return operationTimeout;
+    }
   }
 
   @VisibleForTesting
@@ -210,6 +238,8 @@ class BigtableServiceImpl implements BigtableService {
     private final int refillSegmentWaterMark;
     private final long maxSegmentByteSize;
     private ServiceCallMetric serviceCallMetric;
+    private final Duration attemptTimeout;
+    private final Duration operationTimeout;
 
     private static class UpstreamResults {
       private final List<Row> rows;
@@ -228,7 +258,9 @@ class BigtableServiceImpl implements BigtableService {
         String tableId,
         List<ByteKeyRange> ranges,
         @Nullable RowFilter rowFilter,
-        int maxBufferedElementCount) {
+        int maxBufferedElementCount,
+        Duration attemptTimeout,
+        Duration operationTimeout) {
 
       RowSet.Builder rowSetBuilder = RowSet.newBuilder();
       if (ranges.isEmpty()) {
@@ -260,6 +292,8 @@ class BigtableServiceImpl implements BigtableService {
           filter,
           maxBufferedElementCount,
           maxSegmentByteSize,
+          attemptTimeout,
+          operationTimeout,
           createCallMetric(projectId, instanceId, tableId));
     }
 
@@ -273,6 +307,8 @@ class BigtableServiceImpl implements BigtableService {
         @Nullable RowFilter filter,
         int maxRowsInBuffer,
         long maxSegmentByteSize,
+        Duration attemptTimeout,
+        Duration operationTimeout,
         ServiceCallMetric serviceCallMetric) {
       if (rowSet.equals(rowSet.getDefaultInstanceForType())) {
         rowSet = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
@@ -293,6 +329,8 @@ class BigtableServiceImpl implements BigtableService {
       // Asynchronously refill buffer when there is 10% of the elements are 
left
       this.refillSegmentWaterMark =
           Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE));
+      this.attemptTimeout = attemptTimeout;
+      this.operationTimeout = operationTimeout;
     }
 
     @Override
@@ -388,7 +426,7 @@ class BigtableServiceImpl implements BigtableService {
                   future.set(new UpstreamResults(rows, nextNextRequest));
                 }
               },
-              GrpcCallContext.createDefault());
+              createScanCallContext(attemptTimeout, operationTimeout));
       return future;
     }
 
@@ -443,6 +481,16 @@ class BigtableServiceImpl implements BigtableService {
       }
       return currentRow;
     }
+
+    @Override
+    public Duration getAttemptTimeout() {
+      return attemptTimeout;
+    }
+
+    @Override
+    public Duration getOperationTimeout() {
+      return operationTimeout;
+    }
   }
 
   @VisibleForTesting
@@ -612,7 +660,9 @@ class BigtableServiceImpl implements BigtableService {
           source.getTableId().get(),
           source.getRanges(),
           source.getRowFilter(),
-          source.getMaxBufferElementCount());
+          source.getMaxBufferElementCount(),
+          readAttemptTimeout,
+          readOperationTimeout);
     } else {
       return new BigtableReaderImpl(
           client,
@@ -620,10 +670,26 @@ class BigtableServiceImpl implements BigtableService {
           instanceId,
           source.getTableId().get(),
           source.getRanges(),
-          source.getRowFilter());
+          source.getRowFilter(),
+          readAttemptTimeout,
+          readOperationTimeout);
     }
   }
 
+  // - per attempt deadlines - veneer doesn't implement deadlines for 
attempts. To workaround this,
+  //   the timeouts are set per call in the ApiCallContext. However this 
creates a separate issue of
+  //   over running the operation deadline, so gRPC deadline is also set.
+  private static GrpcCallContext createScanCallContext(
+      Duration attemptTimeout, Duration operationTimeout) {
+    GrpcCallContext ctx = GrpcCallContext.createDefault();
+
+    ctx.withCallOptions(
+        CallOptions.DEFAULT.withDeadline(
+            Deadline.after(operationTimeout.getMillis(), 
TimeUnit.MILLISECONDS)));
+    
ctx.withTimeout(org.threeten.bp.Duration.ofMillis(attemptTimeout.getMillis()));
+    return ctx;
+  }
+
   @Override
   public List<KeyOffset> getSampleRowKeys(BigtableSource source) {
     return client.sampleRowKeys(source.getTableId().get());
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index ae960533340..5163e0e4ff2 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -1846,6 +1846,16 @@ public class BigtableIOTest {
       }
       return currentRow;
     }
+
+    @Override
+    public Duration getAttemptTimeout() {
+      return Duration.millis(100);
+    }
+
+    @Override
+    public Duration getOperationTimeout() {
+      return Duration.millis(1000);
+    }
   }
 
   /** A {@link FakeBigtableReader} implementation that throw exceptions at 
given stage. */
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
index 98f33ffeb9d..8291c21e7e8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
 import com.google.api.core.ApiFuture;
 import com.google.api.core.SettableApiFuture;
 import com.google.api.gax.batching.Batcher;
+import com.google.api.gax.retrying.RetrySettings;
 import com.google.api.gax.rpc.ApiCallContext;
 import com.google.api.gax.rpc.ResponseObserver;
 import com.google.api.gax.rpc.ServerStream;
@@ -170,6 +171,8 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
+    RetrySettings retrySettings =
+        
bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableReaderImpl(
             mockBigtableDataClient,
@@ -177,7 +180,9 @@ public class BigtableServiceImplTest {
             bigtableDataSettings.getInstanceId(),
             mockBigtableSource.getTableId().get(),
             mockBigtableSource.getRanges(),
-            null);
+            null,
+            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
+            Duration.millis(retrySettings.getTotalTimeout().toMillis()));
 
     underTest.start();
     Assert.assertEquals(expectedRow, underTest.getCurrentRow());
@@ -225,6 +230,8 @@ public class BigtableServiceImplTest {
     
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
     
when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
 
+    RetrySettings retrySettings =
+        
bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -235,6 +242,8 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             DEFAULT_BYTE_SEGMENT_SIZE,
+            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
+            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     underTest.start();
@@ -278,6 +287,8 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
+    RetrySettings retrySettings =
+        
bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -288,6 +299,8 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             DEFAULT_BYTE_SEGMENT_SIZE,
+            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
+            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     List<Row> actualResults = new ArrayList<>();
@@ -344,6 +357,8 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
+    RetrySettings retrySettings =
+        
bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -354,6 +369,8 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             DEFAULT_BYTE_SEGMENT_SIZE,
+            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
+            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     List<Row> actualResults = new ArrayList<>();
@@ -411,6 +428,8 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
+    RetrySettings retrySettings =
+        
bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -421,6 +440,8 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             DEFAULT_BYTE_SEGMENT_SIZE,
+            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
+            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     List<Row> actualResults = new ArrayList<>();
@@ -463,6 +484,8 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
+    RetrySettings retrySettings =
+        
bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -473,6 +496,8 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             DEFAULT_BYTE_SEGMENT_SIZE,
+            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
+            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     List<Row> actualResults = new ArrayList<>();
@@ -530,6 +555,8 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
+    RetrySettings retrySettings =
+        
bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -540,6 +567,8 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             DEFAULT_BYTE_SEGMENT_SIZE,
+            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
+            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     List<Row> actualResults = new ArrayList<>();
@@ -595,6 +624,8 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
+    RetrySettings retrySettings =
+        
bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -605,6 +636,8 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             segmentSize,
             DEFAULT_BYTE_SEGMENT_SIZE,
+            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
+            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     Assert.assertTrue(underTest.start());
@@ -674,6 +707,8 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
+    RetrySettings retrySettings =
+        
bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -684,6 +719,8 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             segmentByteLimit,
+            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
+            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     List<Row> actualResults = new ArrayList<>();
@@ -736,6 +773,8 @@ public class BigtableServiceImplTest {
         mockStub.createReadRowsCallable(new 
BigtableServiceImpl.BigtableRowProtoAdapter());
     // Set up client to return callable
     
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
+    RetrySettings retrySettings =
+        
bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -746,6 +785,8 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             DEFAULT_BYTE_SEGMENT_SIZE,
+            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
+            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     IOException returnedError = null;

Reply via email to