diegomez17 commented on code in PR #24015:
URL: https://github.com/apache/beam/pull/24015#discussion_r1043684951
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -174,32 +198,25 @@ public boolean start() throws IOException {
@Override
public boolean advance() throws IOException {
- currentRow = results.next();
- return currentRow != null;
+ if (results.hasNext()) {
+ currentRow = results.next();
+ return true;
+ }
+ return false;
}
@Override
public void close() throws IOException {
// Goal: by the end of this function, both results and session are null
and closed,
Review Comment:
nit: please update this comment to say `client` instead of `session`.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -314,57 +344,61 @@ public boolean advance() throws IOException {
}
private Future<UpstreamResults> fetchNextSegment() {
- SettableFuture<UpstreamResults> f = SettableFuture.create();
+ SettableFuture<UpstreamResults> future = SettableFuture.create();
// When the nextRequest is null, the last fill completed and the buffer
contains the last rows
if (nextRequest == null) {
- f.set(new UpstreamResults(ImmutableList.of(), null));
- return f;
+ future.set(new UpstreamResults(ImmutableList.of(), null));
+ return future;
}
- // TODO(diegomez): Remove atomic ScanHandler for simpler
StreamObserver/Future implementation
- AtomicReference<ScanHandler> atomicScanHandler = new AtomicReference<>();
- ScanHandler handler =
- session
- .getDataClient()
- .readFlatRows(
- nextRequest,
- new StreamObserver<FlatRow>() {
- List<Row> rows = new ArrayList<>();
- long currentByteSize = 0;
- boolean byteLimitReached = false;
-
- @Override
- public void onNext(FlatRow flatRow) {
- Row row = FlatRowConverter.convert(flatRow);
- currentByteSize += row.getSerializedSize();
- rows.add(row);
-
- if (currentByteSize > maxSegmentByteSize) {
- byteLimitReached = true;
- atomicScanHandler.get().cancel();
- return;
- }
- }
-
- @Override
- public void onError(Throwable e) {
- f.setException(e);
- }
-
- @Override
- public void onCompleted() {
- ReadRowsRequest nextNextRequest = null;
-
- // When requested rows < limit, the current request will
be the last
- if (byteLimitReached || rows.size() ==
nextRequest.getRowsLimit()) {
- nextNextRequest =
- truncateRequest(nextRequest, rows.get(rows.size()
- 1).getKey());
- }
- f.set(new UpstreamResults(rows, nextNextRequest));
- }
- });
- atomicScanHandler.set(handler);
- return f;
+ client
+ .readRowsCallable(new BeamRowAdapter())
+ .call(
+ Query.fromProto(nextRequest),
+ new ResponseObserver<com.google.bigtable.v2.Row>() {
Review Comment:
Is there a naming conflict between Row and com.google.bigtable.v2.Row? If
not, please drop the package prefix and refer to it simply as Row.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -102,19 +119,18 @@ public BigtableOptions getBigtableOptions() {
@Override
public BigtableWriterImpl openForWriting(String tableId) throws IOException {
- BigtableSession session = new BigtableSession(options);
- BigtableTableName tableName =
options.getInstanceName().toTableName(tableId);
- return new BigtableWriterImpl(session, tableName);
+ LOG.info("Opening for writing with settings " +
veneeringSettings.getDataSettings().toString());
Review Comment:
Please extract veneeringSettings.getDataSettings() into a local variable so
it's easier to read.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -535,43 +578,84 @@ public String toString() {
@Override
public Reader createReader(BigtableSource source) throws IOException {
- BigtableSession session = new BigtableSession(options);
+ LOG.info(
+ "Creating a Reader for Bigtable with settings: " +
veneeringSettings.getDataSettings());
+ BigtableDataClient client =
BigtableDataClient.create(veneeringSettings.getDataSettings());
if (source.getMaxBufferElementCount() != null) {
- return BigtableSegmentReaderImpl.create(session, source);
+ return BigtableSegmentReaderImpl.create(
+ client,
+ veneeringSettings.getDataSettings(),
+ source,
+ veneeringSettings.getOperationTimeouts());
} else {
- return new BigtableReaderImpl(session, source);
+ return new BigtableReaderImpl(
+ client,
+ veneeringSettings.getDataSettings(),
+ source,
+ veneeringSettings.getOperationTimeouts());
}
}
+ // Support 2 bigtable-hbase features not directly available in veneer:
+ // - disabling timeouts - when timeouts are disabled, bigtable-hbase ignores
user configured
+ // timeouts and forces 6 minute deadlines per attempt for all RPCs except
scans. This is
+ // implemented by an interceptor. However the interceptor must be informed
that this is a scan
+ // - per attempt deadlines - vener doesn't implement deadlines for attempts.
To workaround this,
Review Comment:
nit: typo, "vener" should be "veneer".
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -97,31 +103,37 @@ public class BigtableServiceImplTest {
private static final long DEFAULT_BYTE_SEGMENT_SIZE = 1024 * 1024 * 1000;
private static final String DEFAULT_PREFIX = "b";
- private static final BigtableTableName TABLE_NAME =
- new BigtableInstanceName(PROJECT_ID, INSTANCE_ID).toTableName(TABLE_ID);
+ private static RequestContext requestContext =
+ RequestContext.create(PROJECT_ID, INSTANCE_ID, "default");
- @Mock private BigtableSession mockSession;
-
- @Mock private BulkMutation mockBulkMutation;
+ @Mock private Batcher<RowMutationEntry, Void> mockBatcher;
@Mock private BigtableDataClient mockBigtableDataClient;
- @Mock private BigtableSource mockBigtableSource;
+ @Mock private EnhancedBigtableStub mockStub;
- @Mock private ScanHandler scanHandler;
+ private BigtableDataSettings bigtableDataSettings;
+
+ @Mock private BigtableSource mockBigtableSource;
@Mock private ServiceCallMetric mockCallMetric;
- @Captor private ArgumentCaptor<ReadRowsRequest> requestCaptor;
+ @Captor private ArgumentCaptor<Query> requestCaptor;
Review Comment:
nit: please rename this to queryCaptor so the name reflects the class it captures (Query).
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -464,56 +502,61 @@ public void close() throws IOException {
bulkMutation = null;
}
} finally {
- if (session != null) {
- session.close();
- session = null;
+ if (client != null) {
+ client.close();
+ client = null;
}
}
}
@Override
public CompletionStage<MutateRowResponse> writeRecord(KV<ByteString,
Iterable<Mutation>> record)
throws IOException {
- MutateRowsRequest.Entry request =
- MutateRowsRequest.Entry.newBuilder()
- .setRowKey(record.getKey())
- .addAllMutations(record.getValue())
- .build();
+ com.google.cloud.bigtable.data.v2.models.Mutation mutation =
+
com.google.cloud.bigtable.data.v2.models.Mutation.fromProtoUnsafe(record.getValue());
+
+ RowMutationEntry entry =
RowMutationEntry.createFromMutationUnsafe(record.getKey(), mutation);
+
+ // Populate metrics
HashMap<String, String> baseLabels = new HashMap<>();
baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
baseLabels.put(MonitoringInfoConstants.Labels.METHOD,
"google.bigtable.v2.MutateRows");
baseLabels.put(
MonitoringInfoConstants.Labels.RESOURCE,
GcpResourceIdentifiers.bigtableResource(
- session.getOptions().getProjectId(),
- session.getOptions().getInstanceId(),
- tableName.getTableId()));
- baseLabels.put(
- MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID,
session.getOptions().getProjectId());
- baseLabels.put(
- MonitoringInfoConstants.Labels.INSTANCE_ID,
session.getOptions().getInstanceId());
+ settings.getProjectId(), settings.getInstanceId(), tableId));
+ baseLabels.put(MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID,
settings.getProjectId());
+ baseLabels.put(MonitoringInfoConstants.Labels.INSTANCE_ID,
settings.getInstanceId());
baseLabels.put(
MonitoringInfoConstants.Labels.TABLE_ID,
GcpResourceIdentifiers.bigtableTableID(
- session.getOptions().getProjectId(),
- session.getOptions().getInstanceId(),
- tableName.getTableId()));
+ settings.getProjectId(), settings.getInstanceId(), tableId));
ServiceCallMetric serviceCallMetric =
new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+
CompletableFuture<MutateRowResponse> result = new CompletableFuture<>();
+
Futures.addCallback(
- new VendoredListenableFutureAdapter<>(bulkMutation.add(request)),
+ new VendoredListenableFutureAdapter<>(bulkMutation.add(entry)),
new FutureCallback<MutateRowResponse>() {
@Override
public void onSuccess(MutateRowResponse mutateRowResponse) {
+ // TODO throttling logic to update dataflow counter
Review Comment:
Since we aren't fully committed to implementing this counter, could we leave
out these TODO comments for now and revisit them next quarter?
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java:
##########
@@ -1781,7 +1780,7 @@ private boolean rangesContainsKey(List<ByteKeyRange>
ranges, ByteKey key) {
}
@Override
- public Row getCurrentRow() {
+ public com.google.bigtable.v2.Row getCurrentRow() {
Review Comment:
Does this have to be changed?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -535,43 +578,84 @@ public String toString() {
@Override
public Reader createReader(BigtableSource source) throws IOException {
- BigtableSession session = new BigtableSession(options);
+ LOG.info(
+ "Creating a Reader for Bigtable with settings: " +
veneeringSettings.getDataSettings());
+ BigtableDataClient client =
BigtableDataClient.create(veneeringSettings.getDataSettings());
if (source.getMaxBufferElementCount() != null) {
- return BigtableSegmentReaderImpl.create(session, source);
+ return BigtableSegmentReaderImpl.create(
+ client,
+ veneeringSettings.getDataSettings(),
+ source,
+ veneeringSettings.getOperationTimeouts());
} else {
- return new BigtableReaderImpl(session, source);
+ return new BigtableReaderImpl(
+ client,
+ veneeringSettings.getDataSettings(),
+ source,
+ veneeringSettings.getOperationTimeouts());
}
}
+ // Support 2 bigtable-hbase features not directly available in veneer:
+ // - disabling timeouts - when timeouts are disabled, bigtable-hbase ignores
user configured
+ // timeouts and forces 6 minute deadlines per attempt for all RPCs except
scans. This is
+ // implemented by an interceptor. However the interceptor must be informed
that this is a scan
+ // - per attempt deadlines - vener doesn't implement deadlines for attempts.
To workaround this,
+ // the timeouts are set per call in the ApiCallContext. However this
creates a separate issue of
+ // over running the operation deadline, so gRPC deadline is also set.
+ private static GrpcCallContext createScanCallContext(
+ BigtableIOOperationTimeouts operationTimeouts) {
+ GrpcCallContext ctx = GrpcCallContext.createDefault();
+
+ if (!operationTimeouts.getUseTimeouts()) {
+ ctx =
+ ctx.withCallOptions(
+ CallOptions.DEFAULT.withOption(
+
BigtableHBaseVeneeringSettings.NoTimeoutsInterceptor.SKIP_DEFAULT_ATTEMPT_TIMEOUT,
+ true));
+ } else {
+ if
(operationTimeouts.getBulkReadRowsTimeouts().getOperationTimeout().isPresent())
{
+ ctx.withCallOptions(
Review Comment:
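Please assign the return value of withCallOptions back to the variable, i.e.
ctx = ctx.withCallOptions(...). GrpcCallContext is immutable, so as written the
new context is discarded and the operation timeout is never applied.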
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -174,32 +198,25 @@ public boolean start() throws IOException {
@Override
public boolean advance() throws IOException {
- currentRow = results.next();
- return currentRow != null;
+ if (results.hasNext()) {
Review Comment:
I'm having trouble understanding why this needed to be changed. Please make
sure there are no places outside of the tests that expect currentRow to
become null.
Based on what I briefly looked at, this should be fine, but I just want to
make sure.
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java:
##########
@@ -1657,26 +1657,25 @@ void setupSampleRowKeys(String tableId, int numSamples,
long bytesPerRow) {
checkArgument(numSamples > 0, "Number of samples must be positive: %s",
numSamples);
checkArgument(bytesPerRow > 0, "Bytes/Row must be positive: %s",
bytesPerRow);
- ImmutableList.Builder<SampleRowKeysResponse> ret =
ImmutableList.builder();
+ ImmutableList.Builder<KeyOffset> ret = ImmutableList.builder();
SortedMap<ByteString, ByteString> rows = getTable(tableId);
+ System.out.println("num of rows = " + rows.size());
Review Comment:
nit: please remove the System.out.println calls, or replace them with LOG.info() if you want to keep this output.
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java:
##########
@@ -1657,26 +1657,25 @@ void setupSampleRowKeys(String tableId, int numSamples,
long bytesPerRow) {
checkArgument(numSamples > 0, "Number of samples must be positive: %s",
numSamples);
checkArgument(bytesPerRow > 0, "Bytes/Row must be positive: %s",
bytesPerRow);
- ImmutableList.Builder<SampleRowKeysResponse> ret =
ImmutableList.builder();
+ ImmutableList.Builder<KeyOffset> ret = ImmutableList.builder();
SortedMap<ByteString, ByteString> rows = getTable(tableId);
+ System.out.println("num of rows = " + rows.size());
int currentSample = 1;
int rowsSoFar = 0;
for (Map.Entry<ByteString, ByteString> entry : rows.entrySet()) {
if (((double) rowsSoFar) / rows.size() >= ((double) currentSample) /
numSamples) {
// add the sample with the total number of bytes in the table before
this key.
- ret.add(
- SampleRowKeysResponse.newBuilder()
- .setRowKey(entry.getKey())
- .setOffsetBytes(rowsSoFar * bytesPerRow)
- .build());
+ ret.add(KeyOffset.create(entry.getKey(), rowsSoFar * bytesPerRow));
// Move on to next sample
currentSample++;
}
++rowsSoFar;
}
- // Add the last sample indicating the end of the table, with all rows
before it.
- ret.add(SampleRowKeysResponse.newBuilder().setOffsetBytes(rows.size() *
bytesPerRow).build());
+ System.out.println("ret size=" + ret.build().size());
Review Comment:
nit: please remove the System.out.println calls, or replace them with LOG.info() if you want to keep this output.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]