igorbernstein2 commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r873883670
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +209,216 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+
+ private @Nullable ReadRowsRequest nextRequest;
+ private @Nullable Row currentRow;
+ private final Queue<Row> buffer;
+ private final long refillSegmentWaterMark;
+ private final long maxSegmentByteSize;
+ private Future<UpstreamResults> future;
+ private ServiceCallMetric serviceCallMetric;
+
+ private static class UpstreamResults {
+ private final List<Row> rows;
+ private final @Nullable ReadRowsRequest nextRequest;
+
+ private UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+ this.rows = rows;
+ this.nextRequest = nextRequest;
+ }
+ }
+
+ static BigtableSegmentReaderImpl create(BigtableSession session, BigtableSource source) {
+ RowSet.Builder rowSetBuilder = RowSet.newBuilder();
+ if (source.getRanges().isEmpty()) {
+ rowSetBuilder = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance());
+ } else {
+ for (ByteKeyRange beamRange : source.getRanges()) {
+ RowRange.Builder rangeBuilder = rowSetBuilder.addRowRangesBuilder();
+ rangeBuilder
+ .setStartKeyClosed(ByteString.copyFrom(beamRange.getStartKey().getValue()))
+ .setEndKeyOpen(ByteString.copyFrom(beamRange.getEndKey().getValue()));
+ }
+ }
+
+ RowFilter filter =
+ MoreObjects.firstNonNull(source.getRowFilter(), RowFilter.getDefaultInstance());
+ ReadRowsRequest request =
+ ReadRowsRequest.newBuilder()
+ .setTableName(
+ session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
+ .setRows(rowSetBuilder.build())
+ .setFilter(filter)
+ .setRowsLimit(source.getMaxBufferElementCount())
+ .build();
+
+ long maxSegmentByteSize =
+ (long) (Runtime.getRuntime().totalMemory() * DEFAULT_BYTE_LIMIT_PERCENTAGE);
+
+ return new BigtableSegmentReaderImpl(
+ session,
+ request,
+ maxSegmentByteSize,
+ populateReaderCallMetric(session, source.getTableId().get()));
+ }
+
+ @VisibleForTesting
+ BigtableSegmentReaderImpl(
+ BigtableSession session,
+ ReadRowsRequest request,
+ long maxSegmentByteSize,
+ ServiceCallMetric serviceCallMetric) {
+ this.session = session;
+ this.nextRequest = request;
+ this.maxSegmentByteSize = maxSegmentByteSize;
+ this.serviceCallMetric = serviceCallMetric;
+ this.buffer = new ArrayDeque<>();
+ // Asynchronously refill the buffer when only 10% of the elements are left
+ this.refillSegmentWaterMark = request.getRowsLimit() / 10;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ future = fetchNextSegment();
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.size() < refillSegmentWaterMark && future == null) {
+ future = fetchNextSegment();
+ }
+ if (buffer.isEmpty() && future != null) {
+ waitReadRowsFuture();
+ }
+ currentRow = buffer.poll();
+ return currentRow != null;
+ }
+
+ private Future<UpstreamResults> fetchNextSegment() {
+ SettableFuture<UpstreamResults> f = SettableFuture.create();
+ // When the nextRequest is null, the last fill completed and the buffer contains the last rows
+ if (nextRequest == null) {
+ f.set(new UpstreamResults(ImmutableList.of(), null));
+ return f;
+ }
+
+ // TODO(diegomez): Remove atomic ScanHandler for simpler StreamObserver/Future implementation
+ AtomicReference<ScanHandler> atomicScanHandler = new AtomicReference<>();
+ ScanHandler handler =
+ session
+ .getDataClient()
+ .readFlatRows(
+ nextRequest,
+ new StreamObserver<FlatRow>() {
+ List<Row> rows = new ArrayList<>();
+ long currentByteSize = 0;
+ boolean byteLimitReached = false;
+
+ @Override
+ public void onNext(FlatRow flatRow) {
+ Row row = FlatRowConverter.convert(flatRow);
+ currentByteSize += row.getSerializedSize();
+ rows.add(row);
+
+ if (currentByteSize > maxSegmentByteSize) {
+ byteLimitReached = true;
+ atomicScanHandler.get().cancel();
+ return;
+ }
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ f.setException(e);
+ }
+
+ @Override
+ public void onCompleted() {
+ ReadRowsRequest nextNextRequest = null;
+
+ // When requested rows < limit, the current request will be the last
+ if (byteLimitReached || rows.size() == nextRequest.getRowsLimit()) {
+ nextNextRequest =
+ truncateRequest(nextRequest, rows.get(rows.size() - 1).getKey());
+ }
+ f.set(new UpstreamResults(rows, nextNextRequest));
+ }
+ });
+ atomicScanHandler.set(handler);
+ return f;
+ }
+
+ private void waitReadRowsFuture() throws IOException {
+ try {
+ UpstreamResults r = future.get();
+ buffer.addAll(r.rows);
+ nextRequest = r.nextRequest;
+ future = null;
+ serviceCallMetric.call("ok");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof StatusRuntimeException) {
+ System.out.println(cause.hashCode());
Review Comment:
please remove all printlns
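
Not part of the diff, just a quick sketch of how this branch could look once the debug prints are removed; the metric label and the rethrow at the end are assumptions, since the rest of the catch block is not shown in this hunk:

```java
} catch (ExecutionException e) {
  Throwable cause = e.getCause();
  if (cause instanceof StatusRuntimeException) {
    // No println: just record the failure on the metric and surface it to the caller.
    serviceCallMetric.call(((StatusRuntimeException) cause).getStatus().getCode().toString());
  }
  // Assumed rethrow, mirroring the other readers in this file.
  throw new IOException(cause);
}
```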
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +209,216 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+
+ private @Nullable ReadRowsRequest nextRequest;
+ private @Nullable Row currentRow;
+ private final Queue<Row> buffer;
+ private final long refillSegmentWaterMark;
+ private final long maxSegmentByteSize;
+ private Future<UpstreamResults> future;
+ private ServiceCallMetric serviceCallMetric;
+
+ private static class UpstreamResults {
+ private final List<Row> rows;
+ private final @Nullable ReadRowsRequest nextRequest;
+
+ private UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+ this.rows = rows;
+ this.nextRequest = nextRequest;
+ }
+ }
+
+ static BigtableSegmentReaderImpl create(BigtableSession session, BigtableSource source) {
+ RowSet.Builder rowSetBuilder = RowSet.newBuilder();
+ if (source.getRanges().isEmpty()) {
+ rowSetBuilder = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance());
+ } else {
+ for (ByteKeyRange beamRange : source.getRanges()) {
+ RowRange.Builder rangeBuilder = rowSetBuilder.addRowRangesBuilder();
+ rangeBuilder
+ .setStartKeyClosed(ByteString.copyFrom(beamRange.getStartKey().getValue()))
+ .setEndKeyOpen(ByteString.copyFrom(beamRange.getEndKey().getValue()));
+ }
+ }
+
+ RowFilter filter =
+ MoreObjects.firstNonNull(source.getRowFilter(), RowFilter.getDefaultInstance());
+ ReadRowsRequest request =
+ ReadRowsRequest.newBuilder()
+ .setTableName(
+ session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
+ .setRows(rowSetBuilder.build())
+ .setFilter(filter)
+ .setRowsLimit(source.getMaxBufferElementCount())
+ .build();
+
+ long maxSegmentByteSize =
+ (long) (Runtime.getRuntime().totalMemory() * DEFAULT_BYTE_LIMIT_PERCENTAGE);
+
+ return new BigtableSegmentReaderImpl(
+ session,
+ request,
+ maxSegmentByteSize,
+ populateReaderCallMetric(session, source.getTableId().get()));
+ }
+
+ @VisibleForTesting
+ BigtableSegmentReaderImpl(
+ BigtableSession session,
+ ReadRowsRequest request,
+ long maxSegmentByteSize,
+ ServiceCallMetric serviceCallMetric) {
+ this.session = session;
+ this.nextRequest = request;
+ this.maxSegmentByteSize = maxSegmentByteSize;
+ this.serviceCallMetric = serviceCallMetric;
+ this.buffer = new ArrayDeque<>();
+ // Asynchronously refill the buffer when only 10% of the elements are left
+ this.refillSegmentWaterMark = request.getRowsLimit() / 10;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ future = fetchNextSegment();
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.size() < refillSegmentWaterMark && future == null) {
+ future = fetchNextSegment();
+ }
+ if (buffer.isEmpty() && future != null) {
+ waitReadRowsFuture();
+ }
+ currentRow = buffer.poll();
+ return currentRow != null;
+ }
+
+ private Future<UpstreamResults> fetchNextSegment() {
+ SettableFuture<UpstreamResults> f = SettableFuture.create();
+ // When the nextRequest is null, the last fill completed and the buffer contains the last rows
+ if (nextRequest == null) {
+ f.set(new UpstreamResults(ImmutableList.of(), null));
+ return f;
+ }
+
+ // TODO(diegomez): Remove atomic ScanHandler for simpler StreamObserver/Future implementation
+ AtomicReference<ScanHandler> atomicScanHandler = new AtomicReference<>();
+ ScanHandler handler =
+ session
+ .getDataClient()
+ .readFlatRows(
+ nextRequest,
+ new StreamObserver<FlatRow>() {
+ List<Row> rows = new ArrayList<>();
+ long currentByteSize = 0;
+ boolean byteLimitReached = false;
+
+ @Override
+ public void onNext(FlatRow flatRow) {
+ Row row = FlatRowConverter.convert(flatRow);
+ currentByteSize += row.getSerializedSize();
+ rows.add(row);
+
+ if (currentByteSize > maxSegmentByteSize) {
+ byteLimitReached = true;
+ atomicScanHandler.get().cancel();
+ return;
+ }
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ f.setException(e);
+ }
+
+ @Override
+ public void onCompleted() {
+ ReadRowsRequest nextNextRequest = null;
+
+ // When requested rows < limit, the current request will be the last
+ if (byteLimitReached || rows.size() == nextRequest.getRowsLimit()) {
+ nextNextRequest =
+ truncateRequest(nextRequest, rows.get(rows.size() - 1).getKey());
+ }
+ f.set(new UpstreamResults(rows, nextNextRequest));
+ }
+ });
+ atomicScanHandler.set(handler);
+ return f;
+ }
+
+ private void waitReadRowsFuture() throws IOException {
+ try {
+ UpstreamResults r = future.get();
+ buffer.addAll(r.rows);
+ nextRequest = r.nextRequest;
+ future = null;
+ serviceCallMetric.call("ok");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof StatusRuntimeException) {
+ System.out.println(cause.hashCode());
+ serviceCallMetric.call(cause.hashCode());
Review Comment:
hashcode?
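
If the intent is to tag the metric with the gRPC result rather than an arbitrary object hash, here is a small self-contained sketch of a label helper (hypothetical, not in the PR) that reuses the same call(String) overload the success path already uses with "ok":

```java
import io.grpc.Status;
import io.grpc.StatusRuntimeException;

final class SegmentMetricLabels {
  private SegmentMetricLabels() {}

  // Maps a failed segment fetch to a stable, low-cardinality label such as
  // "DEADLINE_EXCEEDED" or "UNAVAILABLE" instead of cause.hashCode().
  static String statusLabel(Throwable cause) {
    if (cause instanceof StatusRuntimeException) {
      return ((StatusRuntimeException) cause).getStatus().getCode().toString();
    }
    return Status.Code.UNKNOWN.toString();
  }
}
```

Usage in the catch block would then be serviceCallMetric.call(SegmentMetricLabels.statusLabel(cause)), which keeps the metric dimension readable and bounded.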
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]