mutianf commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r875029147
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +212,223 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+
+ private @Nullable ReadRowsRequest nextRequest;
+ private @Nullable Row currentRow;
+ private final Queue<Row> buffer;
+ private final int refillSegmentWaterMark;
+ private final long maxSegmentByteSize;
+ private Future<UpstreamResults> future;
+ private ServiceCallMetric serviceCallMetric;
+
+ private static class UpstreamResults {
+ private final List<Row> rows;
+ private final @Nullable ReadRowsRequest nextRequest;
+
+ private UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest
nextRequest) {
+ this.rows = rows;
+ this.nextRequest = nextRequest;
+ }
+ }
+
+ static BigtableSegmentReaderImpl create(BigtableSession session,
BigtableSource source) {
+ RowSet.Builder rowSetBuilder = RowSet.newBuilder();
+ if (source.getRanges().isEmpty()) {
+ rowSetBuilder =
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance());
+ } else {
+ // BigtableSource only contains ranges with a closed start key and
open end key
+ for (ByteKeyRange beamRange : source.getRanges()) {
+ RowRange.Builder rangeBuilder = rowSetBuilder.addRowRangesBuilder();
+ rangeBuilder
+
.setStartKeyClosed(ByteString.copyFrom(beamRange.getStartKey().getValue()))
+
.setEndKeyOpen(ByteString.copyFrom(beamRange.getEndKey().getValue()));
+ }
+ }
+ RowSet rowSet = rowSetBuilder.build();
+ RowFilter filter =
+ MoreObjects.firstNonNull(source.getRowFilter(),
RowFilter.getDefaultInstance());
+
+ long maxSegmentByteSize =
+ (long)
+ Math.max(
+ MIN_BYTE_BUFFER_SIZE,
+ (Runtime.getRuntime().totalMemory() *
DEFAULT_BYTE_LIMIT_PERCENTAGE));
+
+ return new BigtableSegmentReaderImpl(
+ session,
+
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()),
+ rowSet,
+ source.getMaxBufferElementCount(),
+ maxSegmentByteSize,
+ filter,
+ populateReaderCallMetric(session, source.getTableId().get()));
+ }
+
+ @VisibleForTesting
+ BigtableSegmentReaderImpl(
+ BigtableSession session,
+ String tableName,
+ RowSet rowSet,
+ int maxRowsInBuffer,
+ long maxSegmentByteSize,
+ RowFilter filter,
+ ServiceCallMetric serviceCallMetric) {
+ if (rowSet.equals(rowSet.getDefaultInstanceForType())) {
+ rowSet =
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+ }
+ ReadRowsRequest request =
+ ReadRowsRequest.newBuilder()
+ .setTableName(tableName)
+ .setRows(rowSet)
+ .setFilter(filter)
+ .setRowsLimit(maxRowsInBuffer)
+ .build();
+
+ this.session = session;
+ this.nextRequest = request;
+ this.maxSegmentByteSize = maxSegmentByteSize;
+ this.serviceCallMetric = serviceCallMetric;
+ this.buffer = new ArrayDeque<>();
+ // Asynchronously refill the buffer when only 10% of the elements are
left
+ this.refillSegmentWaterMark = (int) (request.getRowsLimit() *
WATERMARK_PERCENTAGE);
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ future = fetchNextSegment();
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.size() < refillSegmentWaterMark && future == null) {
+ future = fetchNextSegment();
+ }
+ if (buffer.isEmpty() && future != null) {
+ waitReadRowsFuture();
+ }
+ currentRow = buffer.poll();
+ return currentRow != null;
+ }
+
+ private Future<UpstreamResults> fetchNextSegment() {
+ SettableFuture<UpstreamResults> f = SettableFuture.create();
+ // When the nextRequest is null, the last fill completed and the buffer
contains the last rows
+ if (nextRequest == null) {
+ f.set(new UpstreamResults(ImmutableList.of(), null));
+ return f;
+ }
+
+ // TODO(diegomez): Remove atomic ScanHandler for simpler
StreamObserver/Future implementation
+ AtomicReference<ScanHandler> atomicScanHandler = new AtomicReference<>();
+ ScanHandler handler =
+ session
+ .getDataClient()
+ .readFlatRows(
+ nextRequest,
+ new StreamObserver<FlatRow>() {
+ List<Row> rows = new ArrayList<>();
+ long currentByteSize = 0;
+ boolean byteLimitReached = false;
+
+ @Override
+ public void onNext(FlatRow flatRow) {
+ Row row = FlatRowConverter.convert(flatRow);
+ currentByteSize += row.getSerializedSize();
+ rows.add(row);
+
+ if (currentByteSize > maxSegmentByteSize) {
+ byteLimitReached = true;
+ atomicScanHandler.get().cancel();
+ return;
+ }
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ f.setException(e);
+ }
+
+ @Override
+ public void onCompleted() {
+ ReadRowsRequest nextNextRequest = null;
+
+ // When requested rows < limit, the current request will
be the last
+ if (byteLimitReached || rows.size() ==
nextRequest.getRowsLimit()) {
+ nextNextRequest =
+ truncateRequest(nextRequest, rows.get(rows.size()
- 1).getKey());
+ }
+ f.set(new UpstreamResults(rows, nextNextRequest));
+ }
+ });
+ atomicScanHandler.set(handler);
+ return f;
+ }
+
+ private void waitReadRowsFuture() throws IOException {
+ try {
+ UpstreamResults r = future.get();
+ buffer.addAll(r.rows);
+ nextRequest = r.nextRequest;
+ future = null;
+ serviceCallMetric.call("ok");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof StatusRuntimeException) {
+ serviceCallMetric.call(((StatusRuntimeException)
cause).getStatus().getCode().value());
Review Comment:
I think `.getStatus().getCode().value()` returns an integer, but on line 377
(`serviceCallMetric.call("ok");`) you're passing a string. Should line 377 be
`serviceCallMetric.call(0);` instead?
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +158,424 @@ public void testRead() throws IOException {
verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
}
+ /**
+ * This test ensures that protobuf creation and interactions with {@link
BigtableDataClient} work
+ * as expected. This test checks that a single row is returned from the
future.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+ RowSet.Builder ranges = RowSet.newBuilder();
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, 0),
generateByteString(DEFAULT_PREFIX, 1)));
+
+ FlatRow expectedRow =
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(
+ mockSession,
+ TABLE_ID,
+ ranges.build(),
+ SEGMENT_SIZE,
+ DEFAULT_BYTE_SEGMENT_SIZE,
+ RowFilter.getDefaultInstance(),
+ mockCallMetric);
+
+ underTest.start();
+ Assert.assertEquals(FlatRowConverter.convert(expectedRow),
underTest.getCurrentRow());
+ Assert.assertFalse(underTest.advance());
+ underTest.close();
+
+ Mockito.verify(mockCallMetric, Mockito.times(2)).call("ok");
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses a single range with SEGMENT_SIZE*2+1 rows. Range: [b00000, b00001,
... b00199, b00200)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadSingleRangeAboveSegmentLimit() throws IOException {
+ RowSet.Builder ranges = RowSet.newBuilder();
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, 0),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(
+ mockSession,
+ TABLE_ID,
+ ranges.build(),
+ SEGMENT_SIZE,
+ DEFAULT_BYTE_SEGMENT_SIZE,
+ RowFilter.getDefaultInstance(),
+ mockCallMetric);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+ Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses two ranges with SEGMENT_SIZE rows. The buffer should be refilled
twice and ReadRowsAsync
+ * should be called three times. The last rpc call should return zero rows.
The following test
+ * follows this example: FirstRange: [b00000,b00001,...,b00099,b00100)
SecondRange:
+ * [b00100,b00101,...,b00199,b00200)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadMultipleRanges() throws IOException {
+ RowSet.Builder ranges = RowSet.newBuilder();
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, 0),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE)));
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(
+ mockSession,
+ TABLE_ID,
+ ranges.build(),
+ SEGMENT_SIZE,
+ DEFAULT_BYTE_SEGMENT_SIZE,
+ RowFilter.getDefaultInstance(),
+ mockCallMetric);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses three overlapping ranges. The logic should remove all keys that were
already added to the
+ * buffer. The following test follows this example: FirstRange:
[b00000,b00001,...,b00099,b00100)
+ * SecondRange: [b00050,b00051...b00100,b00101,...,b00199,b00200)
ThirdRange: [b00070, b00200)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadMultipleRangesOverlappingKeys() throws IOException {
+ RowSet.Builder ranges = RowSet.newBuilder();
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, 0),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE)));
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE / 2),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, (int) (SEGMENT_SIZE * .7)),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(
+ mockSession,
+ TABLE_ID,
+ ranges.build(),
+ SEGMENT_SIZE,
+ DEFAULT_BYTE_SEGMENT_SIZE,
+ RowFilter.getDefaultInstance(),
+ BigtableServiceImpl.populateReaderCallMetric(mockSession,
TABLE_ID));
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses an empty request to trigger a full table scan. RowRange: [b00000,
b00001, ... b00300)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadFullTableScan() throws IOException {
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE * 2,
SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(
+ mockSession,
+ TABLE_ID,
+ RowSet.getDefaultInstance(),
+ SEGMENT_SIZE,
+ DEFAULT_BYTE_SEGMENT_SIZE,
+ RowFilter.getDefaultInstance(),
+ mockCallMetric);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ Mockito.verify(mockCallMetric, Mockito.times(4)).call("ok");;
+ }
+
+ /**
+ * This test compares the amount of RowRanges being requested after the
buffer is refilled. After
+ * reading the first buffer, the first two RowRanges should be removed and
the RowRange containing
+ * [b00100,b00200) should be requested.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadFillBuffer() throws IOException {
+ RowSet.Builder ranges = RowSet.newBuilder();
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, 0),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE / 2)));
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE / 2),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE)));
+ ranges.addRowRanges(
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE * 2)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(
+ mockSession,
+ TABLE_ID,
+ ranges.build(),
+ SEGMENT_SIZE,
+ DEFAULT_BYTE_SEGMENT_SIZE,
+ RowFilter.getDefaultInstance(),
+ mockCallMetric);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ verify(mockBigtableDataClient, times(1))
+ .readFlatRows(requestCaptor.capture(), any(StreamObserver.class));
+ Assert.assertEquals(3,
requestCaptor.getValue().getRows().getRowRangesCount());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ verify(mockBigtableDataClient, times(3))
+ .readFlatRows(requestCaptor.capture(), any(StreamObserver.class));
+ Assert.assertEquals(1,
requestCaptor.getValue().getRows().getRowRangesCount());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+ }
+
+ /**
+ * This test checks that the buffer will stop filling up once the byte limit
is reached. It will
+ * cancel the ScanHandler after reaching the limit. This test completes one
fill and contains one
+ * Row after the first buffer has been completed. The test checks the
currently available memory in
+ * the JVM and uses a percentage of it to mock the original behavior. Range:
[b00000, b00010)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadByteLimitBuffer() throws IOException {
+ long segmentByteLimit = DEFAULT_ROW_SIZE * (SEGMENT_SIZE / 2);
+ int numOfRowsInsideBuffer = (int) (segmentByteLimit / DEFAULT_ROW_SIZE) +
1;
+
+ RowRange mockRowRange =
+ generateRowRange(
+ generateByteString(DEFAULT_PREFIX, 0),
+ generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateLargeSegmentResult(DEFAULT_PREFIX, 0,
numOfRowsInsideBuffer),
+ generateSegmentResult(
+ DEFAULT_PREFIX, numOfRowsInsideBuffer, SEGMENT_SIZE -
numOfRowsInsideBuffer),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(
+ mockSession,
+ TABLE_ID,
+ RowSet.newBuilder().addRowRanges(mockRowRange).build(),
+ SEGMENT_SIZE,
+ segmentByteLimit,
+ RowFilter.getDefaultInstance(),
+ mockCallMetric);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ Mockito.verify(mockCallMetric, Mockito.times(3)).call("ok");
+ }
+
+ /**
+ * This test ensures the Exception handling inside of the scanHandler. This
test will check if a
+ * StatusRuntimeException was thrown.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadSegmentExceptionHandling() throws IOException {
+ // mockCallMetric must be fixed
Review Comment:
nit: What does this comment mean? Can we remove it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]