This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c30b1d6db2 Clean up stale code in BigtableService (#30172)
5c30b1d6db2 is described below

commit 5c30b1d6db27fff299fc28249f9f195d54651abd
Author: Mattie Fu <[email protected]>
AuthorDate: Sun May 26 11:40:49 2024 -0400

    Clean up stale code in BigtableService (#30172)
---
 .../beam/sdk/io/gcp/bigtable/BigtableService.java  |  7 --
 .../sdk/io/gcp/bigtable/BigtableServiceImpl.java   | 78 ++--------------------
 .../beam/sdk/io/gcp/bigtable/BigtableIOTest.java   | 10 ---
 .../io/gcp/bigtable/BigtableServiceImplTest.java   | 43 +-----------
 4 files changed, 7 insertions(+), 131 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
index 3a3de5622cd..1e3839b5df4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -30,7 +30,6 @@ import java.util.NoSuchElementException;
 import java.util.concurrent.CompletionStage;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
 import org.apache.beam.sdk.values.KV;
-import org.joda.time.Duration;
 
 /** An interface for real or fake implementations of Cloud Bigtable. */
 interface BigtableService extends Serializable {
@@ -76,12 +75,6 @@ interface BigtableService extends Serializable {
      * current row because the last such call was unsuccessful.
      */
     Row getCurrentRow() throws NoSuchElementException;
-
-    // Workaround for ReadRows requests which requires to pass the timeouts in
-    // ApiContext. Can be removed later once it's fixed in Veneer.
-    Duration getAttemptTimeout();
-
-    Duration getOperationTimeout();
   }
 
   /** Returns a {@link Reader} that will read from the specified source. */
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index d6208be1bf9..dad3370dae6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -23,7 +23,6 @@ import com.google.api.core.ApiFuture;
 import com.google.api.gax.batching.Batcher;
 import com.google.api.gax.batching.BatchingException;
 import com.google.api.gax.grpc.GrpcCallContext;
-import com.google.api.gax.retrying.RetrySettings;
 import com.google.api.gax.rpc.ApiException;
 import com.google.api.gax.rpc.ResponseObserver;
 import com.google.api.gax.rpc.StreamController;
@@ -48,8 +47,6 @@ import com.google.cloud.bigtable.data.v2.models.RowAdapter;
 import com.google.cloud.bigtable.data.v2.models.RowMutation;
 import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
 import com.google.protobuf.ByteString;
-import io.grpc.CallOptions;
-import io.grpc.Deadline;
 import io.grpc.StatusRuntimeException;
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -108,9 +105,6 @@ class BigtableServiceImpl implements BigtableService {
   BigtableServiceImpl(BigtableDataSettings settings) throws IOException {
     this.projectId = settings.getProjectId();
     this.instanceId = settings.getInstanceId();
-    RetrySettings retry = settings.getStubSettings().readRowsSettings().getRetrySettings();
-    this.readAttemptTimeout = Duration.millis(retry.getInitialRpcTimeout().toMillis());
-    this.readOperationTimeout = Duration.millis(retry.getTotalTimeout().toMillis());
     this.client = BigtableDataClient.create(settings);
     LOG.info("Started Bigtable service with settings {}", settings);
   }
@@ -119,10 +113,6 @@ class BigtableServiceImpl implements BigtableService {
   private final String projectId;
   private final String instanceId;
 
-  private final Duration readAttemptTimeout;
-
-  private final Duration readOperationTimeout;
-
   @Override
   public BigtableWriterImpl openForWriting(BigtableWriteOptions writeOptions) {
     return new BigtableWriterImpl(
@@ -145,9 +135,6 @@ class BigtableServiceImpl implements BigtableService {
     private final RowFilter rowFilter;
     private Iterator<Row> results;
 
-    private final Duration attemptTimeout;
-    private final Duration operationTimeout;
-
     private Row currentRow;
 
     @VisibleForTesting
@@ -157,18 +144,13 @@ class BigtableServiceImpl implements BigtableService {
         String instanceId,
         String tableId,
         List<ByteKeyRange> ranges,
-        @Nullable RowFilter rowFilter,
-        Duration attemptTimeout,
-        Duration operationTimeout) {
+        @Nullable RowFilter rowFilter) {
       this.client = client;
       this.projectId = projectId;
       this.instanceId = instanceId;
       this.tableId = tableId;
       this.ranges = ranges;
       this.rowFilter = rowFilter;
-
-      this.attemptTimeout = attemptTimeout;
-      this.operationTimeout = operationTimeout;
     }
 
     @Override
@@ -189,7 +171,7 @@ class BigtableServiceImpl implements BigtableService {
         results =
             client
                 .readRowsCallable(new BigtableRowProtoAdapter())
-                .call(query, createScanCallContext(attemptTimeout, operationTimeout))
+                .call(query, GrpcCallContext.createDefault())
                 .iterator();
         serviceCallMetric.call("ok");
       } catch (StatusRuntimeException e) {
@@ -215,16 +197,6 @@ class BigtableServiceImpl implements BigtableService {
       }
       return currentRow;
     }
-
-    @Override
-    public Duration getAttemptTimeout() {
-      return attemptTimeout;
-    }
-
-    @Override
-    public Duration getOperationTimeout() {
-      return operationTimeout;
-    }
   }
 
   @VisibleForTesting
@@ -238,8 +210,6 @@ class BigtableServiceImpl implements BigtableService {
     private final int refillSegmentWaterMark;
     private final long maxSegmentByteSize;
     private ServiceCallMetric serviceCallMetric;
-    private final Duration attemptTimeout;
-    private final Duration operationTimeout;
 
     private static class UpstreamResults {
       private final List<Row> rows;
@@ -258,9 +228,7 @@ class BigtableServiceImpl implements BigtableService {
         String tableId,
         List<ByteKeyRange> ranges,
         @Nullable RowFilter rowFilter,
-        int maxBufferedElementCount,
-        Duration attemptTimeout,
-        Duration operationTimeout) {
+        int maxBufferedElementCount) {
 
       RowSet.Builder rowSetBuilder = RowSet.newBuilder();
       if (ranges.isEmpty()) {
@@ -292,8 +260,6 @@ class BigtableServiceImpl implements BigtableService {
           filter,
           maxBufferedElementCount,
           maxSegmentByteSize,
-          attemptTimeout,
-          operationTimeout,
           createCallMetric(projectId, instanceId, tableId));
     }
 
@@ -307,8 +273,6 @@ class BigtableServiceImpl implements BigtableService {
         @Nullable RowFilter filter,
         int maxRowsInBuffer,
         long maxSegmentByteSize,
-        Duration attemptTimeout,
-        Duration operationTimeout,
         ServiceCallMetric serviceCallMetric) {
       if (rowSet.equals(rowSet.getDefaultInstanceForType())) {
         rowSet = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
@@ -329,8 +293,6 @@ class BigtableServiceImpl implements BigtableService {
       // Asynchronously refill buffer when there is 10% of the elements are left
       this.refillSegmentWaterMark =
           Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE));
-      this.attemptTimeout = attemptTimeout;
-      this.operationTimeout = operationTimeout;
     }
 
     @Override
@@ -426,7 +388,7 @@ class BigtableServiceImpl implements BigtableService {
                   future.set(new UpstreamResults(rows, nextNextRequest));
                 }
               },
-              createScanCallContext(attemptTimeout, operationTimeout));
+              GrpcCallContext.createDefault());
       return future;
     }
 
@@ -481,16 +443,6 @@ class BigtableServiceImpl implements BigtableService {
       }
       return currentRow;
     }
-
-    @Override
-    public Duration getAttemptTimeout() {
-      return attemptTimeout;
-    }
-
-    @Override
-    public Duration getOperationTimeout() {
-      return operationTimeout;
-    }
   }
 
   @VisibleForTesting
@@ -660,9 +612,7 @@ class BigtableServiceImpl implements BigtableService {
           source.getTableId().get(),
           source.getRanges(),
           source.getRowFilter(),
-          source.getMaxBufferElementCount(),
-          readAttemptTimeout,
-          readOperationTimeout);
+          source.getMaxBufferElementCount());
     } else {
       return new BigtableReaderImpl(
           client,
@@ -670,26 +620,10 @@ class BigtableServiceImpl implements BigtableService {
           instanceId,
           source.getTableId().get(),
           source.getRanges(),
-          source.getRowFilter(),
-          readAttemptTimeout,
-          readOperationTimeout);
+          source.getRowFilter());
     }
   }
 
-  // - per attempt deadlines - veneer doesn't implement deadlines for attempts. To workaround this,
-  //   the timeouts are set per call in the ApiCallContext. However this creates a separate issue of
-  //   over running the operation deadline, so gRPC deadline is also set.
-  private static GrpcCallContext createScanCallContext(
-      Duration attemptTimeout, Duration operationTimeout) {
-    GrpcCallContext ctx = GrpcCallContext.createDefault();
-
-    ctx.withCallOptions(
-        CallOptions.DEFAULT.withDeadline(
-            Deadline.after(operationTimeout.getMillis(), TimeUnit.MILLISECONDS)));
-    ctx.withTimeout(org.threeten.bp.Duration.ofMillis(attemptTimeout.getMillis()));
-    return ctx;
-  }
-
   @Override
   public List<KeyOffset> getSampleRowKeys(BigtableSource source) {
     return client.sampleRowKeys(source.getTableId().get());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index bffca865208..5efef321293 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -1846,16 +1846,6 @@ public class BigtableIOTest {
       }
       return currentRow;
     }
-
-    @Override
-    public Duration getAttemptTimeout() {
-      return Duration.millis(100);
-    }
-
-    @Override
-    public Duration getOperationTimeout() {
-      return Duration.millis(1000);
-    }
   }
 
   /** A {@link FakeBigtableReader} implementation that throw exceptions at given stage. */
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
index 8291c21e7e8..98f33ffeb9d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
@@ -29,7 +29,6 @@ import static org.mockito.Mockito.when;
 import com.google.api.core.ApiFuture;
 import com.google.api.core.SettableApiFuture;
 import com.google.api.gax.batching.Batcher;
