diegomez17 commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r875136348
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,223 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+
+ private @Nullable ReadRowsRequest nextRequest;
+ private @Nullable Row currentRow;
+ private final Queue<Row> buffer;
+ private final int refillSegmentWaterMark;
+ private final long maxSegmentByteSize;
+ private Future<UpstreamResults> future;
+ private ServiceCallMetric serviceCallMetric;
+ /**
+ * Result of a single segment fetch: the rows that were received plus the
+ * request to issue for the following segment. {@code nextRequest} is null
+ * when no further segment should be requested.
+ */
+ private static class UpstreamResults {
+ private final List<Row> rows;
+ private final @Nullable ReadRowsRequest nextRequest;
+
+ private UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest
nextRequest) {
+ this.rows = rows;
+ this.nextRequest = nextRequest;
+ }
+ }
+ /**
+ * Builds a segment reader for {@code source} on top of {@code session}.
+ *
+ * <p>The source's {@code ByteKeyRange}s are converted into a {@code RowSet}
+ * of closed-start/open-end row ranges; a source without ranges gets a single
+ * default {@code RowRange} instead. A null row filter on the source is
+ * replaced by {@code RowFilter.getDefaultInstance()}. The per-segment byte
+ * limit is the larger of {@code MIN_BYTE_BUFFER_SIZE} and
+ * {@code Runtime.totalMemory() * DEFAULT_BYTE_LIMIT_PERCENTAGE}.
+ *
+ * @param session session used to resolve the table name and issue reads
+ * @param source source describing the table, ranges, filter and buffer size
+ * @return a reader that has not been started yet
+ */
+ static BigtableSegmentReaderImpl create(BigtableSession session,
BigtableSource source) {
+ RowSet.Builder rowSetBuilder = RowSet.newBuilder();
+ if (source.getRanges().isEmpty()) {
+ rowSetBuilder =
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance());
+ } else {
+ // BigtableSource only contains ranges with a closed start key and
open end key
+ for (ByteKeyRange beamRange : source.getRanges()) {
+ RowRange.Builder rangeBuilder = rowSetBuilder.addRowRangesBuilder();
+ rangeBuilder
+
.setStartKeyClosed(ByteString.copyFrom(beamRange.getStartKey().getValue()))
+
.setEndKeyOpen(ByteString.copyFrom(beamRange.getEndKey().getValue()));
+ }
+ }
+ RowSet rowSet = rowSetBuilder.build();
+ RowFilter filter =
+ MoreObjects.firstNonNull(source.getRowFilter(),
RowFilter.getDefaultInstance());
+
+ long maxSegmentByteSize =
+ (long)
+ Math.max(
+ MIN_BYTE_BUFFER_SIZE,
+ (Runtime.getRuntime().totalMemory() *
DEFAULT_BYTE_LIMIT_PERCENTAGE));
+
+ return new BigtableSegmentReaderImpl(
+ session,
+
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()),
+ rowSet,
+ source.getMaxBufferElementCount(),
+ maxSegmentByteSize,
+ filter,
+ populateReaderCallMetric(session, source.getTableId().get()));
+ }
+ /**
+ * Creates a reader whose first request covers {@code rowSet} with a row
+ * limit of {@code maxRowsInBuffer}. A {@code rowSet} equal to the default
+ * instance is replaced by one holding a single default {@code RowRange}.
+ *
+ * @param session session used to issue the reads
+ * @param tableName table name placed on every request
+ * @param rowSet rows to read
+ * @param maxRowsInBuffer row limit set on each segment request
+ * @param maxSegmentByteSize byte size above which a segment scan is cancelled
+ * @param filter row filter placed on every request
+ * @param serviceCallMetric metric updated after each segment fetch
+ */
+ @VisibleForTesting
+ BigtableSegmentReaderImpl(
+ BigtableSession session,
+ String tableName,
+ RowSet rowSet,
+ int maxRowsInBuffer,
+ long maxSegmentByteSize,
+ RowFilter filter,
+ ServiceCallMetric serviceCallMetric) {
+ if (rowSet.equals(rowSet.getDefaultInstanceForType())) {
+ rowSet =
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+ }
+ ReadRowsRequest request =
+ ReadRowsRequest.newBuilder()
+ .setTableName(tableName)
+ .setRows(rowSet)
+ .setFilter(filter)
+ .setRowsLimit(maxRowsInBuffer)
+ .build();
+
+ this.session = session;
+ this.nextRequest = request;
+ this.maxSegmentByteSize = maxSegmentByteSize;
+ this.serviceCallMetric = serviceCallMetric;
+ this.buffer = new ArrayDeque<>();
+ // Asynchronously refill the buffer once only 10% of the elements are
left
+ this.refillSegmentWaterMark = (int) (request.getRowsLimit() *
WATERMARK_PERCENTAGE);
+ }
+ /**
+ * Issues the first segment fetch and then advances to the first row.
+ *
+ * @return true if a first row is available, false otherwise
+ * @throws IOException if waiting for the fetch fails
+ */
+ @Override
+ public boolean start() throws IOException {
+ future = fetchNextSegment();
+ return advance();
+ }
+ /**
+ * Moves to the next buffered row. Starts a new fetch when the buffer has
+ * dropped below the refill watermark and no fetch is pending, and blocks on
+ * the pending fetch only when the buffer is empty.
+ *
+ * @return true if {@code currentRow} was set to a row, false if none is left
+ * @throws IOException if waiting for the pending fetch fails
+ */
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.size() < refillSegmentWaterMark && future == null) {
+ future = fetchNextSegment();
+ }
+ if (buffer.isEmpty() && future != null) {
+ waitReadRowsFuture();
+ }
+ currentRow = buffer.poll();
+ return currentRow != null;
+ }
+ /**
+ * Starts reading the segment described by {@code nextRequest} and returns a
+ * future that the stream observer completes.
+ *
+ * <p>If {@code nextRequest} is null the returned future is already complete
+ * with an empty row list. Otherwise the observer collects converted rows,
+ * sums their serialized sizes, and cancels the scan once that sum exceeds
+ * {@code maxSegmentByteSize}. On completion a follow-up request is computed
+ * via {@code truncateRequest} only when the byte limit was hit or the number
+ * of rows equals the request's row limit; otherwise the follow-up is null.
+ *
+ * <p>NOTE(review): {@code rows.get(rows.size() - 1)} would throw if
+ * {@code rows} is empty while the row limit is 0 - confirm callers always
+ * pass a non-zero limit. Which callback runs after {@code cancel()} is not
+ * visible here - TODO confirm.
+ *
+ * @return future holding the fetched rows and the follow-up request
+ */
+ private Future<UpstreamResults> fetchNextSegment() {
+ SettableFuture<UpstreamResults> f = SettableFuture.create();
+ // When the nextRequest is null, the last fill completed and the buffer
contains the last rows
+ if (nextRequest == null) {
+ f.set(new UpstreamResults(ImmutableList.of(), null));
+ return f;
+ }
+
+ // TODO(diegomez): Remove atomic ScanHandler for simpler
StreamObserver/Future implementation
+ AtomicReference<ScanHandler> atomicScanHandler = new AtomicReference<>();
+ ScanHandler handler =
+ session
+ .getDataClient()
+ .readFlatRows(
+ nextRequest,
+ new StreamObserver<FlatRow>() {
+ List<Row> rows = new ArrayList<>();
+ long currentByteSize = 0;
+ boolean byteLimitReached = false;
+
+ @Override
+ public void onNext(FlatRow flatRow) {
+ Row row = FlatRowConverter.convert(flatRow);
+ currentByteSize += row.getSerializedSize();
+ rows.add(row);
+
+ if (currentByteSize > maxSegmentByteSize) {
+ byteLimitReached = true;
+ atomicScanHandler.get().cancel();
+ return;
+ }
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ f.setException(e);
+ }
+
+ @Override
+ public void onCompleted() {
+ ReadRowsRequest nextNextRequest = null;
+
+ // When requested rows < limit, the current request will
be the last
+ if (byteLimitReached || rows.size() ==
nextRequest.getRowsLimit()) {
+ nextNextRequest =
+ truncateRequest(nextRequest, rows.get(rows.size()
- 1).getKey());
+ }
+ f.set(new UpstreamResults(rows, nextNextRequest));
+ }
+ });
+ atomicScanHandler.set(handler);
+ return f;
+ }
+ /**
+ * Blocks until the pending fetch completes. On success its rows are appended
+ * to the buffer, {@code nextRequest} is updated, the pending future is
+ * cleared and "ok" is recorded on the service call metric.
+ *
+ * @throws IOException if the wait is interrupted (the interrupt flag is
+ *     restored first) or the fetch failed; for a failed fetch whose cause is
+ *     a {@code StatusRuntimeException} the status code is recorded on the
+ *     metric before the cause is wrapped and rethrown
+ */
+ private void waitReadRowsFuture() throws IOException {
+ try {
+ UpstreamResults r = future.get();
+ buffer.addAll(r.rows);
+ nextRequest = r.nextRequest;
+ future = null;
+ serviceCallMetric.call("ok");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof StatusRuntimeException) {
+ serviceCallMetric.call(((StatusRuntimeException)
cause).getStatus().getCode().value());
+ }
+ throw new IOException(cause);
+ }
+ }
+
+ private ReadRowsRequest truncateRequest(ReadRowsRequest request,
ByteString lastKey) {
+ RowSet.Builder segment = RowSet.newBuilder();
+
+ for (RowRange rowRange : request.getRows().getRowRangesList()) {
+ int startCmp = StartPoint.extract(rowRange).compareTo(new
StartPoint(lastKey, true));
+ int endCmp = EndPoint.extract(rowRange).compareTo(new
EndPoint(lastKey, true));
+
+ if (startCmp > 0) {
+ // If the startKey is past the split point then add the whole range
+ segment.addRowRanges(rowRange);
+ } else if (endCmp > 0) {
+ // Row is split, remove all read rowKeys and split RowSet at last
buffered Row
+ RowRange subRange =
rowRange.toBuilder().setStartKeyOpen(lastKey).build();
+ segment.addRowRanges(subRange);
+ }
+ }
+ if (segment.getRowRangesCount() == 0) {
+ return null;
+ }
+
+ ReadRowsRequest.Builder requestBuilder = request.toBuilder();
+ requestBuilder.clearRows();
Review Comment:
Yes, since we are pulling the builder from the original request, it contains
the filter on Line 286. I believe that the only thing that needs to change is
the RowSet under the request.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]