igorbernstein2 commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r873890850
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +153,370 @@ public void testRead() throws IOException {
verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
}
+ /**
+ * This test ensures that protobuf creation and interactions with {@link
BigtableDataClient} work
+ * as expected. This test checks that a single row is returned from the
future.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+ ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+ ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
end)));
+ FlatRow expectedRow =
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+ BigtableService.Reader underTest =
+ BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ Assert.assertEquals(FlatRowConverter.convert(expectedRow),
underTest.getCurrentRow());
+ Assert.assertFalse(underTest.advance());
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 2);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses a single range with SEGMENT_SIZE*2+1 rows. Range: [b00000, b00001,
... b00199, b00200)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadSingleRangeAboveSegmentLimit() throws IOException {
+ ByteKey start = generateByteKey(DEFAULT_PREFIX, 0);
+ ByteKey end = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE * 2);
+
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
end)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession,
mockBigtableSource);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses two ranges with SEGMENT_SIZE rows. The buffer should be refilled
twice and ReadRowsAsync
+ * should be called three times. The last rpc call should return zero rows.
The following test
+ * follows this example: FirstRange: [b00000,b00001,...,b00099,b00100)
SecondRange:
+ * [b00100,b00101,...,b00199,b00200)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadMultipleRanges() throws IOException {
+ ByteKey start = generateByteKey(DEFAULT_PREFIX, 0);
+ ByteKey middleKey = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE);
+ ByteKey end = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE * 2);
+ when(mockBigtableSource.getRanges())
+ .thenReturn(
+ Arrays.asList(ByteKeyRange.of(start, middleKey),
ByteKeyRange.of(middleKey, end)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession,
mockBigtableSource);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses three overlapping ranges. The logic should remove all keys that were
already added to the
+ * buffer. The following test follows this example: FirstRange:
[b00000,b00001,...,b00099,b00100)
+ * SecondRange: [b00050,b00051...b00100,b00101,...,b00199,b00200)
ThirdRange: [b00070, b00200)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadMultipleRangesOverlappingKeys() throws IOException {
+ ByteKey firstStart = generateByteKey(DEFAULT_PREFIX, 0);
+ ByteKey firstEnd = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE);
+
+ ByteKey secondStart = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE / 2);
+ ByteKey secondEnd = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE * 2);
+
+ ByteKey thirdStart = generateByteKey(DEFAULT_PREFIX, (int) (SEGMENT_SIZE *
.7));
+ when(mockBigtableSource.getRanges())
+ .thenReturn(
+ Arrays.asList(
+ ByteKeyRange.of(firstStart, firstEnd),
+ ByteKeyRange.of(secondStart, secondEnd),
+ ByteKeyRange.of(thirdStart, secondEnd)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession,
mockBigtableSource);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses an empty request to trigger a full table scan. RowRange: [b00000,
b00001, ... b00300)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadFullTableScan() throws IOException {
+ when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList());
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE * 2,
SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession,
mockBigtableSource);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 4);
+ }
+
+ /**
+ * This test compares the amount of RowRanges being requested after the
buffer is refilled. After
+ * reading the first buffer, the first two RowRanges should be removed and
the RowRange containing
+ * [b00100,b00200) should be requested.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadFillBuffer() throws IOException {
+ ByteKey firstStart = generateByteKey(DEFAULT_PREFIX, 0);
+ ByteKey firstEnd = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE / 2);
+
+ ByteKey secondStart = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE / 2);
+ ByteKey secondEnd = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE);
+
+ ByteKey thirdStart = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE);
+ ByteKey thirdEnd = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE * 2);
+
+ when(mockBigtableSource.getRanges())
+ .thenReturn(
+ Arrays.asList(
+ ByteKeyRange.of(firstStart, firstEnd),
+ ByteKeyRange.of(secondStart, secondEnd),
+ ByteKeyRange.of(thirdStart, thirdEnd)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession,
mockBigtableSource);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ verify(mockBigtableDataClient, times(1))
+ .readFlatRows(requestCaptor.capture(), any(StreamObserver.class));
+ Assert.assertEquals(3,
requestCaptor.getValue().getRows().getRowRangesCount());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ verify(mockBigtableDataClient, times(3))
+ .readFlatRows(requestCaptor.capture(), any(StreamObserver.class));
+ Assert.assertEquals(1,
requestCaptor.getValue().getRows().getRowRangesCount());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+ }
+
+ /**
+ * This test checks that the buffer will stop filling up once the byte limit
is reached. It will
+ * cancel the ScanHandler after reached the limit. This test completes one
fill and contains one
+ * Row after the first buffer has been completed. The test cheaks the
current available memory in
+ * the JVM and uses a percent of it to mock the original behavior. Range:
[b00000, b00010)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadByteLimitBuffer() throws IOException {
+ long segmentByteLimit = DEFAULT_ROW_SIZE * (SEGMENT_SIZE / 2);
+ int numOfRowsInsideBuffer = (int) (segmentByteLimit / DEFAULT_ROW_SIZE) +
1;
+
+ ByteKey start = generateByteKey(DEFAULT_PREFIX, 0);
+ ByteKey end = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE);
+
+
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
end)));
+
+ RowRange mockRowRange =
+ RowRange.newBuilder()
+ .setStartKeyClosed(generateByteString(DEFAULT_PREFIX, 0))
+ .setEndKeyOpen(generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE))
+ .build();
+
+ ReadRowsRequest mockRequest =
+ ReadRowsRequest.newBuilder()
+ .setTableName(TABLE_ID)
+ .setRows(RowSet.newBuilder().addRowRanges(mockRowRange).build())
+ .setRowsLimit(SEGMENT_SIZE)
+ .build();
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateLargeSegmentResult(DEFAULT_PREFIX, 0,
numOfRowsInsideBuffer),
+ generateSegmentResult(
+ DEFAULT_PREFIX, numOfRowsInsideBuffer, SEGMENT_SIZE -
numOfRowsInsideBuffer),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ // Max amount of memory allowed in a Row (256MB)
+ // This test uses a limit of 1000MB which should fill 5 rows per fill (1 +
1000/256 = 5)
Review Comment:
I really don't think it's necessary for a unit test to require 1 GiB of memory.
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +153,370 @@ public void testRead() throws IOException {
verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
}
+ /**
+ * This test ensures that protobuf creation and interactions with {@link
BigtableDataClient} work
+ * as expected. This test checks that a single row is returned from the
future.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+ ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+ ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
end)));
+ FlatRow expectedRow =
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+ BigtableService.Reader underTest =
+ BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ Assert.assertEquals(FlatRowConverter.convert(expectedRow),
underTest.getCurrentRow());
+ Assert.assertFalse(underTest.advance());
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 2);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses a single range with SEGMENT_SIZE*2+1 rows. Range: [b00000, b00001,
... b00199, b00200)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadSingleRangeAboveSegmentLimit() throws IOException {
+ ByteKey start = generateByteKey(DEFAULT_PREFIX, 0);
+ ByteKey end = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE * 2);
+
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
end)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession,
mockBigtableSource);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses two ranges with SEGMENT_SIZE rows. The buffer should be refilled
twice and ReadRowsAsync
+ * should be called three times. The last rpc call should return zero rows.
The following test
+ * follows this example: FirstRange: [b00000,b00001,...,b00099,b00100)
SecondRange:
+ * [b00100,b00101,...,b00199,b00200)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadMultipleRanges() throws IOException {
+ ByteKey start = generateByteKey(DEFAULT_PREFIX, 0);
+ ByteKey middleKey = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE);
+ ByteKey end = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE * 2);
+ when(mockBigtableSource.getRanges())
+ .thenReturn(
+ Arrays.asList(ByteKeyRange.of(start, middleKey),
ByteKeyRange.of(middleKey, end)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession,
mockBigtableSource);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses three overlapping ranges. The logic should remove all keys that were
already added to the
+ * buffer. The following test follows this example: FirstRange:
[b00000,b00001,...,b00099,b00100)
+ * SecondRange: [b00050,b00051...b00100,b00101,...,b00199,b00200)
ThirdRange: [b00070, b00200)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadMultipleRangesOverlappingKeys() throws IOException {
+ ByteKey firstStart = generateByteKey(DEFAULT_PREFIX, 0);
+ ByteKey firstEnd = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE);
+
+ ByteKey secondStart = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE / 2);
+ ByteKey secondEnd = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE * 2);
+
+ ByteKey thirdStart = generateByteKey(DEFAULT_PREFIX, (int) (SEGMENT_SIZE *
.7));
+ when(mockBigtableSource.getRanges())
+ .thenReturn(
+ Arrays.asList(
+ ByteKeyRange.of(firstStart, firstEnd),
+ ByteKeyRange.of(secondStart, secondEnd),
+ ByteKeyRange.of(thirdStart, secondEnd)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession,
mockBigtableSource);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses an empty request to trigger a full table scan. RowRange: [b00000,
b00001, ... b00300)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadFullTableScan() throws IOException {
+ when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList());
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE * 2,
SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession,
mockBigtableSource);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 4);
+ }
+
+ /**
+ * This test compares the amount of RowRanges being requested after the
buffer is refilled. After
+ * reading the first buffer, the first two RowRanges should be removed and
the RowRange containing
+ * [b00100,b00200) should be requested.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadFillBuffer() throws IOException {
+ ByteKey firstStart = generateByteKey(DEFAULT_PREFIX, 0);
+ ByteKey firstEnd = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE / 2);
+
+ ByteKey secondStart = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE / 2);
+ ByteKey secondEnd = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE);
+
+ ByteKey thirdStart = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE);
+ ByteKey thirdEnd = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE * 2);
+
+ when(mockBigtableSource.getRanges())
+ .thenReturn(
+ Arrays.asList(
+ ByteKeyRange.of(firstStart, firstEnd),
+ ByteKeyRange.of(secondStart, secondEnd),
+ ByteKeyRange.of(thirdStart, thirdEnd)));
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateSegmentResult(DEFAULT_PREFIX, 0, SEGMENT_SIZE),
+ generateSegmentResult(DEFAULT_PREFIX, SEGMENT_SIZE, SEGMENT_SIZE),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ BigtableService.Reader underTest =
+ BigtableServiceImpl.BigtableSegmentReaderImpl.create(mockSession,
mockBigtableSource);
+
+ List<Row> actualResults = new ArrayList<>();
+ Assert.assertTrue(underTest.start());
+ verify(mockBigtableDataClient, times(1))
+ .readFlatRows(requestCaptor.capture(), any(StreamObserver.class));
+ Assert.assertEquals(3,
requestCaptor.getValue().getRows().getRowRangesCount());
+ do {
+ actualResults.add(underTest.getCurrentRow());
+ } while (underTest.advance());
+
+ verify(mockBigtableDataClient, times(3))
+ .readFlatRows(requestCaptor.capture(), any(StreamObserver.class));
+ Assert.assertEquals(1,
requestCaptor.getValue().getRows().getRowRangesCount());
+
+ Assert.assertEquals(
+ expectedResults.stream()
+ .flatMap(Collection::stream)
+ .map(i -> FlatRowConverter.convert(i))
+ .collect(Collectors.toList()),
+ actualResults);
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+ }
+
+ /**
+ * This test checks that the buffer will stop filling up once the byte limit
is reached. It will
+ * cancel the ScanHandler after reached the limit. This test completes one
fill and contains one
+ * Row after the first buffer has been completed. The test cheaks the
current available memory in
+ * the JVM and uses a percent of it to mock the original behavior. Range:
[b00000, b00010)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadByteLimitBuffer() throws IOException {
+ long segmentByteLimit = DEFAULT_ROW_SIZE * (SEGMENT_SIZE / 2);
+ int numOfRowsInsideBuffer = (int) (segmentByteLimit / DEFAULT_ROW_SIZE) +
1;
+
+ ByteKey start = generateByteKey(DEFAULT_PREFIX, 0);
+ ByteKey end = generateByteKey(DEFAULT_PREFIX, SEGMENT_SIZE);
+
+
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
end)));
+
+ RowRange mockRowRange =
+ RowRange.newBuilder()
+ .setStartKeyClosed(generateByteString(DEFAULT_PREFIX, 0))
+ .setEndKeyOpen(generateByteString(DEFAULT_PREFIX, SEGMENT_SIZE))
+ .build();
+
+ ReadRowsRequest mockRequest =
+ ReadRowsRequest.newBuilder()
+ .setTableName(TABLE_ID)
+ .setRows(RowSet.newBuilder().addRowRanges(mockRowRange).build())
+ .setRowsLimit(SEGMENT_SIZE)
+ .build();
+
+ OngoingStubbing<?> stub =
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()));
+ List<List<FlatRow>> expectedResults =
+ ImmutableList.of(
+ generateLargeSegmentResult(DEFAULT_PREFIX, 0,
numOfRowsInsideBuffer),
+ generateSegmentResult(
+ DEFAULT_PREFIX, numOfRowsInsideBuffer, SEGMENT_SIZE -
numOfRowsInsideBuffer),
+ ImmutableList.of());
+ expectRowResults(stub, expectedResults);
+
+ // Max amount of memory allowed in a Row (256MB)
+ // This test uses a limit of 1000MB which should fill 5 rows per fill (1 +
1000/256 = 5)
Review Comment:
I really don't think it's necessary for a unit test to require 1 GiB of memory.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]