This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5c30b1d6db2 Clean up stale code in BigtableService (#30172)
5c30b1d6db2 is described below
commit 5c30b1d6db27fff299fc28249f9f195d54651abd
Author: Mattie Fu <[email protected]>
AuthorDate: Sun May 26 11:40:49 2024 -0400
Clean up stale code in BigtableService (#30172)
---
.../beam/sdk/io/gcp/bigtable/BigtableService.java | 7 --
.../sdk/io/gcp/bigtable/BigtableServiceImpl.java | 78 ++--------------------
.../beam/sdk/io/gcp/bigtable/BigtableIOTest.java | 10 ---
.../io/gcp/bigtable/BigtableServiceImplTest.java | 43 +-----------
4 files changed, 7 insertions(+), 131 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
index 3a3de5622cd..1e3839b5df4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -30,7 +30,6 @@ import java.util.NoSuchElementException;
import java.util.concurrent.CompletionStage;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.values.KV;
-import org.joda.time.Duration;
/** An interface for real or fake implementations of Cloud Bigtable. */
interface BigtableService extends Serializable {
@@ -76,12 +75,6 @@ interface BigtableService extends Serializable {
* current row because the last such call was unsuccessful.
*/
Row getCurrentRow() throws NoSuchElementException;
-
- // Workaround for ReadRows requests which requires to pass the timeouts in
- // ApiContext. Can be removed later once it's fixed in Veneer.
- Duration getAttemptTimeout();
-
- Duration getOperationTimeout();
}
/** Returns a {@link Reader} that will read from the specified source. */
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index d6208be1bf9..dad3370dae6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -23,7 +23,6 @@ import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.Batcher;
import com.google.api.gax.batching.BatchingException;
import com.google.api.gax.grpc.GrpcCallContext;
-import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
@@ -48,8 +47,6 @@ import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.protobuf.ByteString;
-import io.grpc.CallOptions;
-import io.grpc.Deadline;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.ArrayDeque;
@@ -108,9 +105,6 @@ class BigtableServiceImpl implements BigtableService {
BigtableServiceImpl(BigtableDataSettings settings) throws IOException {
this.projectId = settings.getProjectId();
this.instanceId = settings.getInstanceId();
- RetrySettings retry = settings.getStubSettings().readRowsSettings().getRetrySettings();
- this.readAttemptTimeout = Duration.millis(retry.getInitialRpcTimeout().toMillis());
- this.readOperationTimeout = Duration.millis(retry.getTotalTimeout().toMillis());
this.client = BigtableDataClient.create(settings);
LOG.info("Started Bigtable service with settings {}", settings);
}
@@ -119,10 +113,6 @@ class BigtableServiceImpl implements BigtableService {
private final String projectId;
private final String instanceId;
- private final Duration readAttemptTimeout;
-
- private final Duration readOperationTimeout;
-
@Override
public BigtableWriterImpl openForWriting(BigtableWriteOptions writeOptions) {
return new BigtableWriterImpl(
@@ -145,9 +135,6 @@ class BigtableServiceImpl implements BigtableService {
private final RowFilter rowFilter;
private Iterator<Row> results;
- private final Duration attemptTimeout;
- private final Duration operationTimeout;
-
private Row currentRow;
@VisibleForTesting
@@ -157,18 +144,13 @@ class BigtableServiceImpl implements BigtableService {
String instanceId,
String tableId,
List<ByteKeyRange> ranges,
- @Nullable RowFilter rowFilter,
- Duration attemptTimeout,
- Duration operationTimeout) {
+ @Nullable RowFilter rowFilter) {
this.client = client;
this.projectId = projectId;
this.instanceId = instanceId;
this.tableId = tableId;
this.ranges = ranges;
this.rowFilter = rowFilter;
-
- this.attemptTimeout = attemptTimeout;
- this.operationTimeout = operationTimeout;
}
@Override
@@ -189,7 +171,7 @@ class BigtableServiceImpl implements BigtableService {
results =
client
.readRowsCallable(new BigtableRowProtoAdapter())
- .call(query, createScanCallContext(attemptTimeout, operationTimeout))
+ .call(query, GrpcCallContext.createDefault())
.iterator();
serviceCallMetric.call("ok");
} catch (StatusRuntimeException e) {
@@ -215,16 +197,6 @@ class BigtableServiceImpl implements BigtableService {
}
return currentRow;
}
-
- @Override
- public Duration getAttemptTimeout() {
- return attemptTimeout;
- }
-
- @Override
- public Duration getOperationTimeout() {
- return operationTimeout;
- }
}
@VisibleForTesting
@@ -238,8 +210,6 @@ class BigtableServiceImpl implements BigtableService {
private final int refillSegmentWaterMark;
private final long maxSegmentByteSize;
private ServiceCallMetric serviceCallMetric;
- private final Duration attemptTimeout;
- private final Duration operationTimeout;
private static class UpstreamResults {
private final List<Row> rows;
@@ -258,9 +228,7 @@ class BigtableServiceImpl implements BigtableService {
String tableId,
List<ByteKeyRange> ranges,
@Nullable RowFilter rowFilter,
- int maxBufferedElementCount,
- Duration attemptTimeout,
- Duration operationTimeout) {
+ int maxBufferedElementCount) {
RowSet.Builder rowSetBuilder = RowSet.newBuilder();
if (ranges.isEmpty()) {
@@ -292,8 +260,6 @@ class BigtableServiceImpl implements BigtableService {
filter,
maxBufferedElementCount,
maxSegmentByteSize,
- attemptTimeout,
- operationTimeout,
createCallMetric(projectId, instanceId, tableId));
}
@@ -307,8 +273,6 @@ class BigtableServiceImpl implements BigtableService {
@Nullable RowFilter filter,
int maxRowsInBuffer,
long maxSegmentByteSize,
- Duration attemptTimeout,
- Duration operationTimeout,
ServiceCallMetric serviceCallMetric) {
if (rowSet.equals(rowSet.getDefaultInstanceForType())) {
rowSet = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
@@ -329,8 +293,6 @@ class BigtableServiceImpl implements BigtableService {
// Asynchronously refill buffer when there is 10% of the elements are left
this.refillSegmentWaterMark =
Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE));
- this.attemptTimeout = attemptTimeout;
- this.operationTimeout = operationTimeout;
}
@Override
@@ -426,7 +388,7 @@ class BigtableServiceImpl implements BigtableService {
future.set(new UpstreamResults(rows, nextNextRequest));
}
},
- createScanCallContext(attemptTimeout, operationTimeout));
+ GrpcCallContext.createDefault());
return future;
}
@@ -481,16 +443,6 @@ class BigtableServiceImpl implements BigtableService {
}
return currentRow;
}
-
- @Override
- public Duration getAttemptTimeout() {
- return attemptTimeout;
- }
-
- @Override
- public Duration getOperationTimeout() {
- return operationTimeout;
- }
}
@VisibleForTesting
@@ -660,9 +612,7 @@ class BigtableServiceImpl implements BigtableService {
source.getTableId().get(),
source.getRanges(),
source.getRowFilter(),
- source.getMaxBufferElementCount(),
- readAttemptTimeout,
- readOperationTimeout);
+ source.getMaxBufferElementCount());
} else {
return new BigtableReaderImpl(
client,
@@ -670,26 +620,10 @@ class BigtableServiceImpl implements BigtableService {
instanceId,
source.getTableId().get(),
source.getRanges(),
- source.getRowFilter(),
- readAttemptTimeout,
- readOperationTimeout);
+ source.getRowFilter());
}
}
- // - per attempt deadlines - veneer doesn't implement deadlines for attempts. To workaround this,
- // the timeouts are set per call in the ApiCallContext. However this creates a separate issue of
- // over running the operation deadline, so gRPC deadline is also set.
- private static GrpcCallContext createScanCallContext(
- Duration attemptTimeout, Duration operationTimeout) {
- GrpcCallContext ctx = GrpcCallContext.createDefault();
-
- ctx.withCallOptions(
- CallOptions.DEFAULT.withDeadline(
- Deadline.after(operationTimeout.getMillis(), TimeUnit.MILLISECONDS)));
- ctx.withTimeout(org.threeten.bp.Duration.ofMillis(attemptTimeout.getMillis()));
- return ctx;
- }
-
@Override
public List<KeyOffset> getSampleRowKeys(BigtableSource source) {
return client.sampleRowKeys(source.getTableId().get());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index bffca865208..5efef321293 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -1846,16 +1846,6 @@ public class BigtableIOTest {
}
return currentRow;
}
-
- @Override
- public Duration getAttemptTimeout() {
- return Duration.millis(100);
- }
-
- @Override
- public Duration getOperationTimeout() {
- return Duration.millis(1000);
- }
}
/** A {@link FakeBigtableReader} implementation that throw exceptions at given stage. */
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
index 8291c21e7e8..98f33ffeb9d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
@@ -29,7 +29,6 @@ import static org.mockito.Mockito.when;
import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.Batcher;
-import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStream;
@@ -171,8 +170,6 @@ public class BigtableServiceImplTest {
// Set up client to return callable
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
- RetrySettings retrySettings =
- bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
BigtableService.Reader underTest =
new BigtableServiceImpl.BigtableReaderImpl(
mockBigtableDataClient,
@@ -180,9 +177,7 @@ public class BigtableServiceImplTest {
bigtableDataSettings.getInstanceId(),
mockBigtableSource.getTableId().get(),
mockBigtableSource.getRanges(),
- null,
- Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
- Duration.millis(retrySettings.getTotalTimeout().toMillis()));
+ null);
underTest.start();
Assert.assertEquals(expectedRow, underTest.getCurrentRow());
@@ -230,8 +225,6 @@ public class BigtableServiceImplTest {
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));
- RetrySettings retrySettings =
- bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
BigtableService.Reader underTest =
new BigtableServiceImpl.BigtableSegmentReaderImpl(
mockBigtableDataClient,
@@ -242,8 +235,6 @@ public class BigtableServiceImplTest {
RowFilter.getDefaultInstance(),
SEGMENT_SIZE,
DEFAULT_BYTE_SEGMENT_SIZE,
- Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
- Duration.millis(retrySettings.getTotalTimeout().toMillis()),
mockCallMetric);
underTest.start();
@@ -287,8 +278,6 @@ public class BigtableServiceImplTest {
// Set up client to return callable
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
- RetrySettings retrySettings =
- bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
BigtableService.Reader underTest =
new BigtableServiceImpl.BigtableSegmentReaderImpl(
mockBigtableDataClient,
@@ -299,8 +288,6 @@ public class BigtableServiceImplTest {
RowFilter.getDefaultInstance(),
SEGMENT_SIZE,
DEFAULT_BYTE_SEGMENT_SIZE,
- Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
- Duration.millis(retrySettings.getTotalTimeout().toMillis()),
mockCallMetric);
List<Row> actualResults = new ArrayList<>();
@@ -357,8 +344,6 @@ public class BigtableServiceImplTest {
// Set up client to return callable
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
- RetrySettings retrySettings =
- bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
BigtableService.Reader underTest =
new BigtableServiceImpl.BigtableSegmentReaderImpl(
mockBigtableDataClient,
@@ -369,8 +354,6 @@ public class BigtableServiceImplTest {
RowFilter.getDefaultInstance(),
SEGMENT_SIZE,
DEFAULT_BYTE_SEGMENT_SIZE,
- Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
- Duration.millis(retrySettings.getTotalTimeout().toMillis()),
mockCallMetric);
List<Row> actualResults = new ArrayList<>();
@@ -428,8 +411,6 @@ public class BigtableServiceImplTest {
// Set up client to return callable
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
- RetrySettings retrySettings =
- bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
BigtableService.Reader underTest =
new BigtableServiceImpl.BigtableSegmentReaderImpl(
mockBigtableDataClient,
@@ -440,8 +421,6 @@ public class BigtableServiceImplTest {
RowFilter.getDefaultInstance(),
SEGMENT_SIZE,
DEFAULT_BYTE_SEGMENT_SIZE,
- Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
- Duration.millis(retrySettings.getTotalTimeout().toMillis()),
mockCallMetric);
List<Row> actualResults = new ArrayList<>();
@@ -484,8 +463,6 @@ public class BigtableServiceImplTest {
// Set up client to return callable
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
- RetrySettings retrySettings =
- bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
BigtableService.Reader underTest =
new BigtableServiceImpl.BigtableSegmentReaderImpl(
mockBigtableDataClient,
@@ -496,8 +473,6 @@ public class BigtableServiceImplTest {
RowFilter.getDefaultInstance(),
SEGMENT_SIZE,
DEFAULT_BYTE_SEGMENT_SIZE,
- Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
- Duration.millis(retrySettings.getTotalTimeout().toMillis()),
mockCallMetric);
List<Row> actualResults = new ArrayList<>();
@@ -555,8 +530,6 @@ public class BigtableServiceImplTest {
// Set up client to return callable
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
- RetrySettings retrySettings =
- bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
BigtableService.Reader underTest =
new BigtableServiceImpl.BigtableSegmentReaderImpl(
mockBigtableDataClient,
@@ -567,8 +540,6 @@ public class BigtableServiceImplTest {
RowFilter.getDefaultInstance(),
SEGMENT_SIZE,
DEFAULT_BYTE_SEGMENT_SIZE,
- Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
- Duration.millis(retrySettings.getTotalTimeout().toMillis()),
mockCallMetric);
List<Row> actualResults = new ArrayList<>();
@@ -624,8 +595,6 @@ public class BigtableServiceImplTest {
// Set up client to return callable
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
- RetrySettings retrySettings =
- bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
BigtableService.Reader underTest =
new BigtableServiceImpl.BigtableSegmentReaderImpl(
mockBigtableDataClient,
@@ -636,8 +605,6 @@ public class BigtableServiceImplTest {
RowFilter.getDefaultInstance(),
segmentSize,
DEFAULT_BYTE_SEGMENT_SIZE,
- Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
- Duration.millis(retrySettings.getTotalTimeout().toMillis()),
mockCallMetric);
Assert.assertTrue(underTest.start());
@@ -707,8 +674,6 @@ public class BigtableServiceImplTest {
// Set up client to return callable
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
- RetrySettings retrySettings =
- bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
BigtableService.Reader underTest =
new BigtableServiceImpl.BigtableSegmentReaderImpl(
mockBigtableDataClient,
@@ -719,8 +684,6 @@ public class BigtableServiceImplTest {
RowFilter.getDefaultInstance(),
SEGMENT_SIZE,
segmentByteLimit,
- Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
- Duration.millis(retrySettings.getTotalTimeout().toMillis()),
mockCallMetric);
List<Row> actualResults = new ArrayList<>();
@@ -773,8 +736,6 @@ public class BigtableServiceImplTest {
mockStub.createReadRowsCallable(new BigtableServiceImpl.BigtableRowProtoAdapter());
// Set up client to return callable
when(mockBigtableDataClient.readRowsCallable(any(RowAdapter.class))).thenReturn(callable);
- RetrySettings retrySettings =
- bigtableDataSettings.getStubSettings().bulkReadRowsSettings().getRetrySettings();
BigtableService.Reader underTest =
new BigtableServiceImpl.BigtableSegmentReaderImpl(
mockBigtableDataClient,
@@ -785,8 +746,6 @@ public class BigtableServiceImplTest {
RowFilter.getDefaultInstance(),
SEGMENT_SIZE,
DEFAULT_BYTE_SEGMENT_SIZE,
- Duration.millis(retrySettings.getInitialRpcTimeout().toMillis()),
- Duration.millis(retrySettings.getTotalTimeout().toMillis()),
mockCallMetric);
IOException returnedError = null;