mutianf commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r872493141


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -216,29 +216,25 @@ public Row getCurrentRow() throws NoSuchElementException {
   static class BigtableSegmentReaderImpl implements Reader {
     private BigtableSession session;
 
-    @Nullable ReadRowsRequest nextRequest;
-    final Queue<Row> buffer = new ArrayDeque<>();
-    final int segmentSize;
-    final int refillSegmentWaterMark;
-    final String tableId;
-    final RowFilter filter;
-    final long bufferByteLimit;
+    @Nullable private ReadRowsRequest nextRequest;
+    private final Queue<Row> buffer;
+    private final long refillSegmentWaterMark;
+    private final long maxSegmentByteSize;
     private Future<UpstreamResults> future;
     private Row currentRow;
-    private final ServiceCallMetric serviceCallMetric;
-    // private boolean upstreamResourcesExhausted;
+    private static ServiceCallMetric serviceCallMetric;
 
-    static class UpstreamResults {
-      final List<Row> rows;
-      final @Nullable ReadRowsRequest nextRequest;
+    private static class UpstreamResults {
+      private final List<Row> rows;
+      private final @Nullable ReadRowsRequest nextRequest;
 
       UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
         this.rows = rows;
         this.nextRequest = nextRequest;
       }
     }
 
-    static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
+    public static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {

Review Comment:
   Does this need to be public? 



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -307,29 +293,27 @@ public boolean start() throws IOException {
 
     @Override
     public boolean advance() throws IOException {
-      if (buffer.size() < refillSegmentWaterMark && future == null) {
+      System.out.println("HI");

Review Comment:
   :)



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -271,32 +265,24 @@ static BigtableSegmentReaderImpl create(BigtableSession 
session, BigtableSource
               .setFilter(filter)
               .setRowsLimit(source.getMaxBufferElementCount())
               .build();
-      return new BigtableSegmentReaderImpl(
-          session, request, source.getTableId().get(), filter, 
source.getMaxBufferElementCount());
+
+      serviceCallMetric = populateReaderCallMetric(session, 
source.getTableId().get());
+
+      long maxSegmentByteSize =
+          (long)(Runtime.getRuntime().totalMemory()
+              * DEFAULT_BYTE_LIMIT_PERCENTAGE);
+
+      return new BigtableSegmentReaderImpl(session, request, 
maxSegmentByteSize);
     }
 
     @VisibleForTesting
-    BigtableSegmentReaderImpl(
-        BigtableSession session,
-        ReadRowsRequest request,
-        String tableId,
-        RowFilter filter,
-        int segmentSize) {
+    BigtableSegmentReaderImpl(BigtableSession session, ReadRowsRequest 
request, long maxSegmentByteSize) {

Review Comment:
   I think you can move everything in this constructor, something like: 
   
   ```
   static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
       long maxSegmentByteSize = ...
       return new BigtableSegmentReaderImpl(session, request, 
maxSegmentByteSize);
   } 
   
   @VisibleForTesting
   BigtableSegmentReaderImpl(BigtableSession session, BigtableSource source, 
long maxSegmentByteSize) {
         RowSet set;
         if (source.getRanges().isEmpty()) {
           set = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
         } else {
         ....
   }
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,254 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable ReadRowsRequest nextRequest;
+    final Queue<Row> buffer = new ArrayDeque<>();
+    final int segmentSize;
+    final int refillSegmentWaterMark;
+    final String tableId;
+    final RowFilter filter;
+    final long bufferByteLimit;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private final ServiceCallMetric serviceCallMetric;
+    // private boolean upstreamResourcesExhausted;
+
+    static class UpstreamResults {
+      final List<Row> rows;
+      final @Nullable ReadRowsRequest nextRequest;
+
+      UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
+      RowSet set;
+      if (source.getRanges().isEmpty()) {
+        set = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+      } else {
+        RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+        for (int i = 0; i < source.getRanges().size(); i++) {
+          rowRanges[i] =
+              RowRange.newBuilder()
+                  .setStartKeyClosed(
+                      
ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                  .setEndKeyOpen(
+                      
ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                  .build();
+        }
+        // Presort the ranges so that fetching the next segment can exit early when 
splitting the row set
+        Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+        set =
+            RowSet.newBuilder()
+                
.addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+                .build();
+      }
+
+      RowFilter filter =
+          MoreObjects.firstNonNull(source.getRowFilter(), 
RowFilter.getDefaultInstance());
+      ReadRowsRequest request =
+          ReadRowsRequest.newBuilder()
+              .setTableName(
+                  
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
+              .setRows(set)
+              .setFilter(filter)
+              .setRowsLimit(source.getMaxBufferElementCount())
+              .build();
+      return new BigtableSegmentReaderImpl(
+          session, request, source.getTableId().get(), filter, 
source.getMaxBufferElementCount());
+    }
+
+    @VisibleForTesting
+    BigtableSegmentReaderImpl(
+        BigtableSession session,
+        ReadRowsRequest request,
+        String tableId,
+        RowFilter filter,
+        int segmentSize) {
+      this.session = session;
+      this.nextRequest = request;
+      this.tableId = tableId;
+      this.filter = filter;
+      this.segmentSize = segmentSize;
+      // Asynchronously refill the buffer when only 10% of the elements are 
left
+      this.refillSegmentWaterMark = segmentSize / 10;
+      // this.upstreamResourcesExhausted = false;
+
+      long availableMemory =
+          Runtime.getRuntime().maxMemory()
+              - (Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory());
+      bufferByteLimit = (long) (DEFAULT_BYTE_LIMIT_PERCENTAGE * 
availableMemory);
+
+      serviceCallMetric = populateReaderCallMetric(session, tableId);
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      future = fetchNextSegment();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (buffer.size() < refillSegmentWaterMark && future == null) {
+        future = fetchNextSegment();
+      }
+      if (buffer.isEmpty() && future != null) {
+        waitReadRowsFuture();
+      }
+      // The last request will return an empty row set, which means that 
there are no more rows to read
+      if (buffer.isEmpty()) {
+        return false;
+      }
+      currentRow = buffer.remove();
+      return currentRow != null;
+    }
+
+    Future<UpstreamResults> fetchNextSegment() {
+      SettableFuture<UpstreamResults> f = SettableFuture.create();
+      if (nextRequest == null) {
+        f.set(new UpstreamResults(ImmutableList.of(), null));
+        return f;
+      }
+
+      // TODO(diegomez): Remove atomic ScanHandler for simpler 
StreamObserver/Future implementation
+      AtomicReference<ScanHandler> atomic = new AtomicReference<>();
+      ScanHandler handler =
+          session
+              .getDataClient()
+              .readFlatRows(
+                  nextRequest,
+                  new StreamObserver<FlatRow>() {
+                    List<Row> rows = new ArrayList<>();
+                    long currentByteSize = 0;
+                    boolean byteLimitReached = false;
+
+                    @Override
+                    public void onNext(FlatRow flatRow) {
+                      currentByteSize += computeRowSize(flatRow);
+                      Row row = FlatRowConverter.convert(flatRow);
+                      rows.add(row);
+
+                      if (currentByteSize > bufferByteLimit) {
+                        byteLimitReached = true;
+                        atomic.get().cancel();
+                        return;
+                      }
+                    }
+
+                    @Override
+                    public void onError(Throwable e) {
+                      f.setException(e);
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                      ReadRowsRequest nextNextRequest = null;
+
+                      if (byteLimitReached || rows.size() == segmentSize) {

Review Comment:
   Hmm did you add the explanation? 



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -362,42 +346,46 @@ public void onError(Throwable e) {
                     public void onCompleted() {
                       ReadRowsRequest nextNextRequest = null;
 
-                      if (byteLimitReached || rows.size() == segmentSize) {
+                      // When requested rows < limit, the current request will 
be the last
+                      if (byteLimitReached || rows.size() == 
nextRequest.getRowsLimit()) {
                         nextNextRequest =
                             truncateRequest(nextRequest, rows.get(rows.size() 
- 1).getKey());
                       }
-
                       f.set(new UpstreamResults(rows, nextNextRequest));
                     }
                   });
-      atomic.set(handler);
+      atomicScanHandler.set(handler);
       return f;
     }
 
-    void waitReadRowsFuture() throws IOException {
+    private void waitReadRowsFuture() throws IOException {
       try {
         UpstreamResults r = future.get();
         buffer.addAll(r.rows);
         nextRequest = r.nextRequest;
         future = null;
         serviceCallMetric.call("ok");
-      } catch (StatusRuntimeException e) {
-        serviceCallMetric.call(e.getStatus().getCode().value());
-        throw e;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IOException(e);
       } catch (ExecutionException e) {
-        throw new IOException(e);
+        throw new IOException(e.getCause());
+      }
+      catch (Exception e) {
+        if (e instanceof InterruptedException) {

Review Comment:
   Should this also be `if (e.getCause() instanceof InterruptedException)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to