igorbernstein2 commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r873882690


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +209,216 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    private @Nullable ReadRowsRequest nextRequest;
+    private @Nullable Row currentRow;
+    private final Queue<Row> buffer;
+    private final long refillSegmentWaterMark;
+    private final long maxSegmentByteSize;
+    private Future<UpstreamResults> future;
+    private ServiceCallMetric serviceCallMetric;
+
+    private static class UpstreamResults {
+      private final List<Row> rows;
+      private final @Nullable ReadRowsRequest nextRequest;
+
+      private UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest 
nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
+      RowSet.Builder rowSetBuilder = RowSet.newBuilder();
+      if (source.getRanges().isEmpty()) {
+        rowSetBuilder = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance());
+      } else {
+        for (ByteKeyRange beamRange : source.getRanges()) {
+          RowRange.Builder rangeBuilder = rowSetBuilder.addRowRangesBuilder();
+          rangeBuilder
+              
.setStartKeyClosed(ByteString.copyFrom(beamRange.getStartKey().getValue()))
+              
.setEndKeyOpen(ByteString.copyFrom(beamRange.getEndKey().getValue()));
+        }
+      }
+
+      RowFilter filter =
+          MoreObjects.firstNonNull(source.getRowFilter(), 
RowFilter.getDefaultInstance());
+      ReadRowsRequest request =
+          ReadRowsRequest.newBuilder()
+              .setTableName(
+                  
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
+              .setRows(rowSetBuilder.build())
+              .setFilter(filter)
+              .setRowsLimit(source.getMaxBufferElementCount())
+              .build();
+
+      long maxSegmentByteSize =
+          (long) (Runtime.getRuntime().totalMemory() * 
DEFAULT_BYTE_LIMIT_PERCENTAGE);
+
+      return new BigtableSegmentReaderImpl(
+          session,
+          request,
+          maxSegmentByteSize,
+          populateReaderCallMetric(session, source.getTableId().get()));
+    }
+
+    @VisibleForTesting
+    BigtableSegmentReaderImpl(
+        BigtableSession session,
+        ReadRowsRequest request,
+        long maxSegmentByteSize,
+        ServiceCallMetric serviceCallMetric) {
+      this.session = session;
+      this.nextRequest = request;
+      this.maxSegmentByteSize = maxSegmentByteSize;
+      this.serviceCallMetric = serviceCallMetric;
+      this.buffer = new ArrayDeque<>();
+      // Asynchronously refill buffer when there is 10% of the elements are 
left
+      this.refillSegmentWaterMark = request.getRowsLimit() / 10;

Review Comment:
   please move the divisor to a constant



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to