diegomez17 commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r866871574


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +231,255 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+    private final BigtableSource source;
+    private Row currentRow;
+    private Queue<FlatRow> buffer;
+    private RowSet rowSet;
+    private ServiceCallMetric serviceCallMetric;
+    private Future<ImmutablePair<List<FlatRow>, Boolean>> future;
+    private ByteString lastFetchedRow;
+    private boolean lastFillComplete;
+    private boolean byteLimitReached;
+
+    private final int segmentLimit;
+    private final int segmentWaterMark;
+    private final String tableName;
+
+    /**
+     * Creates a reader that reads the rows of {@code source} in bounded-size segments.
+     *
+     * @param session the Bigtable session to read through; its options supply the instance name
+     * @param source the source describing the table and key ranges to read
+     */
+    @VisibleForTesting
+    BigtableSegmentReaderImpl(BigtableSession session, BigtableSource source) {
+      this.session = session;
+      this.source = source;
+
+      // A null or zero max buffer element count means "not configured": fall back to the default.
+      Integer configuredLimit = source.getMaxBufferElementCount();
+      boolean hasConfiguredLimit = configuredLimit != null && configuredLimit != 0;
+      this.segmentLimit = hasConfiguredLimit ? configuredLimit : DEFAULT_SEGMENT_SIZE;
+
+      // Start an asynchronous refill once only 10% of a segment's elements remain buffered.
+      this.segmentWaterMark = segmentLimit / 10;
+      this.tableName =
+          session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+    }
+
+    /**
+     * Resets the buffer state, builds the sorted row set, sets up request metrics, kicks off the
+     * first segment read, and advances to the first row.
+     *
+     * @return {@code true} if a first row is available, as reported by {@code advance()}
+     * @throws IOException if advancing to the first row fails
+     */
+    @Override
+    public boolean start() throws IOException {
+      buffer = new ArrayDeque<>();
+      lastFillComplete = false;
+      byteLimitReached = false;
+
+      // Convert the source's key ranges into Bigtable row ranges ([start, end)). They are sorted by
+      // start key up front so that later segmentation can exit early when splitting the row set.
+      List<RowRange> sortedRanges =
+          source.getRanges().stream()
+              .map(
+                  range ->
+                      RowRange.newBuilder()
+                          .setStartKeyClosed(ByteString.copyFrom(range.getStartKey().getValue()))
+                          .setEndKeyOpen(ByteString.copyFrom(range.getEndKey().getValue()))
+                          .build())
+              .sorted(RANGE_START_COMPARATOR)
+              .collect(Collectors.toList());
+      rowSet = RowSet.newBuilder().addAllRowRanges(sortedRanges).build();
+
+      String projectId = session.getOptions().getProjectId();
+      String instanceId = session.getOptions().getInstanceId();
+      String tableId = source.getTableId().get();
+
+      // Labels identifying the ReadRows calls made by this reader for the request-count metric.
+      HashMap<String, String> baseLabels = new HashMap<>();
+      baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+      baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
+      baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "google.bigtable.v2.ReadRows");
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.RESOURCE,
+          GcpResourceIdentifiers.bigtableResource(projectId, instanceId, tableId));
+      baseLabels.put(MonitoringInfoConstants.Labels.BIGTABLE_PROJECT_ID, projectId);
+      baseLabels.put(MonitoringInfoConstants.Labels.INSTANCE_ID, instanceId);
+      baseLabels.put(
+          MonitoringInfoConstants.Labels.TABLE_ID,
+          GcpResourceIdentifiers.bigtableTableID(projectId, instanceId, tableId));
+      serviceCallMetric =
+          new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+
+      future = startNextSegmentRead();
+      return advance();
+    }
+
+    /**
+     * Moves to the next buffered row and schedules an asynchronous refill when the buffer runs low.
+     *
+     * <p>When the buffer has dropped to {@code segmentWaterMark} rows and no fetch is outstanding,
+     * the remaining row set is split after {@code lastFetchedRow} and the next segment is requested.
+     * If the split reports that nothing is left, no further fetch is scheduled. Rows that are
+     * already buffered are still returned.
+     *
+     * @return {@code true} if a row was stored in {@code currentRow}, {@code false} when no further
+     *     row is available
+     * @throws IOException if waiting for an outstanding segment read fails
+     */
+    @Override
+    public boolean advance() throws IOException {
+      if (buffer.size() <= segmentWaterMark && future == null && !lastFillComplete) {
+        if (splitRowSet(lastFetchedRow)) {
+          future = startNextSegmentRead();
+        } else {
+          // splitRowSet reported nothing left to fetch (presumably no ranges remain after
+          // lastFetchedRow). Returning false here would discard the rows still in the buffer, so
+          // mark the read as complete instead and keep draining the buffer.
+          lastFillComplete = true;
+        }
+      }
+      if (buffer.isEmpty()) {
+        if (future == null || lastFillComplete) {
+          return false;
+        }
+        waitReadRowsFuture();
+      }
+      // If the last fill returned exactly segmentLimit rows, lastFillComplete only becomes true
+      // after one more RPC that returns 0 rows, so the buffer can still be empty here.
+      if (buffer.isEmpty() && lastFillComplete) {
+        return false;
+      }
+      currentRow = FlatRowConverter.convert(buffer.remove());
+      return currentRow != null;
+    }
+
+    private SettableFuture<ImmutablePair<List<FlatRow>, Boolean>> 
startNextSegmentRead() {
+      SettableFuture<ImmutablePair<List<FlatRow>, Boolean>> f = 
SettableFuture.create();
+      long availableMemory =
+          Runtime.getRuntime().maxMemory()
+              - (Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory());
+      long bufferByteLimit = (long) (DEFAULT_BYTE_LIMIT_PERCENTAGE * 
availableMemory);

Review Comment:
   Could this cause problems if we don't account for memory that is already in use? I
wouldn't want to make assumptions about how much free memory the machine has.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to