mutianf commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r875029147


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,223 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
+  @VisibleForTesting
+  static class BigtableSegmentReaderImpl implements Reader {
+    private BigtableSession session;
+
+    private @Nullable ReadRowsRequest nextRequest;
+    private @Nullable Row currentRow;
+    private final Queue<Row> buffer;
+    private final int refillSegmentWaterMark;
+    private final long maxSegmentByteSize;
+    private Future<UpstreamResults> future;
+    private ServiceCallMetric serviceCallMetric;
+
+    private static class UpstreamResults {
+      private final List<Row> rows;
+      private final @Nullable ReadRowsRequest nextRequest;
+
+      private UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest 
nextRequest) {
+        this.rows = rows;
+        this.nextRequest = nextRequest;
+      }
+    }
+
+    static BigtableSegmentReaderImpl create(BigtableSession session, 
BigtableSource source) {
+      RowSet.Builder rowSetBuilder = RowSet.newBuilder();
+      if (source.getRanges().isEmpty()) {
+        rowSetBuilder = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance());
+      } else {
+        // BigtableSource only contains ranges with a closed start key and 
open end key
+        for (ByteKeyRange beamRange : source.getRanges()) {
+          RowRange.Builder rangeBuilder = rowSetBuilder.addRowRangesBuilder();
+          rangeBuilder
+              
.setStartKeyClosed(ByteString.copyFrom(beamRange.getStartKey().getValue()))
+              
.setEndKeyOpen(ByteString.copyFrom(beamRange.getEndKey().getValue()));
+        }
+      }
+      RowSet rowSet = rowSetBuilder.build();
+      RowFilter filter =
+          MoreObjects.firstNonNull(source.getRowFilter(), 
RowFilter.getDefaultInstance());
+
+      long maxSegmentByteSize =
+          (long)
+              Math.max(
+                  MIN_BYTE_BUFFER_SIZE,
+                  (Runtime.getRuntime().totalMemory() * 
DEFAULT_BYTE_LIMIT_PERCENTAGE));
+
+      return new BigtableSegmentReaderImpl(
+          session,
+          
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()),
+          rowSet,
+          source.getMaxBufferElementCount(),
+          maxSegmentByteSize,
+          filter,
+          populateReaderCallMetric(session, source.getTableId().get()));
+    }
+
+    @VisibleForTesting
+    BigtableSegmentReaderImpl(
+        BigtableSession session,
+        String tableName,
+        RowSet rowSet,
+        int maxRowsInBuffer,
+        long maxSegmentByteSize,
+        RowFilter filter,
+        ServiceCallMetric serviceCallMetric) {
+      if (rowSet.equals(rowSet.getDefaultInstanceForType())) {
+        rowSet = 
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+      }
+      ReadRowsRequest request =
+          ReadRowsRequest.newBuilder()
+              .setTableName(tableName)
+              .setRows(rowSet)
+              .setFilter(filter)
+              .setRowsLimit(maxRowsInBuffer)
+              .build();
+
+      this.session = session;
+      this.nextRequest = request;
+      this.maxSegmentByteSize = maxSegmentByteSize;
+      this.serviceCallMetric = serviceCallMetric;
+      this.buffer = new ArrayDeque<>();
+      // Asynchronously refill the buffer when only 10% of the elements are left
+      this.refillSegmentWaterMark = (int) (request.getRowsLimit() * 
WATERMARK_PERCENTAGE);
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      future = fetchNextSegment();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (buffer.size() < refillSegmentWaterMark && future == null) {
+        future = fetchNextSegment();
+      }
+      if (buffer.isEmpty() && future != null) {
+        waitReadRowsFuture();
+      }
+      currentRow = buffer.poll();
+      return currentRow != null;
+    }
+
+    private Future<UpstreamResults> fetchNextSegment() {
+      SettableFuture<UpstreamResults> f = SettableFuture.create();
+      // When the nextRequest is null, the last fill completed and the buffer 
contains the last rows
+      if (nextRequest == null) {
+        f.set(new UpstreamResults(ImmutableList.of(), null));
+        return f;
+      }
+
+      // TODO(diegomez): Remove atomic ScanHandler for simpler 
StreamObserver/Future implementation
+      AtomicReference<ScanHandler> atomicScanHandler = new AtomicReference<>();
+      ScanHandler handler =
+          session
+              .getDataClient()
+              .readFlatRows(
+                  nextRequest,
+                  new StreamObserver<FlatRow>() {
+                    List<Row> rows = new ArrayList<>();
+                    long currentByteSize = 0;
+                    boolean byteLimitReached = false;
+
+                    @Override
+                    public void onNext(FlatRow flatRow) {
+                      Row row = FlatRowConverter.convert(flatRow);
+                      currentByteSize += row.getSerializedSize();
+                      rows.add(row);
+
+                      if (currentByteSize > maxSegmentByteSize) {
+                        byteLimitReached = true;
+                        atomicScanHandler.get().cancel();
+                        return;
+                      }
+                    }
+
+                    @Override
+                    public void onError(Throwable e) {
+                      f.setException(e);
+                    }
+
+                    @Override
+                    public void onCompleted() {
+                      ReadRowsRequest nextNextRequest = null;
+
+                      // When requested rows < limit, the current request will 
be the last
+                      if (byteLimitReached || rows.size() == 
nextRequest.getRowsLimit()) {
+                        nextNextRequest =
+                            truncateRequest(nextRequest, rows.get(rows.size() 
- 1).getKey());
+                      }
+                      f.set(new UpstreamResults(rows, nextNextRequest));
+                    }
+                  });
+      atomicScanHandler.set(handler);
+      return f;
+    }
+
+    private void waitReadRowsFuture() throws IOException {
+      try {
+        UpstreamResults r = future.get();
+        buffer.addAll(r.rows);
+        nextRequest = r.nextRequest;
+        future = null;
+        serviceCallMetric.call("ok");
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof StatusRuntimeException) {
+          serviceCallMetric.call(((StatusRuntimeException) 
cause).getStatus().getCode().value());

Review Comment:
   I think `.getStatus().getCode().value()` returns an integer, but on line 377
(`serviceCallMetric.call("ok");`) you're passing a string. Should line 377 be
`serviceCallMetric.call(0);` instead?



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +158,424 @@ public void testRead() throws IOException {
     verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
   }
 
+  /**
+   * This test ensures that protobuf creation and interactions with {@link 
BigtableDataClient} work
+   * as expected. This test checks that a single row is returned from the 
future.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+    RowSet.Builder ranges = RowSet.newBuilder();
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, 0), 
generateByteString(DEFAULT_PREFIX, 1)));
+
+    FlatRow expectedRow = 
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), 
any()))
+        .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableSegmentReaderImpl(
+            mockSession,
+            TABLE_ID,
+            ranges.build(),
+            SEGMENT_SIZE,
+            DEFAULT_BYTE_SEGMENT_SIZE,
+            RowFilter.getDefaultInstance(),
+            mockCallMetric);
+
+    underTest.start();
+    Assert.assertEquals(FlatRowConverter.convert(expectedRow), 
underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+    underTest.close();
+
+    Mockito.verify(mockCallMetric, Mockito.times(2)).call("ok");
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and 
read. This example
+   * uses a single range with SEGMENT_SIZE*2+1 rows. Range: [b00000, b00001, 
... b00199, b00200)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSingleRangeAboveSegmentLimit() throws IOException {
+    RowSet.Builder ranges = RowSet.newBuilder();
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, 0),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+    OngoingStubbing<?> stub =
+        when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), 
any()));
+    List<List<FlatRow>> expectedResults =
+        ImmutableList.of(
+            generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+            generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+            ImmutableList.of());
+    expectRowResults(stub, expectedResults);
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableSegmentReaderImpl(
+            mockSession,
+            TABLE_ID,
+            ranges.build(),
+            SEGMENT_SIZE,
+            DEFAULT_BYTE_SEGMENT_SIZE,
+            RowFilter.getDefaultInstance(),
+            mockCallMetric);
+
+    List<Row> actualResults = new ArrayList<>();
+    Assert.assertTrue(underTest.start());
+    do {
+      actualResults.add(underTest.getCurrentRow());
+    } while (underTest.advance());
+
+    Assert.assertEquals(
+        expectedResults.stream()
+            .flatMap(Collection::stream)
+            .map(i -> FlatRowConverter.convert(i))
+            .collect(Collectors.toList()),
+        actualResults);
+    underTest.close();
+    Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and 
read. This example
+   * uses two ranges with SEGMENT_SIZE rows. The buffer should be refilled 
twice and ReadRowsAsync
+   * should be called three times. The last rpc call should return zero rows. 
The following test
+   * follows this example: FirstRange: [b00000,b00001,...,b00099,b00100) 
SecondRange:
+   * [b00100,b00101,...,b00199,b00200)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadMultipleRanges() throws IOException {
+    RowSet.Builder ranges = RowSet.newBuilder();
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, 0),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE)));
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+    OngoingStubbing<?> stub =
+        when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), 
any()));
+    List<List<FlatRow>> expectedResults =
+        ImmutableList.of(
+            generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+            generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+            ImmutableList.of());
+    expectRowResults(stub, expectedResults);
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableSegmentReaderImpl(
+            mockSession,
+            TABLE_ID,
+            ranges.build(),
+            SEGMENT_SIZE,
+            DEFAULT_BYTE_SEGMENT_SIZE,
+            RowFilter.getDefaultInstance(),
+            mockCallMetric);
+
+    List<Row> actualResults = new ArrayList<>();
+    Assert.assertTrue(underTest.start());
+    do {
+      actualResults.add(underTest.getCurrentRow());
+    } while (underTest.advance());
+
+    Assert.assertEquals(
+        expectedResults.stream()
+            .flatMap(Collection::stream)
+            .map(i -> FlatRowConverter.convert(i))
+            .collect(Collectors.toList()),
+        actualResults);
+    underTest.close();
+
+    Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and 
read. This example
+   * uses three overlapping ranges. The logic should remove all keys that were 
already added to the
+   * buffer. The following test follows this example: FirstRange: 
[b00000,b00001,...,b00099,b00100)
+   * SecondRange: [b00050,b00051...b00100,b00101,...,b00199,b00200) 
ThirdRange: [b00070, b00200)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadMultipleRangesOverlappingKeys() throws IOException {
+    RowSet.Builder ranges = RowSet.newBuilder();
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, 0),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE)));
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE / 2),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, (int) (SEGMENT_SIZE * .7)),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+    OngoingStubbing<?> stub =
+        when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), 
any()));
+    List<List<FlatRow>> expectedResults =
+        ImmutableList.of(
+            generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+            generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+            ImmutableList.of());
+    expectRowResults(stub, expectedResults);
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableSegmentReaderImpl(
+            mockSession,
+            TABLE_ID,
+            ranges.build(),
+            SEGMENT_SIZE,
+            DEFAULT_BYTE_SEGMENT_SIZE,
+            RowFilter.getDefaultInstance(),
+            BigtableServiceImpl.populateReaderCallMetric(mockSession, 
TABLE_ID));
+
+    List<Row> actualResults = new ArrayList<>();
+    Assert.assertTrue(underTest.start());
+    do {
+      actualResults.add(underTest.getCurrentRow());
+    } while (underTest.advance());
+
+    Assert.assertEquals(
+        expectedResults.stream()
+            .flatMap(Collection::stream)
+            .map(i -> FlatRowConverter.convert(i))
+            .collect(Collectors.toList()),
+        actualResults);
+    underTest.close();
+
+    verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+  }
+
+  /**
+   * This test ensures that all the rows are properly added to the buffer and 
read. This example
+   * uses an empty request to trigger a full table scan. RowRange: [b00000, 
b00001, ... b00300)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadFullTableScan() throws IOException {
+    OngoingStubbing<?> stub =
+        when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), 
any()));
+    List<List<FlatRow>> expectedResults =
+        ImmutableList.of(
+            generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+            generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+            generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE * 2, 
SEGMENT_SIZE),
+            ImmutableList.of());
+    expectRowResults(stub, expectedResults);
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableSegmentReaderImpl(
+            mockSession,
+            TABLE_ID,
+            RowSet.getDefaultInstance(),
+            SEGMENT_SIZE,
+            DEFAULT_BYTE_SEGMENT_SIZE,
+            RowFilter.getDefaultInstance(),
+            mockCallMetric);
+
+    List<Row> actualResults = new ArrayList<>();
+    Assert.assertTrue(underTest.start());
+    do {
+      actualResults.add(underTest.getCurrentRow());
+    } while (underTest.advance());
+
+    Assert.assertEquals(
+        expectedResults.stream()
+            .flatMap(Collection::stream)
+            .map(i -> FlatRowConverter.convert(i))
+            .collect(Collectors.toList()),
+        actualResults);
+    underTest.close();
+
+    Mockito.verify(mockCallMetric, Mockito.times(4)).call("ok");;
+  }
+
+  /**
+   * This test compares the amount of RowRanges being requested after the 
buffer is refilled. After
+   * reading the first buffer, the first two RowRanges should be removed and 
the RowRange containing
+   * [b00100,b00200) should be requested.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadFillBuffer() throws IOException {
+    RowSet.Builder ranges = RowSet.newBuilder();
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, 0),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE / 2)));
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE / 2),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE)));
+    ranges.addRowRanges(
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+    OngoingStubbing<?> stub =
+        when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), 
any()));
+    List<List<FlatRow>> expectedResults =
+        ImmutableList.of(
+            generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+            generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+            ImmutableList.of());
+    expectRowResults(stub, expectedResults);
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableSegmentReaderImpl(
+            mockSession,
+            TABLE_ID,
+            ranges.build(),
+            SEGMENT_SIZE,
+            DEFAULT_BYTE_SEGMENT_SIZE,
+            RowFilter.getDefaultInstance(),
+            mockCallMetric);
+
+    List<Row> actualResults = new ArrayList<>();
+    Assert.assertTrue(underTest.start());
+    verify(mockBigtableDataClient, times(1))
+        .readFlatRows(requestCaptor.capture(), any(StreamObserver.class));
+    Assert.assertEquals(3, 
requestCaptor.getValue().getRows().getRowRangesCount());
+    do {
+      actualResults.add(underTest.getCurrentRow());
+    } while (underTest.advance());
+
+    verify(mockBigtableDataClient, times(3))
+        .readFlatRows(requestCaptor.capture(), any(StreamObserver.class));
+    Assert.assertEquals(1, 
requestCaptor.getValue().getRows().getRowRangesCount());
+
+    Assert.assertEquals(
+        expectedResults.stream()
+            .flatMap(Collection::stream)
+            .map(i -> FlatRowConverter.convert(i))
+            .collect(Collectors.toList()),
+        actualResults);
+    underTest.close();
+
+    Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+  }
+
+  /**
+   * This test checks that the buffer will stop filling up once the byte limit 
is reached. It will
+   * cancel the ScanHandler after reaching the limit. This test completes one fill and contains one
+   * Row after the first buffer has been completed. The test checks the current available memory in
+   * the JVM and uses a percent of it to mock the original behavior. Range: 
[b00000, b00010)
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadByteLimitBuffer() throws IOException {
+    long segmentByteLimit = DEFAULT_ROW_SIZE * (SEGMENT_SIZE / 2);
+    int numOfRowsInsideBuffer = (int) (segmentByteLimit / DEFAULT_ROW_SIZE) + 
1;
+
+    RowRange mockRowRange =
+        generateRowRange(
+            generateByteString(DEFAULT_PREFIX, 0),
+            generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE));
+
+    OngoingStubbing<?> stub =
+        when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), 
any()));
+    List<List<FlatRow>> expectedResults =
+        ImmutableList.of(
+            generateLargeSegmentResult(DEFAULT_PREFIX, 0, 
numOfRowsInsideBuffer),
+            generateSegmentResult(
+                DEFAULT_PREFIX, numOfRowsInsideBuffer, SEGMENT_SIZE - 
numOfRowsInsideBuffer),
+            ImmutableList.of());
+    expectRowResults(stub, expectedResults);
+
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableSegmentReaderImpl(
+            mockSession,
+            TABLE_ID,
+            RowSet.newBuilder().addRowRanges(mockRowRange).build(),
+            SEGMENT_SIZE,
+            segmentByteLimit,
+            RowFilter.getDefaultInstance(),
+            mockCallMetric);
+
+    List<Row> actualResults = new ArrayList<>();
+    Assert.assertTrue(underTest.start());
+    do {
+      actualResults.add(underTest.getCurrentRow());
+    } while (underTest.advance());
+
+    Assert.assertEquals(
+        expectedResults.stream()
+            .flatMap(Collection::stream)
+            .map(i -> FlatRowConverter.convert(i))
+            .collect(Collectors.toList()),
+        actualResults);
+    underTest.close();
+
+    Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+  }
+
+  /**
+   * This test ensures the Exception handling inside of the scanHandler. This 
test will check if a
+   * StatusRuntimeException was thrown.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testReadSegmentExceptionHandling() throws IOException {
+    // mockCallMetric must be fixed

Review Comment:
   nit: What does this comment mean? Can we remove it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to