igorbernstein2 commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r865419411
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +144,451 @@ public void testRead() throws IOException {
verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
}
+ /**
+ * This test ensures that protobuf creation and interactions with {@link
BigtableDataClient} work
+ * as expected. This test checks that a single row is returned from the
future.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+ ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+ ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
end)));
+ FlatRow expectedRow =
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ Assert.assertEquals(FlatRowConverter.convert(expectedRow),
underTest.getCurrentRow());
+ Assert.assertFalse(underTest.advance());
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses a single range with MINI_BATCH_ROW_LIMIT*2+1 rows. Range: [a, b1,
... b199, c)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadSingleRangeAboveSegmentLimit() throws IOException {
+ List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+ List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
Review Comment:
I think you can extract a couple of helpers that would make these tests less
repetitive:
```java
List<FlatRow> generateSegmentResult(String prefix, int startIndex, int count) {
  return IntStream.range(startIndex, startIndex + count)
      .mapToObj(
          i ->
              FlatRow.newBuilder()
                  .withRowKey(ByteString.copyFromUtf8(prefix + String.format("%05d", i)))
                  .build())
      .collect(Collectors.toList());
}

<T> OngoingStubbing<T> expectRowResults(
    OngoingStubbing<T> stub, List<List<FlatRow>> results) {
  for (List<FlatRow> result : results) {
    stub = stub.thenAnswer(mockReadRowsAnswer(result));
  }
  return stub;
}
```
Then you can use it as follows:
```java
List<List<FlatRow>> expectedResults =
    ImmutableList.of(
        generateSegmentResult("b", 0, MINI_BATCH_ROW_LIMIT),
        generateSegmentResult("b", MINI_BATCH_ROW_LIMIT, MINI_BATCH_ROW_LIMIT),
        ImmutableList.of());
OngoingStubbing<?> stub =
    when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class), any()));
expectRowResults(stub, expectedResults);
```
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +144,451 @@ public void testRead() throws IOException {
verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
}
+ /**
+ * This test ensures that protobuf creation and interactions with {@link
BigtableDataClient} work
+ * as expected. This test checks that a single row is returned from the
future.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+ ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+ ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
end)));
+ FlatRow expectedRow =
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ Assert.assertEquals(FlatRowConverter.convert(expectedRow),
underTest.getCurrentRow());
+ Assert.assertFalse(underTest.advance());
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses a single range with MINI_BATCH_ROW_LIMIT*2+1 rows. Range: [a, b1,
... b199, c)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadSingleRangeAboveSegmentLimit() throws IOException {
+ List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+ List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+ expectedFirstRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" + String.format("%05d",
i)))
+ .build());
+ expectedSecondRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(
+ ByteString.copyFromUtf8("b" + String.format("%05d", i +
MINI_BATCH_ROW_LIMIT)))
+ .build());
+ }
+ expectedFirstRangeRows.set(
+ 0,
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+ ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+ ByteKey end = ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+
+
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
end)));
+
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(expectedFirstRangeRows))
+ .thenAnswer(mockReadRowsAnswer(expectedSecondRangeRows))
+ .thenAnswer(mockReadRowsAnswer(new ArrayList<FlatRow>()));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ Assert.assertEquals(
+ FlatRowConverter.convert(expectedFirstRangeRows.get(0)),
underTest.getCurrentRow());
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT - 1; i++) {
+ underTest.advance();
+ }
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+ underTest.advance();
+ }
+ Assert.assertEquals(
+
FlatRowConverter.convert(expectedSecondRangeRows.get(expectedSecondRangeRows.size()
- 1)),
+ underTest.getCurrentRow());
+ Assert.assertFalse(underTest.advance());
Review Comment:
Consider checking all of the results:
```java
List<Row> actualResults = new ArrayList<>();
Assert.assertTrue(underTest.start());
do {
  actualResults.add(underTest.getCurrentRow());
} while (underTest.advance());
Assert.assertEquals(
    expectedResults.stream()
        .flatMap(Collection::stream)
        .map(FlatRowConverter::convert)
        .collect(Collectors.toList()),
    actualResults);
```
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +144,451 @@ public void testRead() throws IOException {
verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
}
+ /**
+ * This test ensures that protobuf creation and interactions with {@link
BigtableDataClient} work
+ * as expected. This test checks that a single row is returned from the
future.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+ ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+ ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
end)));
+ FlatRow expectedRow =
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ Assert.assertEquals(FlatRowConverter.convert(expectedRow),
underTest.getCurrentRow());
+ Assert.assertFalse(underTest.advance());
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses a single range with MINI_BATCH_ROW_LIMIT*2+1 rows. Range: [a, b1,
... b199, c)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadSingleRangeAboveSegmentLimit() throws IOException {
+ List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+ List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+ expectedFirstRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" + String.format("%05d",
i)))
+ .build());
+ expectedSecondRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(
+ ByteString.copyFromUtf8("b" + String.format("%05d", i +
MINI_BATCH_ROW_LIMIT)))
+ .build());
+ }
+ expectedFirstRangeRows.set(
+ 0,
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+ ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+ ByteKey end = ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+
+
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
end)));
+
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(expectedFirstRangeRows))
+ .thenAnswer(mockReadRowsAnswer(expectedSecondRangeRows))
+ .thenAnswer(mockReadRowsAnswer(new ArrayList<FlatRow>()));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ Assert.assertEquals(
+ FlatRowConverter.convert(expectedFirstRangeRows.get(0)),
underTest.getCurrentRow());
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT - 1; i++) {
+ underTest.advance();
+ }
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+ underTest.advance();
+ }
+ Assert.assertEquals(
+
FlatRowConverter.convert(expectedSecondRangeRows.get(expectedSecondRangeRows.size()
- 1)),
+ underTest.getCurrentRow());
+ Assert.assertFalse(underTest.advance());
+
+ underTest.close();
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses two ranges with MINI_BATCH_ROW_LIMIT rows. The buffer should be
refilled twice and
+ * ReadRowsAsync should be called three times. The last rpc call should
return zero rows.
+ * The following test follows this example: FirstRange: [a,b1,...,b99,c)
+ * SecondRange: [c,d1,...,d99,e)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadMultipleRanges() throws IOException {
+ List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+ expectedFirstRangeRows.add(
+ FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+ List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+ expectedSecondRangeRows.add(
+ FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("c")).build());
+
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT - 1; i++) {
+ expectedFirstRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" + String.format("%05d",
i)))
+ .build());
+ expectedSecondRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("d" + String.format("%05d",
i)))
+ .build());
+ }
+
+ ByteKey firstStart =
ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+ ByteKey sharedKeyEnd =
ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+ ByteKey secondEnd = ByteKey.copyFrom("e".getBytes(StandardCharsets.UTF_8));
+
+ when(mockBigtableSource.getRanges())
+ .thenReturn(
+ Arrays.asList(
+ ByteKeyRange.of(firstStart, sharedKeyEnd),
+ ByteKeyRange.of(sharedKeyEnd, secondEnd)));
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(expectedFirstRangeRows))
+ .thenAnswer(mockReadRowsAnswer(expectedSecondRangeRows))
+ .thenAnswer(mockReadRowsAnswer(new ArrayList<FlatRow>()));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ Assert.assertEquals(
+ FlatRowConverter.convert(expectedFirstRangeRows.get(0)),
underTest.getCurrentRow());
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT - 1; i++) {
+ underTest.advance();
+ }
+ Assert.assertEquals(
+
FlatRowConverter.convert(expectedFirstRangeRows.get(expectedFirstRangeRows.size()
- 1)),
+ underTest.getCurrentRow());
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+ underTest.advance();
+ }
+
+ Assert.assertEquals(
+
FlatRowConverter.convert(expectedSecondRangeRows.get(expectedSecondRangeRows.size()
- 1)),
+ underTest.getCurrentRow());
+ Assert.assertFalse(underTest.advance());
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses three overlapping ranges. The logic should remove all keys that were
already added to
+ * the buffer. The following test follows this example:
+ * FirstRange: [a,b1,...,b99,b100) SecondRange:
[b50,b51...b100,d1,...,d199,c)
+ * ThirdRange: [b70, c)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadMultipleRangesOverlappingKeys() throws IOException {
+ List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+ expectedFirstRangeRows.add(
+ FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+ List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+ expectedSecondRangeRows.add(
+ FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("c")).build());
+
+ for (int i = 1; i < 2 * MINI_BATCH_ROW_LIMIT; i++) {
+ if (i < MINI_BATCH_ROW_LIMIT) {
+ expectedFirstRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" +
String.format("%05d", i)))
+ .build());
+ }
+ if (i > MINI_BATCH_ROW_LIMIT / 2) {
+ expectedSecondRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" +
String.format("%05d", i)))
+ .build());
+ }
+ }
+
+ ByteKey firstStart =
ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+ ByteKey firstEnd =
+ ByteKey.copyFrom(
+ ("b" + String.format("%05d",
(MINI_BATCH_ROW_LIMIT))).getBytes(StandardCharsets.UTF_8));
+
+ ByteKey secondStart =
+ ByteKey.copyFrom(
+ ("b" + String.format("%05d", (MINI_BATCH_ROW_LIMIT / 2)))
+ .getBytes(StandardCharsets.UTF_8));
+ ByteKey secondEnd = ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+
+ ByteKey thirdStart =
+ ByteKey.copyFrom(
+ ("b" + String.format("%05d", (MINI_BATCH_ROW_LIMIT / 2 + 20)))
+ .getBytes(StandardCharsets.UTF_8));
+
+ when(mockBigtableSource.getRanges())
+ .thenReturn(
+ Arrays.asList(
+ ByteKeyRange.of(firstStart, firstEnd),
+ ByteKeyRange.of(secondStart, secondEnd),
+ ByteKeyRange.of(thirdStart, secondEnd)));
+
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(expectedFirstRangeRows))
+ .thenAnswer(
+ mockReadRowsAnswer(
+ expectedSecondRangeRows.subList(
+ MINI_BATCH_ROW_LIMIT / 2, MINI_BATCH_ROW_LIMIT +
MINI_BATCH_ROW_LIMIT / 2)))
+ .thenAnswer(mockReadRowsAnswer(new ArrayList<FlatRow>()));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ Assert.assertEquals(
+ FlatRowConverter.convert(expectedFirstRangeRows.get(0)),
underTest.getCurrentRow());
+ for (int i = 1; i < MINI_BATCH_ROW_LIMIT; i++) {
+ underTest.advance();
+ }
+ Assert.assertEquals(
+
FlatRowConverter.convert(expectedFirstRangeRows.get(expectedFirstRangeRows.size()
- 1)),
+ underTest.getCurrentRow());
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+ underTest.advance();
+ }
+ Assert.assertEquals(
+
FlatRowConverter.convert(expectedSecondRangeRows.get(expectedSecondRangeRows.size()
- 1)),
+ underTest.getCurrentRow());
+ Assert.assertFalse(underTest.advance());
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses an empty request to trigger a full table scan. RowRange: [b0, b1,
... b299)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadFullTableScan() throws IOException {
+ List<FlatRow> expectedRangeRows = new ArrayList<>();
+
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT * 3; i++) {
+ if (i < MINI_BATCH_ROW_LIMIT) {
+ expectedRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" +
String.format("%05d", i)))
+ .build());
+ } else if (i < MINI_BATCH_ROW_LIMIT * 2) {
+ expectedRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" +
String.format("%05d", i)))
+ .build());
+ } else {
+ expectedRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" +
String.format("%05d", i)))
+ .build());
+ }
+ }
+
+ when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList());
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(expectedRangeRows.subList(0,
MINI_BATCH_ROW_LIMIT)))
+ .thenAnswer(
+ mockReadRowsAnswer(
+ expectedRangeRows.subList(MINI_BATCH_ROW_LIMIT,
MINI_BATCH_ROW_LIMIT * 2)))
+ .thenAnswer(
+ mockReadRowsAnswer(
+ expectedRangeRows.subList(MINI_BATCH_ROW_LIMIT * 2,
MINI_BATCH_ROW_LIMIT * 3)))
+ .thenAnswer(mockReadRowsAnswer(new ArrayList<FlatRow>()));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ Assert.assertEquals(
+ FlatRowConverter.convert(expectedRangeRows.get(0)),
underTest.getCurrentRow());
+ for (int i = 0; i < 3 * MINI_BATCH_ROW_LIMIT - 1; i++) {
+ underTest.advance();
+ }
+ Assert.assertFalse(underTest.advance());
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 4);
+ }
+
+ /**
+ * This test compares the amount of RowRanges being requested after the
buffer is refilled. After
+ * reading the first buffer, the first two RowRanges should be removed and
the RowRange containing
+ * c should be requested.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadFillBuffer() throws IOException {
+ List<FlatRow> rowsInRowRanges = new ArrayList<>();
+
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT + MINI_BATCH_ROW_LIMIT / 2; i++) {
+ if (i < MINI_BATCH_ROW_LIMIT) {
+ rowsInRowRanges.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" +
String.format("%05d", i)))
+ .build());
+ } else {
+ rowsInRowRanges.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("c" +
String.format("%05d", i)))
+ .build());
+ }
+ }
+ rowsInRowRanges.set(0,
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+ ByteKey firstStart =
ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+ ByteKey firstEnd =
+ ByteKey.copyFrom(
+ ("b" + String.format("%05d", MINI_BATCH_ROW_LIMIT / 2))
+ .getBytes(StandardCharsets.UTF_8));
+
+ ByteKey secondStart =
+ ByteKey.copyFrom(
+ ("b" + String.format("%05d", MINI_BATCH_ROW_LIMIT / 2))
+ .getBytes(StandardCharsets.UTF_8));
+ ByteKey secondEnd =
+ ByteKey.copyFrom(
+ ("b" + String.format("%05d", (MINI_BATCH_ROW_LIMIT - 1)))
+ .getBytes(StandardCharsets.UTF_8));
+
+ ByteKey thirdStart =
ByteKey.copyFrom(("c00000").getBytes(StandardCharsets.UTF_8));
+ ByteKey thirdEnd =
+ ByteKey.copyFrom(
+ ("c" + String.format("%05d", (MINI_BATCH_ROW_LIMIT / 2 - 1)))
+ .getBytes(StandardCharsets.UTF_8));
Review Comment:
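Consider extracting a `makeKey(String prefix, int index) -> ByteKey` helper (e.g. returning `ByteKey.copyFrom((prefix + String.format("%05d", index)).getBytes(StandardCharsets.UTF_8))`) to remove this repetition.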
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java:
##########
@@ -126,6 +144,451 @@ public void testRead() throws IOException {
verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
}
+ /**
+ * This test ensures that protobuf creation and interactions with {@link
BigtableDataClient} work
+ * as expected. This test checks that a single row is returned from the
future.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadSingleRangeBelowSegmentLimit() throws Exception {
+ ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+ ByteKey end = ByteKey.copyFrom("b".getBytes(StandardCharsets.UTF_8));
+
+
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
end)));
+ FlatRow expectedRow =
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build();
+
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(Arrays.asList(expectedRow)));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ Assert.assertEquals(FlatRowConverter.convert(expectedRow),
underTest.getCurrentRow());
+ Assert.assertFalse(underTest.advance());
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses a single range with MINI_BATCH_ROW_LIMIT*2+1 rows. Range: [a, b1,
... b199, c)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadSingleRangeAboveSegmentLimit() throws IOException {
+ List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+ List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+ expectedFirstRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" + String.format("%05d",
i)))
+ .build());
+ expectedSecondRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(
+ ByteString.copyFromUtf8("b" + String.format("%05d", i +
MINI_BATCH_ROW_LIMIT)))
+ .build());
+ }
+ expectedFirstRangeRows.set(
+ 0,
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+ ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+ ByteKey end = ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+
+
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
end)));
+
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(expectedFirstRangeRows))
+ .thenAnswer(mockReadRowsAnswer(expectedSecondRangeRows))
+ .thenAnswer(mockReadRowsAnswer(new ArrayList<FlatRow>()));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ Assert.assertEquals(
+ FlatRowConverter.convert(expectedFirstRangeRows.get(0)),
underTest.getCurrentRow());
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT - 1; i++) {
+ underTest.advance();
+ }
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+ underTest.advance();
+ }
+ Assert.assertEquals(
+
FlatRowConverter.convert(expectedSecondRangeRows.get(expectedSecondRangeRows.size()
- 1)),
+ underTest.getCurrentRow());
+ Assert.assertFalse(underTest.advance());
+
+ underTest.close();
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses two ranges with MINI_BATCH_ROW_LIMIT rows. The buffer should be
refilled twice and
+ * ReadRowsAsync should be called three times. The last rpc call should
return zero rows.
+ * The following test follows this example: FirstRange: [a,b1,...,b99,c)
+ * SecondRange: [c,d1,...,d99,e)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadMultipleRanges() throws IOException {
+ List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+ expectedFirstRangeRows.add(
+ FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+ List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+ expectedSecondRangeRows.add(
+ FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("c")).build());
+
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT - 1; i++) {
+ expectedFirstRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" + String.format("%05d",
i)))
+ .build());
+ expectedSecondRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("d" + String.format("%05d",
i)))
+ .build());
+ }
+
+ ByteKey firstStart =
ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+ ByteKey sharedKeyEnd =
ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+ ByteKey secondEnd = ByteKey.copyFrom("e".getBytes(StandardCharsets.UTF_8));
+
+ when(mockBigtableSource.getRanges())
+ .thenReturn(
+ Arrays.asList(
+ ByteKeyRange.of(firstStart, sharedKeyEnd),
+ ByteKeyRange.of(sharedKeyEnd, secondEnd)));
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(expectedFirstRangeRows))
+ .thenAnswer(mockReadRowsAnswer(expectedSecondRangeRows))
+ .thenAnswer(mockReadRowsAnswer(new ArrayList<FlatRow>()));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ Assert.assertEquals(
+ FlatRowConverter.convert(expectedFirstRangeRows.get(0)),
underTest.getCurrentRow());
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT - 1; i++) {
+ underTest.advance();
+ }
+ Assert.assertEquals(
+
FlatRowConverter.convert(expectedFirstRangeRows.get(expectedFirstRangeRows.size()
- 1)),
+ underTest.getCurrentRow());
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+ underTest.advance();
+ }
+
+ Assert.assertEquals(
+
FlatRowConverter.convert(expectedSecondRangeRows.get(expectedSecondRangeRows.size()
- 1)),
+ underTest.getCurrentRow());
+ Assert.assertFalse(underTest.advance());
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses three overlapping ranges. The logic should remove all keys that were
already added to
+ * the buffer. The following test follows this example:
+ * FirstRange: [a,b1,...,b99,b100) SecondRange:
[b50,b51...b100,d1,...,d199,c)
+ * ThirdRange: [b70, c)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadMultipleRangesOverlappingKeys() throws IOException {
+ List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+ expectedFirstRangeRows.add(
+ FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+ List<FlatRow> expectedSecondRangeRows = new ArrayList<>();
+ expectedSecondRangeRows.add(
+ FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("c")).build());
+
+ for (int i = 1; i < 2 * MINI_BATCH_ROW_LIMIT; i++) {
+ if (i < MINI_BATCH_ROW_LIMIT) {
+ expectedFirstRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" +
String.format("%05d", i)))
+ .build());
+ }
+ if (i > MINI_BATCH_ROW_LIMIT / 2) {
+ expectedSecondRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" +
String.format("%05d", i)))
+ .build());
+ }
+ }
+
+ ByteKey firstStart =
ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+ ByteKey firstEnd =
+ ByteKey.copyFrom(
+ ("b" + String.format("%05d",
(MINI_BATCH_ROW_LIMIT))).getBytes(StandardCharsets.UTF_8));
+
+ ByteKey secondStart =
+ ByteKey.copyFrom(
+ ("b" + String.format("%05d", (MINI_BATCH_ROW_LIMIT / 2)))
+ .getBytes(StandardCharsets.UTF_8));
+ ByteKey secondEnd = ByteKey.copyFrom("c".getBytes(StandardCharsets.UTF_8));
+
+ ByteKey thirdStart =
+ ByteKey.copyFrom(
+ ("b" + String.format("%05d", (MINI_BATCH_ROW_LIMIT / 2 + 20)))
+ .getBytes(StandardCharsets.UTF_8));
+
+ when(mockBigtableSource.getRanges())
+ .thenReturn(
+ Arrays.asList(
+ ByteKeyRange.of(firstStart, firstEnd),
+ ByteKeyRange.of(secondStart, secondEnd),
+ ByteKeyRange.of(thirdStart, secondEnd)));
+
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(expectedFirstRangeRows))
+ .thenAnswer(
+ mockReadRowsAnswer(
+ expectedSecondRangeRows.subList(
+ MINI_BATCH_ROW_LIMIT / 2, MINI_BATCH_ROW_LIMIT +
MINI_BATCH_ROW_LIMIT / 2)))
+ .thenAnswer(mockReadRowsAnswer(new ArrayList<FlatRow>()));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ Assert.assertEquals(
+ FlatRowConverter.convert(expectedFirstRangeRows.get(0)),
underTest.getCurrentRow());
+ for (int i = 1; i < MINI_BATCH_ROW_LIMIT; i++) {
+ underTest.advance();
+ }
+ Assert.assertEquals(
+
FlatRowConverter.convert(expectedFirstRangeRows.get(expectedFirstRangeRows.size()
- 1)),
+ underTest.getCurrentRow());
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+ underTest.advance();
+ }
+ Assert.assertEquals(
+
FlatRowConverter.convert(expectedSecondRangeRows.get(expectedSecondRangeRows.size()
- 1)),
+ underTest.getCurrentRow());
+ Assert.assertFalse(underTest.advance());
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 3);
+ }
+
+ /**
+ * This test ensures that all the rows are properly added to the buffer and
read. This example
+ * uses an empty request to trigger a full table scan. RowRange: [b0, b1,
... b299)
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadFullTableScan() throws IOException {
+ List<FlatRow> expectedRangeRows = new ArrayList<>();
+
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT * 3; i++) {
+ if (i < MINI_BATCH_ROW_LIMIT) {
+ expectedRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" +
String.format("%05d", i)))
+ .build());
+ } else if (i < MINI_BATCH_ROW_LIMIT * 2) {
+ expectedRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" +
String.format("%05d", i)))
+ .build());
+ } else {
+ expectedRangeRows.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" +
String.format("%05d", i)))
+ .build());
+ }
+ }
+
+ when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList());
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(expectedRangeRows.subList(0,
MINI_BATCH_ROW_LIMIT)))
+ .thenAnswer(
+ mockReadRowsAnswer(
+ expectedRangeRows.subList(MINI_BATCH_ROW_LIMIT,
MINI_BATCH_ROW_LIMIT * 2)))
+ .thenAnswer(
+ mockReadRowsAnswer(
+ expectedRangeRows.subList(MINI_BATCH_ROW_LIMIT * 2,
MINI_BATCH_ROW_LIMIT * 3)))
+ .thenAnswer(mockReadRowsAnswer(new ArrayList<FlatRow>()));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ Assert.assertEquals(
+ FlatRowConverter.convert(expectedRangeRows.get(0)),
underTest.getCurrentRow());
+ for (int i = 0; i < 3 * MINI_BATCH_ROW_LIMIT - 1; i++) {
+ underTest.advance();
+ }
+ Assert.assertFalse(underTest.advance());
+ underTest.close();
+
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 4);
+ }
+
+ /**
+ * This test compares the amount of RowRanges being requested after the
buffer is refilled. After
+ * reading the first buffer, the first two RowRanges should be removed and
the RowRange containing
+ * c should be requested.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadFillBuffer() throws IOException {
+ List<FlatRow> rowsInRowRanges = new ArrayList<>();
+
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT + MINI_BATCH_ROW_LIMIT / 2; i++) {
+ if (i < MINI_BATCH_ROW_LIMIT) {
+ rowsInRowRanges.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("b" +
String.format("%05d", i)))
+ .build());
+ } else {
+ rowsInRowRanges.add(
+ FlatRow.newBuilder()
+ .withRowKey(ByteString.copyFromUtf8("c" +
String.format("%05d", i)))
+ .build());
+ }
+ }
+ rowsInRowRanges.set(0,
FlatRow.newBuilder().withRowKey(ByteString.copyFromUtf8("a")).build());
+
+ ByteKey firstStart =
ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
+ ByteKey firstEnd =
+ ByteKey.copyFrom(
+ ("b" + String.format("%05d", MINI_BATCH_ROW_LIMIT / 2))
+ .getBytes(StandardCharsets.UTF_8));
+
+ ByteKey secondStart =
+ ByteKey.copyFrom(
+ ("b" + String.format("%05d", MINI_BATCH_ROW_LIMIT / 2))
+ .getBytes(StandardCharsets.UTF_8));
+ ByteKey secondEnd =
+ ByteKey.copyFrom(
+ ("b" + String.format("%05d", (MINI_BATCH_ROW_LIMIT - 1)))
+ .getBytes(StandardCharsets.UTF_8));
+
+ ByteKey thirdStart =
ByteKey.copyFrom(("c00000").getBytes(StandardCharsets.UTF_8));
+ ByteKey thirdEnd =
+ ByteKey.copyFrom(
+ ("c" + String.format("%05d", (MINI_BATCH_ROW_LIMIT / 2 - 1)))
+ .getBytes(StandardCharsets.UTF_8));
+
+ when(mockBigtableSource.getRanges())
+ .thenReturn(
+ Arrays.asList(
+ ByteKeyRange.of(firstStart, firstEnd),
+ ByteKeyRange.of(secondStart, secondEnd),
+ ByteKeyRange.of(thirdStart, thirdEnd)));
+ when(mockBigtableDataClient.readFlatRows(any(ReadRowsRequest.class),
any()))
+ .thenAnswer(mockReadRowsAnswer(rowsInRowRanges.subList(0,
MINI_BATCH_ROW_LIMIT)))
+ .thenAnswer(
+ mockReadRowsAnswer(
+ rowsInRowRanges.subList(
+ MINI_BATCH_ROW_LIMIT, MINI_BATCH_ROW_LIMIT +
MINI_BATCH_ROW_LIMIT / 2)))
+ .thenAnswer(mockReadRowsAnswer(new ArrayList<FlatRow>()));
+
+ BigtableService.Reader underTest =
+ new BigtableServiceImpl.BigtableSegmentReaderImpl(mockSession,
mockBigtableSource);
+
+ underTest.start();
+ verify(mockBigtableDataClient, times(1))
+ .readFlatRows(requestCaptor.capture(), any(StreamObserver.class));
+ Assert.assertEquals(3,
requestCaptor.getValue().getRows().getRowRangesCount());
+ Assert.assertEquals(
+ FlatRowConverter.convert(rowsInRowRanges.get(0)),
underTest.getCurrentRow());
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+ underTest.advance();
+ }
+ verify(mockBigtableDataClient, times(2))
+ .readFlatRows(requestCaptor.capture(), any(StreamObserver.class));
+ for (int i = 0; i < MINI_BATCH_ROW_LIMIT; i++) {
+ underTest.advance();
+ }
+ Assert.assertEquals(
+ FlatRowConverter.convert(rowsInRowRanges.get(rowsInRowRanges.size() -
1)),
+ underTest.getCurrentRow());
+ Assert.assertEquals(1,
requestCaptor.getValue().getRows().getRowRangesCount());
+ Assert.assertFalse(underTest.advance());
+
+ underTest.close();
+ verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 2);
+ }
+
+ /**
+ * This test checks that the buffer will stop filling up once the byte limit
is reached. It will
+ * cancel the ScanHandler after reached the limit. This test completes one
fill and contains one
+ * Row after the first buffer has been completed. The test cheaks the
current available memory in
+ * the JVM and uses a percent of it to mock the original behavior.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testReadByteLimitBuffer() throws IOException {
+ List<FlatRow> expectedFirstRangeRows = new ArrayList<>();
+ // Max amount of memory allowed in a Row (256MB)
+ byte[] largeMemory = new byte[(int) DEFAULT_ROW_SIZE];
Review Comment:
Make maxSegmentByteSize a parameter on the reader constructor so that you can
pass it in from tests. Then add a factory method that derives it from the JVM:
```java
class Reader {
  static Reader create(BigtableSession session, BigtableSource source) {
    ...
    long maxSegmentByteSize = (long) (Runtime.getRuntime().totalMemory() * 0.1);
    return new Reader(..., maxSegmentByteSize);
  }
}
```
Then in the pipeline you replace `new Reader(...)` with `Reader.create(...)`.
In your test you can instantiate it with a much lower limit, e.g.
`new Reader(..., 10)`.
This will also make your test deterministic and remove all of the noise below.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]