Repository: incubator-beam Updated Branches: refs/heads/master 984b32ff2 -> 62c56c99b
[BEAM-639] BigtableIO: add support for users to scan table subranges Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dace48c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dace48c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dace48c7 Branch: refs/heads/master Commit: dace48c70d2d00500514c53734e9dd45dcb1465f Parents: 984b32f Author: Dan Halperin <dhalp...@google.com> Authored: Mon Sep 19 11:54:07 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Sep 19 15:05:17 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 54 ++++++++--- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 95 ++++++++++++++++---- 2 files changed, 122 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dace48c7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 67dde50..c1b882a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -76,8 +76,9 @@ import org.slf4j.LoggerFactory; * * <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions} * or builder configured with the project and other information necessary to identify the - * Bigtable instance. 
A {@link RowFilter} may also optionally be specified using - * {@link BigtableIO.Read#withRowFilter}. For example: + * Bigtable instance. By default, {@link BigtableIO.Read} will read all rows in the table. The row + * range to be read can optionally be restricted using {@link BigtableIO.Read#withKeyRange}, and + * a {@link RowFilter} can be specified using {@link BigtableIO.Read#withRowFilter}. For example: * * <pre>{@code * BigtableOptions.Builder optionsBuilder = @@ -93,6 +94,14 @@ import org.slf4j.LoggerFactory; * .withBigtableOptions(optionsBuilder) * .withTableId("table")); * + * // Scan a subrange of the table. + * ByteKeyRange keyRange = ...; + * p.apply("read", + * BigtableIO.read() + * .withBigtableOptions(optionsBuilder) + * .withTableId("table") + * .withKeyRange(keyRange)); + * * // Scan a subset of rows that match the specified row filter. * p.apply("filtered read", * BigtableIO.read() @@ -152,7 +161,7 @@ public class BigtableIO { */ @Experimental public static Read read() { - return new Read(null, "", null, null); + return new Read(null, "", ByteKeyRange.ALL_KEYS, null, null); } /** @@ -215,7 +224,7 @@ public class BigtableIO { .build()); BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build(); - return new Read(optionsWithAgent, tableId, filter, bigtableService); + return new Read(optionsWithAgent, tableId, keyRange, filter, bigtableService); } /** @@ -226,7 +235,7 @@ public class BigtableIO { */ public Read withRowFilter(RowFilter filter) { checkNotNull(filter, "filter"); - return new Read(options, tableId, filter, bigtableService); + return new Read(options, tableId, keyRange, filter, bigtableService); + } + + /** + * Returns a new {@link BigtableIO.Read} that will read only rows in the specified range. + * + * <p>Does not modify this object.
+ */ + public Read withKeyRange(ByteKeyRange keyRange) { + checkNotNull(keyRange, "keyRange"); + return new Read(options, tableId, keyRange, filter, bigtableService); } /** @@ -236,7 +255,7 @@ public class BigtableIO { */ public Read withTableId(String tableId) { checkNotNull(tableId, "tableId"); - return new Read(options, tableId, filter, bigtableService); + return new Read(options, tableId, keyRange, filter, bigtableService); } /** @@ -247,6 +266,14 @@ public class BigtableIO { } /** + * Returns the range of keys that will be read from the table. By default, returns + * {@link ByteKeyRange#ALL_KEYS} to scan the entire table. + */ + public ByteKeyRange getKeyRange() { + return keyRange; + } + + /** * Returns the table being read from. */ public String getTableId() { @@ -256,7 +283,7 @@ public class BigtableIO { @Override public PCollection<Row> apply(PBegin input) { BigtableSource source = - new BigtableSource(getBigtableService(), tableId, filter, ByteKeyRange.ALL_KEYS, null); + new BigtableSource(getBigtableService(), tableId, filter, keyRange, null); return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source)); } @@ -284,6 +311,9 @@ public class BigtableIO { .withLabel("Bigtable Options")); } + builder.addIfNotDefault( + DisplayData.item("keyRange", keyRange.toString()), ByteKeyRange.ALL_KEYS.toString()); + if (filter != null) { builder.add(DisplayData.item("rowFilter", filter.toString()) .withLabel("Table Row Filter")); @@ -295,6 +325,7 @@ public class BigtableIO { return MoreObjects.toStringHelper(Read.class) .add("options", options) .add("tableId", tableId) + .add("keyRange", keyRange) .add("filter", filter) .toString(); } @@ -307,16 +338,19 @@ public class BigtableIO { */ @Nullable private final BigtableOptions options; private final String tableId; + private final ByteKeyRange keyRange; @Nullable private final RowFilter filter; @Nullable private final BigtableService bigtableService; private Read( @Nullable BigtableOptions options, String 
tableId, + ByteKeyRange keyRange, @Nullable RowFilter filter, @Nullable BigtableService bigtableService) { this.options = options; this.tableId = checkNotNull(tableId, "tableId"); + this.keyRange = checkNotNull(keyRange, "keyRange"); this.filter = filter; this.bigtableService = bigtableService; } @@ -331,7 +365,7 @@ public class BigtableIO { */ Read withBigtableService(BigtableService bigtableService) { checkNotNull(bigtableService, "bigtableService"); - return new Read(options, tableId, filter, bigtableService); + return new Read(options, tableId, keyRange, filter, bigtableService); } /** @@ -615,7 +649,7 @@ public class BigtableIO { String tableId, @Nullable RowFilter filter, ByteKeyRange range, - Long estimatedSizeBytes) { + @Nullable Long estimatedSizeBytes) { this.service = service; this.tableId = tableId; this.filter = filter; @@ -635,7 +669,7 @@ public class BigtableIO { ////// Private state and internal implementation details ////// private final BigtableService service; - @Nullable private final String tableId; + private final String tableId; @Nullable private final RowFilter filter; private final ByteKeyRange range; @Nullable private Long estimatedSizeBytes; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dace48c7/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index d60ede6..f21e6c0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -28,7 +28,11 @@ import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLabel; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.lessThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; @@ -49,6 +53,7 @@ import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; @@ -218,12 +223,8 @@ public class BigtableIOTest { final String table = "TEST-EMPTY-TABLE"; service.createTable(table); - TestPipeline p = TestPipeline.create(); - PCollection<Row> rows = p.apply(defaultRead.withTableId(table)); - PAssert.that(rows).empty(); - - p.run(); - logged.verifyInfo(String.format("Closing reader after reading 0 records.")); + runReadTest(defaultRead.withTableId(table), new ArrayList<Row>()); + logged.verifyInfo("Closing reader after reading 0 records."); } /** Tests reading all rows from a table. 
*/ @@ -233,11 +234,7 @@ public class BigtableIOTest { final int numRows = 1001; List<Row> testRows = makeTableData(table, numRows); - TestPipeline p = TestPipeline.create(); - PCollection<Row> rows = p.apply(defaultRead.withTableId(table)); - PAssert.that(rows).containsInAnyOrder(testRows); - - p.run(); + runReadTest(defaultRead.withTableId(table), testRows); logged.verifyInfo(String.format("Closing reader after reading %d records.", numRows)); } @@ -256,6 +253,68 @@ public class BigtableIOTest { } } + private static List<Row> filterToRange(List<Row> rows, final ByteKeyRange range) { + return Lists.newArrayList(Iterables.filter( + rows, + new Predicate<Row>() { + @Override + public boolean apply(@Nullable Row input) { + verifyNotNull(input, "input"); + return range.containsKey(ByteKey.of(input.getKey())); + } + })); + } + + private static void runReadTest(BigtableIO.Read read, List<Row> expected) { + TestPipeline p = TestPipeline.create(); + PCollection<Row> rows = p.apply(read); + PAssert.that(rows).containsInAnyOrder(expected); + p.run(); + } + + /** + * Tests reading rows using half-open key ranges: a prefix [beginning, startKey), a suffix + * [startKey, end), and a middle range [startKey, endKey), plus properties that hold across them. + */ + @Test + public void testReadingWithKeyRange() throws Exception { + final String table = "TEST-KEY-RANGE-TABLE"; + final int numRows = 1001; + List<Row> testRows = makeTableData(table, numRows); + ByteKey startKey = ByteKey.copyFrom("key000000100".getBytes()); + ByteKey endKey = ByteKey.copyFrom("key000000300".getBytes()); + + // Test prefix: [beginning, startKey). + final ByteKeyRange prefixRange = ByteKeyRange.ALL_KEYS.withEndKey(startKey); + List<Row> prefixRows = filterToRange(testRows, prefixRange); + runReadTest(defaultRead.withTableId(table).withKeyRange(prefixRange), prefixRows); + + // Test suffix: [startKey, end).
+ final ByteKeyRange suffixRange = ByteKeyRange.ALL_KEYS.withStartKey(startKey); + List<Row> suffixRows = filterToRange(testRows, suffixRange); + runReadTest(defaultRead.withTableId(table).withKeyRange(suffixRange), suffixRows); + + // Test restricted range: [startKey, endKey). + final ByteKeyRange middleRange = ByteKeyRange.of(startKey, endKey); + List<Row> middleRows = filterToRange(testRows, middleRange); + runReadTest(defaultRead.withTableId(table).withKeyRange(middleRange), middleRows); + + //////// Size and content sanity checks ////////// + + // Prefix, suffix, middle should be non-trivial (non-zero,non-all). + assertThat(prefixRows, allOf(hasSize(lessThan(numRows)), hasSize(greaterThan(0)))); + assertThat(suffixRows, allOf(hasSize(lessThan(numRows)), hasSize(greaterThan(0)))); + assertThat(middleRows, allOf(hasSize(lessThan(numRows)), hasSize(greaterThan(0)))); + + // Prefix + suffix should be exactly all rows. + List<Row> union = Lists.newArrayList(prefixRows); + union.addAll(suffixRows); + assertThat("prefix + suffix = total", union, containsInAnyOrder(testRows.toArray(new Row[]{}))); + + // Suffix should contain the middle. + assertThat(suffixRows, hasItems(middleRows.toArray(new Row[]{}))); + } + /** Tests reading all rows using a filter. 
*/ @Test public void testReadingWithFilter() throws Exception { @@ -278,11 +337,8 @@ public class BigtableIOTest { RowFilter filter = RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(regex)).build(); - TestPipeline p = TestPipeline.create(); - PCollection<Row> rows = p.apply(defaultRead.withTableId(table).withRowFilter(filter)); - PAssert.that(rows).containsInAnyOrder(filteredRows); - - p.run(); + runReadTest( + defaultRead.withTableId(table).withRowFilter(filter), Lists.newArrayList(filteredRows)); } /** @@ -408,10 +464,12 @@ public class BigtableIOTest { .setRowKeyRegexFilter(ByteString.copyFromUtf8("foo.*")) .build(); + ByteKeyRange keyRange = ByteKeyRange.ALL_KEYS.withEndKey(ByteKey.of(0xab, 0xcd)); BigtableIO.Read read = BigtableIO.read() .withBigtableOptions(BIGTABLE_OPTIONS) .withTableId("fooTable") - .withRowFilter(rowFilter); + .withRowFilter(rowFilter) + .withKeyRange(keyRange); DisplayData displayData = DisplayData.from(read); @@ -419,8 +477,11 @@ public class BigtableIOTest { hasKey("tableId"), hasLabel("Table ID"), hasValue("fooTable")))); + assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString())); + assertThat(displayData, hasDisplayItem("keyRange", keyRange.toString())); + // BigtableIO adds user-agent to options; assert only on key and not value. assertThat(displayData, hasDisplayItem("bigtableOptions")); }