diegomez17 commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r872577450
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,224 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+
+ @Nullable private ReadRowsRequest nextRequest;
+ private final Queue<Row> buffer;
+ private final long refillSegmentWaterMark;
+ private final long maxSegmentByteSize;
+ private Future<UpstreamResults> future;
+ private Row currentRow;
+ private static ServiceCallMetric serviceCallMetric;
+
+ /**
+  * Outcome of one segment fetch: the rows read and the request for the following segment, which
+  * is {@code null} when there is nothing left to read.
+  */
+ private static class UpstreamResults {
+   private final List<Row> rows;
+   private final @Nullable ReadRowsRequest nextRequest;
+
+   UpstreamResults(List<Row> segmentRows, @Nullable ReadRowsRequest followUpRequest) {
+     this.rows = segmentRows;
+     this.nextRequest = followUpRequest;
+   }
+ }
+
+ /**
+  * Creates a segment reader for {@code source}.
+  *
+  * <p>The reader covers the source's row ranges, or the whole table when the source has none.
+  * Each segment request is limited to {@code source.getMaxBufferElementCount()} rows and, while
+  * streaming, to roughly {@code DEFAULT_BYTE_LIMIT_PERCENTAGE} of the current heap size in bytes.
+  */
+ public static BigtableSegmentReaderImpl create(BigtableSession session, BigtableSource source) {
+   String tableId = source.getTableId().get();
+
+   RowSet set;
+   if (source.getRanges().isEmpty()) {
+     // A RowRange with no start/end key spans the whole table.
+     set = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+   } else {
+     // Each source range [start, end) maps onto a RowRange with a closed start and an open end.
+     List<RowRange> rowRanges =
+         source.getRanges().stream()
+             .map(
+                 range ->
+                     RowRange.newBuilder()
+                         .setStartKeyClosed(ByteString.copyFrom(range.getStartKey().getValue()))
+                         .setEndKeyOpen(ByteString.copyFrom(range.getEndKey().getValue()))
+                         .build())
+             .collect(Collectors.toList());
+     set = RowSet.newBuilder().addAllRowRanges(rowRanges).build();
+   }
+
+   RowFilter filter =
+       MoreObjects.firstNonNull(source.getRowFilter(), RowFilter.getDefaultInstance());
+   ReadRowsRequest request =
+       ReadRowsRequest.newBuilder()
+           .setTableName(session.getOptions().getInstanceName().toTableNameStr(tableId))
+           .setRows(set)
+           .setFilter(filter)
+           .setRowsLimit(source.getMaxBufferElementCount())
+           .build();
+
+   // NOTE(review): serviceCallMetric is a static field, so concurrently created readers for
+   // different tables overwrite each other's metric; consider making it an instance field.
+   serviceCallMetric = populateReaderCallMetric(session, tableId);
+
+   long maxSegmentByteSize =
+       (long) (Runtime.getRuntime().totalMemory() * DEFAULT_BYTE_LIMIT_PERCENTAGE);
+
+   return new BigtableSegmentReaderImpl(session, request, maxSegmentByteSize);
+ }
+
+ /**
+  * Creates a reader that starts from {@code request} and re-issues truncated copies of it, one
+  * segment at a time.
+  *
+  * @param session session used to issue ReadRows calls
+  * @param request first segment request; its rows limit is the segment size
+  * @param maxSegmentByteSize byte budget after which a streaming segment is cancelled
+  */
+ @VisibleForTesting
+ BigtableSegmentReaderImpl(
+     BigtableSession session, ReadRowsRequest request, long maxSegmentByteSize) {
+   this.session = session;
+   this.nextRequest = request;
+   this.maxSegmentByteSize = maxSegmentByteSize;
+   this.buffer = new ArrayDeque<>();
+   // Asynchronously refill the buffer once only ~10% of a segment's rows are left. Clamp to at
+   // least 1 so small row limits (below 10, including 0) still yield a positive watermark and an
+   // empty buffer always counts as "below the watermark"; a watermark of 0 would never trigger a
+   // refill and the read would stop after the first segment.
+   this.refillSegmentWaterMark = Math.max(1, request.getRowsLimit() / 10);
+ }
+
+ /**
+  * Issues the first segment fetch eagerly. The buffer starts out empty, so the following call to
+  * advance() waits for that fetch.
+  */
+ @Override
+ public boolean start() throws IOException {
+   this.future = fetchNextSegment();
+   return this.advance();
+ }
+
+ /**
+  * Moves to the next buffered row.
+  *
+  * <p>A background fetch of the next segment is started once the buffer drains to the refill
+  * watermark. The reader blocks on that fetch only when no buffered rows remain.
+  *
+  * @return {@code true} if a row is available via the current-row accessor
+  * @throws IOException if waiting for the in-flight segment fails
+  */
+ @Override
+ public boolean advance() throws IOException {
+   // "<=" (rather than "<") keeps the refill working even when the watermark is 0. The
+   // nextRequest check avoids creating an empty placeholder fetch (and recording an "ok" call
+   // that never happened) after the last segment.
+   if (future == null && nextRequest != null && buffer.size() <= refillSegmentWaterMark) {
+     future = fetchNextSegment();
+   }
+   if (buffer.isEmpty() && future != null) {
+     waitReadRowsFuture();
+   }
+   currentRow = buffer.poll();
+   return currentRow != null;
+ }
+
+ /**
+  * Starts an asynchronous read of the segment described by {@code nextRequest}.
+  *
+  * <p>The returned future completes with the segment's rows and the request for the segment
+  * after it ({@code null} when this one was the last), or fails with the stream's error. When the
+  * accumulated serialized size exceeds {@code maxSegmentByteSize}, the stream is cancelled and
+  * the next request resumes after the last row received.
+  */
+ private Future<UpstreamResults> fetchNextSegment() {
+   SettableFuture<UpstreamResults> f = SettableFuture.create();
+   // When nextRequest is null, the last fill completed and the buffer contains the last rows.
+   if (nextRequest == null) {
+     f.set(new UpstreamResults(ImmutableList.of(), null));
+     return f;
+   }
+   // Capture the request so the stream callbacks (which may run on another thread) never read
+   // the mutable field.
+   final ReadRowsRequest request = nextRequest;
+
+   // TODO(diegomez): Remove atomic ScanHandler for simpler StreamObserver/Future implementation
+   AtomicReference<ScanHandler> atomicScanHandler = new AtomicReference<>();
+   ScanHandler handler =
+       session
+           .getDataClient()
+           .readFlatRows(
+               request,
+               new StreamObserver<FlatRow>() {
+                 List<Row> rows = new ArrayList<>();
+                 long currentByteSize = 0;
+                 boolean byteLimitReached = false;
+
+                 @Override
+                 public void onNext(FlatRow flatRow) {
+                   Row row = FlatRowConverter.convert(flatRow);
+                   currentByteSize += row.getSerializedSize();
+                   rows.add(row);
+
+                   if (currentByteSize > maxSegmentByteSize) {
+                     byteLimitReached = true;
+                     // The handler is only published after readFlatRows returns; a row that
+                     // arrives earlier must not NPE here (a later row will retry the cancel).
+                     ScanHandler scanHandler = atomicScanHandler.get();
+                     if (scanHandler != null) {
+                       scanHandler.cancel();
+                     }
+                   }
+                 }
+
+                 @Override
+                 public void onError(Throwable e) {
+                   f.setException(e);
+                 }
+
+                 @Override
+                 public void onCompleted() {
+                   try {
+                     ReadRowsRequest nextNextRequest = null;
+                     // A segment that hit the byte limit or returned exactly rowsLimit rows may
+                     // be followed by more data; otherwise this request was the last one. With
+                     // no rows (e.g. rowsLimit == 0 on an empty range) there is no key to resume
+                     // from, and rows.get(-1) would throw before the future is completed.
+                     if (!rows.isEmpty()
+                         && (byteLimitReached || rows.size() == request.getRowsLimit())) {
+                       nextNextRequest =
+                           truncateRequest(request, rows.get(rows.size() - 1).getKey());
+                     }
+                     f.set(new UpstreamResults(rows, nextNextRequest));
+                   } catch (RuntimeException e) {
+                     // Never leave the future pending, or the reader would block forever.
+                     f.setException(e);
+                   }
+                 }
+               });
+   atomicScanHandler.set(handler);
+   return f;
+ }
+
+ /**
+  * Blocks until the in-flight segment fetch completes. Its rows are then appended to the buffer
+  * and the request for the following segment is remembered.
+  *
+  * @throws IOException if the fetch failed, was cancelled, or the wait was interrupted; the
+  *     underlying failure is attached as the cause
+  */
+ private void waitReadRowsFuture() throws IOException {
+   UpstreamResults results;
+   try {
+     results = future.get();
+   } catch (InterruptedException e) {
+     Thread.currentThread().interrupt();
+     throw new IOException(e);
+   } catch (ExecutionException e) {
+     Throwable cause = e.getCause();
+     if (cause instanceof StatusRuntimeException) {
+       // Report the failed call with its gRPC status, mirroring the "ok" recorded on success.
+       serviceCallMetric.call(((StatusRuntimeException) cause).getStatus().getCode().toString());
+     }
+     throw new IOException(cause);
+   } catch (java.util.concurrent.CancellationException e) {
+     // Previously swallowed, which made advance() report end-of-data on a failed read.
+     throw new IOException(e);
+   }
+   buffer.addAll(results.rows);
+   nextRequest = results.nextRequest;
+   future = null;
+   serviceCallMetric.call("ok");
+ }
+
+ private ReadRowsRequest truncateRequest(ReadRowsRequest request,
ByteString lastKey) {
+ RowSet.Builder segment = RowSet.newBuilder();
+
+ for (RowRange rowRange : request.getRows().getRowRangesList()) {
+ int startCmp = StartPoint.extract(rowRange).compareTo(new
StartPoint(lastKey, true));
+ int endCmp = EndPoint.extract(rowRange).compareTo(new
EndPoint(lastKey, true));
+
+ if (startCmp > 0) {
+ // If the startKey is passed the split point than add the whole range
+ segment.addRowRanges(rowRange);
+ } else if (endCmp > 0) {
+ // Row is split, remove all read rowKeys and split RowSet at last
buffered Row
+ RowRange subRange =
rowRange.toBuilder().setStartKeyOpen(lastKey).build();
+ segment.addRowRanges(subRange);
+ }
+ }
+ if (segment.getRowRangesCount() == 0) {
+ return null;
+ }
+ ReadRowsRequest newRequest =
+ ReadRowsRequest.newBuilder()
+ .setTableName(request.getTableName())
+ .setRows(segment.build())
+ .setFilter(request.getFilter())
+ .setRowsLimit(request.getRowsLimit())
+ .build();
+
+ return newRequest;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (session == null) {
+ // Only possible when previously closed, so we know that results is
also null.
Review Comment:
I don't know; this code comes from the previous implementation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]