mutianf commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r870655077


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -402,6 +404,28 @@ public Read withRowFilter(RowFilter filter) {
       return withRowFilter(StaticValueProvider.of(filter));
     }
 
+    /**
+     * Returns a new {@link BigtableIO.Read} that will break up read requests 
into smaller batches.
+     * This function will switch the base BigtableIO.Reader class to using the 
SegmentReader. If
+     * null is passed, this behavior will be disabled and the stream reader 
will be usedf.

Review Comment:
   nit: typo — "will be used**f**" should read "will be used".



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,254 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable ReadRowsRequest nextRequest;
+    final Queue<Row> buffer = new ArrayDeque<>();
+    final int segmentSize;
+    final int refillSegmentWaterMark;
+    final String tableId;
+    final RowFilter filter;
+    final long bufferByteLimit;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private final ServiceCallMetric serviceCallMetric;
+    // private boolean upstreamResourcesExhausted;
+
+    static class UpstreamResults {
+      final List<Row> rows;
+      final @Nullable ReadRowsRequest nextRequest;
+
+      UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
+      RowSet set;
+      if (source.getRanges().isEmpty()) {
+        set = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+      } else {
+        RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+        for (int i = 0; i < source.getRanges().size(); i++) {
+          rowRanges[i] =
+              RowRange.newBuilder()
+                  .setStartKeyClosed(
+                      
ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                  .setEndKeyOpen(
+                      
ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))

Review Comment:
   Is it always true that start keys are closed and end keys are open? Does 
it matter here?



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +149,313 @@ public void testRead() throws IOException {
     verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
   }
 
+  /**
+   * This test ensures that protobuf creation and interactions with {@link 
BigtableDataClient} work
+   * as expected. This test checks that a single row is returned from the 
future.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+    
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
 end)));
+    FlatRow expectedRow = 
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), 
any()))
+        .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+    BigtableService.Reader underTest =
+        BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession, 
mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedRow), 
underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+    underTest.close();
+
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 2);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and 
read. This example
+   * uses a single range with SEGMENT_SIZE*2+1 rows. Range: [a, b1, ... b199, 
c)

Review Comment:
   Is this comment still accurate? Doesn't the start key now begin at b00000, 
with the range ending at b00000 + SEGMENT_SIZE * 2?



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +149,313 @@ public void testRead() throws IOException {
     verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
   }
 
+  /**
+   * This test ensures that protobuf creation and interactions with {@link 
BigtableDataClient} work
+   * as expected. This test checks that a single row is returned from the 
future.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+    
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
 end)));
+    FlatRow expectedRow = 
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), 
any()))
+        .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+    BigtableService.Reader underTest =
+        BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession, 
mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedRow), 
underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+    underTest.close();
+
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 2);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and 
read. This example
+   * uses a single range with SEGMENT_SIZE*2+1 rows. Range: [a, b1, ... b199, 
c)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeAboveSegmentLimit() throws IOException {
+    ByteKey start = 
ByteKey.copyFrom("b00000".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = 
ByteKey.copyFrom(("b00"+SEGMENT_SIZE*2).getBytes(StandardCharsets.UTF_8));
+
+    
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
 end)));
+
+    List<List<FlatRow>> expectedResults = ImmutableList.of(
+        generateSegmentResult("b",0,SEGMENT_SIZE, false),
+        generateSegmentResult("b", SEGMENT_SIZE, SEGMENT_SIZE, false),
+        ImmutableList.of());
+
+    OngoingStubbing
+        <?> stub = 
when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
+
+    expectRowResults(stub, expectedResults);
+
+    BigtableService.Reader underTest =
+        BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession, 
mockBigtableSource);
+
+    List<Row> actualResults = new ArrayList<>();
+    Assert.assertTrue(underTest.start());
+    do {
+      actualResults.add(underTest.getCurrentRow());
+    } while (underTest.advance());
+
+    Assert.assertEquals(
+        expectedResults.stream().flatMap(Collection::stream).map(i -> 
FlatRowConverter.convert(i)).collect(Collectors.toList()),
+        actualResults);
+    underTest.close();
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and 
read. This example
+   * uses two ranges with SEGMENT_SIZE rows. The buffer should be refilled 
twice and
+   * ReadRowsAsync should be called three times. The last rpc call should 
return zero rows. The
+   * following test follows this example: FirstRange: [a,b1,...,b99,c) 
SecondRange: [c,d1,...,d99,e)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadMultipleRanges() throws IOException {
+    ByteKey start = 
ByteKey.copyFrom("b00000".getBytes(StandardCharsets.UTF_8));
+    ByteKey middleKey = 
ByteKey.copyFrom(("b00"+SEGMENT_SIZE).getBytes(StandardCharsets.UTF_8));
+    ByteKey end = 
ByteKey.copyFrom(("b00"+SEGMENT_SIZE*2).getBytes(StandardCharsets.UTF_8));
+
+    
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
 middleKey),
+        ByteKeyRange.of(middleKey, end)));
+
+    List<List<FlatRow>> expectedResults = ImmutableList.of(
+        generateSegmentResult("b",0,SEGMENT_SIZE, false),
+        generateSegmentResult("b", SEGMENT_SIZE, SEGMENT_SIZE, false),

Review Comment:
   Same comment as above applies to this test as well.



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +149,313 @@ public void testRead() throws IOException {
     verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
   }
 
+  /**
+   * This test ensures that protobuf creation and interactions with {@link 
BigtableDataClient} work
+   * as expected. This test checks that a single row is returned from the 
future.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+    
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
 end)));
+    FlatRow expectedRow = 
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), 
any()))
+        .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+    BigtableService.Reader underTest =
+        BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession, 
mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedRow), 
underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+    underTest.close();
+
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 2);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and 
read. This example
+   * uses a single range with SEGMENT_SIZE*2+1 rows. Range: [a, b1, ... b199, 
c)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeAboveSegmentLimit() throws IOException {
+    ByteKey start = 
ByteKey.copyFrom("b00000".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = 
ByteKey.copyFrom(("b00"+SEGMENT_SIZE*2).getBytes(StandardCharsets.UTF_8));
+
+    
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
 end)));
+
+    List<List<FlatRow>> expectedResults = ImmutableList.of(
+        generateSegmentResult("b",0,SEGMENT_SIZE, false),

Review Comment:
   Can we simplify this to generateSegmentResult("b", 0, SEGMENT_SIZE * 2, 
false)? 



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,254 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable ReadRowsRequest nextRequest;
+    final Queue<Row> buffer = new ArrayDeque<>();
+    final int segmentSize;
+    final int refillSegmentWaterMark;
+    final String tableId;
+    final RowFilter filter;
+    final long bufferByteLimit;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private final ServiceCallMetric serviceCallMetric;
+    // private boolean upstreamResourcesExhausted;

Review Comment:
   why is this commented out? Should it be cleaned up? 



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +149,313 @@ public void testRead() throws IOException {
     verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
   }
 
+  /**
+   * This test ensures that protobuf creation and interactions with {@link 
BigtableDataClient} work
+   * as expected. This test checks that a single row is returned from the 
future.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+    ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+    
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
 end)));
+    FlatRow expectedRow = 
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), 
any()))
+        .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+    BigtableService.Reader underTest =
+        BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession, 
mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedRow), 
underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+    underTest.close();
+
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 2);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and 
read. This example
+   * uses a single range with SEGMENT_SIZE*2+1 rows. Range: [a, b1, ... b199, 
c)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeAboveSegmentLimit() throws IOException {
+    ByteKey start = 
ByteKey.copyFrom("b00000".getBytes(StandardCharsets.UTF_8));
+    ByteKey end = 
ByteKey.copyFrom(("b00"+SEGMENT_SIZE*2).getBytes(StandardCharsets.UTF_8));
+
+    
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
 end)));
+
+    List<List<FlatRow>> expectedResults = ImmutableList.of(
+        generateSegmentResult("b",0,SEGMENT_SIZE, false),
+        generateSegmentResult("b", SEGMENT_SIZE, SEGMENT_SIZE, false),
+        ImmutableList.of());
+
+    OngoingStubbing
+        <?> stub = 
when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
+
+    expectRowResults(stub, expectedResults);
+
+    BigtableService.Reader underTest =
+        BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession, 
mockBigtableSource);
+
+    List<Row> actualResults = new ArrayList<>();
+    Assert.assertTrue(underTest.start());
+    do {
+      actualResults.add(underTest.getCurrentRow());
+    } while (underTest.advance());
+
+    Assert.assertEquals(
+        expectedResults.stream().flatMap(Collection::stream).map(i -> 
FlatRowConverter.convert(i)).collect(Collectors.toList()),
+        actualResults);
+    underTest.close();
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and 
read. This example
+   * uses two ranges with SEGMENT_SIZE rows. The buffer should be refilled 
twice and
+   * ReadRowsAsync should be called three times. The last rpc call should 
return zero rows. The
+   * following test follows this example: FirstRange: [a,b1,...,b99,c) 
SecondRange: [c,d1,...,d99,e)

Review Comment:
   Same as above: this javadoc example no longer appears to match the keys 
actually used in the test — please update it.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,254 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable ReadRowsRequest nextRequest;
+    final Queue<Row> buffer = new ArrayDeque<>();
+    final int segmentSize;
+    final int refillSegmentWaterMark;
+    final String tableId;
+    final RowFilter filter;
+    final long bufferByteLimit;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private final ServiceCallMetric serviceCallMetric;
+    // private boolean upstreamResourcesExhausted;
+
+    static class UpstreamResults {
+      final List<Row> rows;
+      final @Nullable ReadRowsRequest nextRequest;
+
+      UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
+      RowSet set;
+      if (source.getRanges().isEmpty()) {
+        set = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+      } else {
+        RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+        for (int i = 0; i < source.getRanges().size(); i++) {
+          rowRanges[i] =
+              RowRange.newBuilder()
+                  .setStartKeyClosed(
+                      
ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                  .setEndKeyOpen(
+                      
ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                  .build();
+        }
+        // Presort the ranges so that fetch future segment can exit early when 
splitting the row set
+        Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+        set =
+            RowSet.newBuilder()
+                
.addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+                .build();
+      }
+
+      RowFilter filter =
+          MoreObjects.firstNonNull(source.getRowFilter(), 
RowFilter.getDefaultInstance());
+      ReadRowsRequest request =
+          ReadRowsRequest.newBuilder()
+              .setTableName(
+                  
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
+              .setRows(set)
+              .setFilter(filter)
+              .setRowsLimit(source.getMaxBufferElementCount())
+              .build();
+      return new BigtableSegmentReaderImpl(
+          session, request, source.getTableId().get(), filter, 
source.getMaxBufferElementCount());
+    }
+
+    @VisibleForTesting
+    BigtableSegmentReaderImpl(
+        BigtableSession session,
+        ReadRowsRequest request,
+        String tableId,
+        RowFilter filter,
+        int segmentSize) {
+      this.session = session;
+      this.nextRequest = request;
+      this.tableId = tableId;
+      this.filter = filter;
+      this.segmentSize = segmentSize;
+      // Asynchronously refill buffer when there is 10% of the elements are 
left
+      this.refillSegmentWaterMark = segmentSize / 10;
+      // this.upstreamResourcesExhausted = false;
+
+      long availableMemory =
+          Runtime.getRuntime().maxMemory()
+              - (Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory());
+      bufferByteLimit = (long) (DEFAULT_BYTE_LIMIT_PERCENTAGE * 
availableMemory);
+
+      serviceCallMetric = populateReaderCallMetric(session, tableId);
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      future = fetchNextSegment();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (buffer.size() < refillSegmentWaterMark && future == null) {
+        future = fetchNextSegment();
+      }
+      if (buffer.isEmpty() && future != null) {
+        waitReadRowsFuture();
+      }
+      // The last requst will return an empty rowSet which will mean that 
there are no more rows to read
+      if (buffer.isEmpty()) {
+        return false;
+      }
+      currentRow = buffer.remove();
+      return currentRow != null;
+    }
+
+    Future<UpstreamResults> fetchNextSegment() {
+      SettableFuture<UpstreamResults> f = SettableFuture.create();
+      if (nextRequest == null) {
+        f.set(new UpstreamResults(ImmutableList.of(), null));
+        return f;
+      }
+
+      // TODO(diegomez): Remove atomic ScanHandler for simpler 
StreamObserver/Future implementation
+      AtomicReference<ScanHandler> atomic = new AtomicReference<>();
+      ScanHandler handler =
+          session
+              .getDataClient()
+              .readFlatRows(
+                  nextRequest,
+                  new StreamObserver<FlatRow>() {
+                    List<Row> rows = new ArrayList<>();
+                    long currentByteSize = 0;
+                    boolean byteLimitReached = false;
+
+                    @Override
+                    public void onNext(FlatRow flatRow) {
+                      currentByteSize += computeRowSize(flatRow);
+                      Row row = FlatRowConverter.convert(flatRow);
+                      rows.add(row);
+
+                      if (currentByteSize > bufferByteLimit) {

Review Comment:
   Should we move this check to before adding the row to rows? Or is it OK 
to go over the byte limit by a little bit?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,254 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable ReadRowsRequest nextRequest;
+    final Queue<Row> buffer = new ArrayDeque<>();
+    final int segmentSize;
+    final int refillSegmentWaterMark;
+    final String tableId;
+    final RowFilter filter;
+    final long bufferByteLimit;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private final ServiceCallMetric serviceCallMetric;
+    // private boolean upstreamResourcesExhausted;
+
+    static class UpstreamResults {
+      final List<Row> rows;
+      final @Nullable ReadRowsRequest nextRequest;

Review Comment:
   +1 to Igor's comment, please make these private too unless it's used in test 
:)



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -320,6 +570,15 @@ public String toString() {
 
   @Override
   public Reader createReader(BigtableSource source) throws IOException {
+    BigtableSession session = new BigtableSession(options);
+    if (source.getMaxBufferElementCount() != null) {
+      return BigtableSegmentReaderImpl.create(session, source);
+    } else {
+      return new BigtableReaderImpl(session, source);
+    }
+  }
+
+  public Reader createSegmentReader(BigtableSource source) throws IOException {
     BigtableSession session = new BigtableSession(options);
     return new BigtableReaderImpl(session, source);

Review Comment:
   Why is this still returning BigtableReaderImpl?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,254 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable ReadRowsRequest nextRequest;
+    final Queue<Row> buffer = new ArrayDeque<>();
+    final int segmentSize;
+    final int refillSegmentWaterMark;
+    final String tableId;
+    final RowFilter filter;
+    final long bufferByteLimit;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private final ServiceCallMetric serviceCallMetric;
+    // private boolean upstreamResourcesExhausted;
+
+    static class UpstreamResults {
+      final List<Row> rows;
+      final @Nullable ReadRowsRequest nextRequest;
+
+      UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
+      RowSet set;
+      if (source.getRanges().isEmpty()) {
+        set = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+      } else {
+        RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+        for (int i = 0; i < source.getRanges().size(); i++) {
+          rowRanges[i] =
+              RowRange.newBuilder()
+                  .setStartKeyClosed(
+                      
ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                  .setEndKeyOpen(
+                      
ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                  .build();
+        }
+        // Presort the ranges so that fetch future segment can exit early when 
splitting the row set
+        Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+        set =
+            RowSet.newBuilder()
+                
.addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+                .build();
+      }
+
+      RowFilter filter =
+          MoreObjects.firstNonNull(source.getRowFilter(), 
RowFilter.getDefaultInstance());
+      ReadRowsRequest request =
+          ReadRowsRequest.newBuilder()
+              .setTableName(
+                  
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
+              .setRows(set)
+              .setFilter(filter)
+              .setRowsLimit(source.getMaxBufferElementCount())
+              .build();
+      return new BigtableSegmentReaderImpl(
+          session, request, source.getTableId().get(), filter, 
source.getMaxBufferElementCount());
+    }
+
+    @VisibleForTesting
+    BigtableSegmentReaderImpl(
+        BigtableSession session,
+        ReadRowsRequest request,
+        String tableId,
+        RowFilter filter,
+        int segmentSize) {
+      this.session = session;
+      this.nextRequest = request;
+      this.tableId = tableId;
+      this.filter = filter;
+      this.segmentSize = segmentSize;
+      // Asynchronously refill buffer when there is 10% of the elements are 
left
+      this.refillSegmentWaterMark = segmentSize / 10;
+      // this.upstreamResourcesExhausted = false;

Review Comment:
   is this still used?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,254 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    @Nullable ReadRowsRequest nextRequest;
+    final Queue<Row> buffer = new ArrayDeque<>();
+    final int segmentSize;
+    final int refillSegmentWaterMark;
+    final String tableId;
+    final RowFilter filter;
+    final long bufferByteLimit;
+    private Future<UpstreamResults> future;
+    private Row currentRow;
+    private final ServiceCallMetric serviceCallMetric;
+    // private boolean upstreamResourcesExhausted;
+
+    static class UpstreamResults {
+      final List<Row> rows;
+      final @Nullable ReadRowsRequest nextRequest;
+
+      UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
+      RowSet set;
+      if (source.getRanges().isEmpty()) {
+        set = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+      } else {
+        RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+        for (int i = 0; i < source.getRanges().size(); i++) {
+          rowRanges[i] =
+              RowRange.newBuilder()
+                  .setStartKeyClosed(
+                      
ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+                  .setEndKeyOpen(
+                      
ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+                  .build();
+        }
+        // Presort the ranges so that fetch future segment can exit early when 
splitting the row set
+        Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+        set =
+            RowSet.newBuilder()
+                
.addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+                .build();
+      }
+
+      RowFilter filter =
+          MoreObjects.firstNonNull(source.getRowFilter(), 
RowFilter.getDefaultInstance());
+      ReadRowsRequest request =
+          ReadRowsRequest.newBuilder()
+              .setTableName(
+                  
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
+              .setRows(set)
+              .setFilter(filter)
+              .setRowsLimit(source.getMaxBufferElementCount())
+              .build();
+      return new BigtableSegmentReaderImpl(
+          session, request, source.getTableId().get(), filter, 
source.getMaxBufferElementCount());
+    }
+
+    @VisibleForTesting
+    BigtableSegmentReaderImpl(
+        BigtableSession session,
+        ReadRowsRequest request,
+        String tableId,
+        RowFilter filter,
+        int segmentSize) {
+      this.session = session;
+      this.nextRequest = request;
+      this.tableId = tableId;
+      this.filter = filter;
+      this.segmentSize = segmentSize;
+      // Asynchronously refill buffer when there is 10% of the elements are 
left
+      this.refillSegmentWaterMark = segmentSize / 10;
+      // this.upstreamResourcesExhausted = false;
+
+      long availableMemory =
+          Runtime.getRuntime().maxMemory()
+              - (Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory());
+      bufferByteLimit = (long) (DEFAULT_BYTE_LIMIT_PERCENTAGE * 
availableMemory);
+
+      serviceCallMetric = populateReaderCallMetric(session, tableId);
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      future = fetchNextSegment();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (buffer.size() < refillSegmentWaterMark && future == null) {
+        future = fetchNextSegment();
+      }
+      if (buffer.isEmpty() && future != null) {
+        waitReadRowsFuture();
+      }
+      // The last requst will return an empty rowSet which will mean that 
there are no more rows to read
+      if (buffer.isEmpty()) {
+        return false;
+      }
+      currentRow = buffer.remove();
+      return currentRow != null;
+    }
+
+    Future<UpstreamResults> fetchNextSegment() {
+      SettableFuture<UpstreamResults> f = SettableFuture.create();
+      if (nextRequest == null) {
+        f.set(new UpstreamResults(ImmutableList.of(), null));
+        return f;
+      }
+
+      // TODO(diegomez): Remove atomic ScanHandler for simpler 
StreamObserver/Future implementation
+      AtomicReference<ScanHandler> atomic = new AtomicReference<>();

Review Comment:
   Can we rename it to scanHandler or something more meaningful? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to