-import com.google.api.gax.retrying.RetrySettings;
 import com.google.api.gax.rpc.ApiCallContext;
 import com.google.api.gax.rpc.ResponseObserver;
 import com.google.api.gax.rpc.ServerStream;
@@ -171,8 +170,6 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
-    RetrySettings retrySettings =
-        bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableReaderImpl(
             mockBigtableDataClient,
@@ -180,9 +177,7 @@ public class BigtableServiceImplTest {
             bigtableDataSettings.getInstanceId(),
             mockBigtableSource.getTableId().get(),
             mockBigtableSource.getRanges(),
-            null,
-            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
-            Duration.millis(retrySettings.getTotalTimeout().toMillis()));
+            null);
 
     underTest.start();
     Assert.assertEquals(expectedRow, underTest.getCurrentRow());
@@ -230,8 +225,6 @@ public class BigtableServiceImplTest {
     when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
     when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
 
-    RetrySettings retrySettings =
-        bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -242,8 +235,6 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             DEFAULT_BYTE_SEGMENT_SIZE,
-            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
-            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     underTest.start();
@@ -287,8 +278,6 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
-    RetrySettings retrySettings =
-        bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -299,8 +288,6 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             DEFAULT_BYTE_SEGMENT_SIZE,
-            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
-            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     List<Row> actualResults = new ArrayList<>();
@@ -357,8 +344,6 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
-    RetrySettings retrySettings =
-        bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -369,8 +354,6 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             DEFAULT_BYTE_SEGMENT_SIZE,
-            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
-            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     List<Row> actualResults = new ArrayList<>();
@@ -428,8 +411,6 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
-    RetrySettings retrySettings =
-        bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -440,8 +421,6 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             DEFAULT_BYTE_SEGMENT_SIZE,
-            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
-            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     List<Row> actualResults = new ArrayList<>();
@@ -484,8 +463,6 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
-    RetrySettings retrySettings =
-        bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -496,8 +473,6 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             DEFAULT_BYTE_SEGMENT_SIZE,
-            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
-            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     List<Row> actualResults = new ArrayList<>();
@@ -555,8 +530,6 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
-    RetrySettings retrySettings =
-        bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -567,8 +540,6 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             DEFAULT_BYTE_SEGMENT_SIZE,
-            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
-            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     List<Row> actualResults = new ArrayList<>();
@@ -624,8 +595,6 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
-    RetrySettings retrySettings =
-        bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -636,8 +605,6 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             segmentSize,
             DEFAULT_BYTE_SEGMENT_SIZE,
-            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
-            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     Assert.assertTrue(underTest.start());
@@ -707,8 +674,6 @@ public class BigtableServiceImplTest {
     // Set up client to return callable
     when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
 
-    RetrySettings retrySettings =
-        bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -719,8 +684,6 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             segmentByteLimit,
-            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
-            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     List<Row> actualResults = new ArrayList<>();
@@ -773,8 +736,6 @@ public class BigtableServiceImplTest {
         mockStub.createReadRowsCallable(new BigtableServiceImpl.BigtableRowProtoAdapter());
     // Set up client to return callable
     when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
-    RetrySettings retrySettings =
-        bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
     BigtableService.Reader underTest =
         new BigtableServiceImpl.BigtableSegmentReaderImpl(
             mockBigtableDataClient,
@@ -785,8 +746,6 @@ public class BigtableServiceImplTest {
             RowFilter.getDefaultInstance(),
             SEGMENT_SIZE,
             DEFAULT_BYTE_SEGMENT_SIZE,
-            Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
-            Duration.millis(retrySettings.getTotalTimeout().toMillis()),
             mockCallMetric);
 
     IOException returnedError = null;

Reply via email to