diegomez17 commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r871849603


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,254 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable ReadRowsRequest nextRequest;
+    final Queue<Row> buffer = new ArrayDeque<>();
+    final int segmentSize;
+    final int refillSegmentWaterMark;
+    final String tableId;
+    final RowFilter filter;
+    final long bufferByteLimit;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private final ServiceCallMetric serviceCallMetric;
+    // private boolean upstreamResourcesExhausted;
+
+    static class UpstreamResults {
+      final List<Row> rows;
+      final @Nullable ReadRowsRequest nextRequest;
+
+      UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    static BigtableSegmentReaderImpl create(BigtableSession session, BigtableSource source) {
+      RowSet set;
+      if (source.getRanges().isEmpty()) {
+        set = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+      } else {
+        RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+        for (int i = 0; i < source.getRanges().size(); i++) {
+          rowRanges[i] =
+              RowRange.newBuilder()
+                  .setStartKeyClosed(
+                      ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                  .setEndKeyOpen(
+                      ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                  .build();
+        }
+        // Presort the ranges so that fetching the next segment can exit early when splitting the row set
+        Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+        set =
+            RowSet.newBuilder()
+                .addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+                .build();
+      }
+
+      RowFilter filter =
+          MoreObjects.firstNonNull(source.getRowFilter(), RowFilter.getDefaultInstance());
+      ReadRowsRequest request =
+          ReadRowsRequest.newBuilder()
+              .setTableName(
+                  session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
+              .setRows(set)
+              .setFilter(filter)
+              .setRowsLimit(source.getMaxBufferElementCount())
+              .build();
+      return new BigtableSegmentReaderImpl(
+          session, request, source.getTableId().get(), filter, source.getMaxBufferElementCount());
+    }
+
+    @VisibleForTesting
+    BigtableSegmentReaderImpl(
+        BigtableSession session,
+        ReadRowsRequest request,
+        String tableId,
+        RowFilter filter,
+        int segmentSize) {
+      this.session = session;
+      this.nextRequest = request;
+      this.tableId = tableId;
+      this.filter = filter;
+      this.segmentSize = segmentSize;
+      // Asynchronously refill the buffer when only 10% of the elements are left
+      this.refillSegmentWaterMark = segmentSize / 10;
+      // this.upstreamResourcesExhausted = false;
+
+      long availableMemory =
+          Runtime.getRuntime().maxMemory()
+              - (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory());
+      bufferByteLimit = (long) (DEFAULT_BYTE_LIMIT_PERCENTAGE * availableMemory);
+
+      serviceCallMetric = populateReaderCallMetric(session, tableId);
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      future = fetchNextSegment();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (buffer.size() < refillSegmentWaterMark && future == null) {
+        future = fetchNextSegment();
+      }
+      if (buffer.isEmpty() && future != null) {
+        waitReadRowsFuture();
+      }
+      // The last request will return an empty rowSet, which means there are no more rows to read
+      if (buffer.isEmpty()) {
+        return false;
+      }
+      currentRow = buffer.remove();
+      return currentRow != null;
+    }
+
+    Future<UpstreamResults> fetchNextSegment() {
+      SettableFuture<UpstreamResults> f = SettableFuture.create();
+      if (nextRequest == null) {
+        f.set(new UpstreamResults(ImmutableList.of(), null));
+        return f;
+      }
+
+      // TODO(diegomez): Remove atomic ScanHandler for simpler StreamObserver/Future implementation
+      AtomicReference<ScanHandler> atomic = new AtomicReference<>();
+      ScanHandler handler =
+          session
+              .getDataClient()
+              .readFlatRows(
+                  nextRequest,
+                  new StreamObserver<FlatRow>() {
+                    List<Row> rows = new ArrayList<>();
+                    long currentByteSize = 0;
+                    boolean byteLimitReached = false;
+
+                    @Override
+                    public void onNext(FlatRow flatRow) {
+                      currentByteSize += computeRowSize(flatRow);
+                      Row row = FlatRowConverter.convert(flatRow);
+                      rows.add(row);
+
+                      if (currentByteSize > bufferByteLimit) {
+                        byteLimitReached = true;
+                        atomic.get().cancel();
+                        return;
+                      }
+                    }
+
+                    @Override
+                    public void onError(Throwable e) {
+                      f.setException(e);
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                      ReadRowsRequest nextNextRequest = null;
+
+                      if (byteLimitReached || rows.size() == segmentSize) {
+                        nextNextRequest =
+                            truncateRequest(nextRequest, rows.get(rows.size() - 1).getKey());
+                      }
+
+                      f.set(new UpstreamResults(rows, nextNextRequest));
+                    }
+                  });
+      atomic.set(handler);
+      return f;
+    }
+
+    void waitReadRowsFuture() throws IOException {
+      try {
+        UpstreamResults r = future.get();
+        buffer.addAll(r.rows);
+        nextRequest = r.nextRequest;
+        future = null;
+        serviceCallMetric.call("ok");
+      } catch (StatusRuntimeException e) {
+        serviceCallMetric.call(e.getStatus().getCode().value());
+        throw e;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+
+    ReadRowsRequest truncateRequest(ReadRowsRequest request, ByteString lastKey) {
+      RowSet setFromRequest = request.getRows();
+
+      RowSet.Builder segment = RowSet.newBuilder();
+      for (int i = 0; i < setFromRequest.getRowRangesCount(); i++) {
+        RowRange rowRange = setFromRequest.getRowRanges(i);
+        int startCmp = StartPoint.extract(rowRange).compareTo(new StartPoint(lastKey, true));
+        int endCmp = EndPoint.extract(rowRange).compareTo(new EndPoint(lastKey, true));
+
+        if (startCmp > 0) {
+          // If the startKey is past the split point, then add the whole range
+          segment.addRowRanges(rowRange);
+        } else if (endCmp > 0) {
+          // Row is split, remove all read rowKeys and split RowSet at last buffered Row
+          RowRange subRange = rowRange.toBuilder().setStartKeyOpen(lastKey).build();
+          segment.addRowRanges(subRange);
+        }
+      }
+      /* I think that this is now covered outside of this functionality
+      if (segment.getRowRangesCount() == 0) {
+        return false;
+      } */
+      ReadRowsRequest newRequest =
+          ReadRowsRequest.newBuilder()
+              .setTableName(session.getOptions().getInstanceName().toTableNameStr(tableId))
+              .setRows(segment.build())
+              .setFilter(filter)
+              .setRowsLimit(segmentSize)
+              .build();
+
+      return newRequest;
+    }
+
+    private long computeRowSize(FlatRow row) {
+      return row.getRowKey().size()
+          + row.getCells().stream()
+              .mapToLong(c -> c.getQualifier().size() + c.getValue().size())
+              .sum();
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Goal: by the end of this function, both results and session are null and closed,
+      // independent of what errors they throw or prior state.
+
+      if (session == null) {
+        // Only possible when previously closed, so we know that results is also null.
+        return;
+      }
+
+      // Session does not implement Closeable -- it's AutoCloseable. So we can't register it with
+      // the Closer, but we can use the Closer to simplify the error handling.
+      try (Closer closer = Closer.create()) {
+        session.close();
+      } finally {
+        session = null;
+      }

Review Comment:
   I am also confused by this. Are there issues with closing the session? We can talk about this tomorrow.
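
   For reference, a minimal sketch of the simpler shape a plain close could take -- purely illustrative, and it assumes the session is the only resource the reader holds and only needs a single close():

       @Override
       public void close() throws IOException {
         if (session == null) {
           // Already closed.
           return;
         }
         try {
           session.close();
         } finally {
           // Drop the reference even if close() throws, so a second close() is a no-op.
           session = null;
         }
       }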



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
