Repository: incubator-beam
Updated Branches:
  refs/heads/master 984b32ff2 -> 62c56c99b


[BEAM-639] BigtableIO: add support for users to scan table subranges


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dace48c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dace48c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dace48c7

Branch: refs/heads/master
Commit: dace48c70d2d00500514c53734e9dd45dcb1465f
Parents: 984b32f
Author: Dan Halperin <dhalp...@google.com>
Authored: Mon Sep 19 11:54:07 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Mon Sep 19 15:05:17 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 54 ++++++++---
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     | 95 ++++++++++++++++----
 2 files changed, 122 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dace48c7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 67dde50..c1b882a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -76,8 +76,9 @@ import org.slf4j.LoggerFactory;
  *
  * <p>To configure a Cloud Bigtable source, you must supply a table id and a 
{@link BigtableOptions}
  * or builder configured with the project and other information necessary to 
identify the
- * Bigtable instance. A {@link RowFilter} may also optionally be specified 
using
- * {@link BigtableIO.Read#withRowFilter}. For example:
+ * Bigtable instance. By default, {@link BigtableIO.Read} will read all rows 
in the table. The row
+ * range to be read can optionally be restricted using {@link 
BigtableIO.Read#withKeyRange}, and
+ * a {@link RowFilter} can be specified using {@link 
BigtableIO.Read#withRowFilter}. For example:
  *
  * <pre>{@code
  * BigtableOptions.Builder optionsBuilder =
@@ -93,6 +94,14 @@ import org.slf4j.LoggerFactory;
  *         .withBigtableOptions(optionsBuilder)
  *         .withTableId("table"));
  *
+ * // Scan a prefix of the table.
+ * ByteKeyRange keyRange = ...;
+ * p.apply("read",
+ *     BigtableIO.read()
+ *         .withBigtableOptions(optionsBuilder)
+ *         .withTableId("table")
+ *         .withKeyRange(keyRange));
+ *
  * // Scan a subset of rows that match the specified row filter.
  * p.apply("filtered read",
  *     BigtableIO.read()
@@ -152,7 +161,7 @@ public class BigtableIO {
    */
   @Experimental
   public static Read read() {
-    return new Read(null, "", null, null);
+    return new Read(null, "", ByteKeyRange.ALL_KEYS, null, null);
   }
 
   /**
@@ -215,7 +224,7 @@ public class BigtableIO {
                   .build());
       BigtableOptions optionsWithAgent = 
clonedBuilder.setUserAgent(getUserAgent()).build();
 
-      return new Read(optionsWithAgent, tableId, filter, bigtableService);
+      return new Read(optionsWithAgent, tableId, keyRange, filter, 
bigtableService);
     }
 
     /**
@@ -226,7 +235,17 @@ public class BigtableIO {
      */
     public Read withRowFilter(RowFilter filter) {
       checkNotNull(filter, "filter");
-      return new Read(options, tableId, filter, bigtableService);
+      return new Read(options, tableId, keyRange, filter, bigtableService);
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Read} that will read only rows in the 
specified range.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withKeyRange(ByteKeyRange keyRange) {
+      checkNotNull(keyRange, "keyRange");
+      return new Read(options, tableId, keyRange, filter, bigtableService);
     }
 
     /**
@@ -236,7 +255,7 @@ public class BigtableIO {
      */
     public Read withTableId(String tableId) {
       checkNotNull(tableId, "tableId");
-      return new Read(options, tableId, filter, bigtableService);
+      return new Read(options, tableId, keyRange, filter, bigtableService);
     }
 
     /**
@@ -247,6 +266,14 @@ public class BigtableIO {
     }
 
     /**
+     * Returns the range of keys that will be read from the table. By default, 
returns
+     * {@link ByteKeyRange#ALL_KEYS} to scan the entire table.
+     */
+    public ByteKeyRange getKeyRange() {
+      return keyRange;
+    }
+
+    /**
      * Returns the table being read from.
      */
     public String getTableId() {
@@ -256,7 +283,7 @@ public class BigtableIO {
     @Override
     public PCollection<Row> apply(PBegin input) {
       BigtableSource source =
-          new BigtableSource(getBigtableService(), tableId, filter, 
ByteKeyRange.ALL_KEYS, null);
+          new BigtableSource(getBigtableService(), tableId, filter, keyRange, 
null);
       return 
input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
     }
 
@@ -284,6 +311,9 @@ public class BigtableIO {
           .withLabel("Bigtable Options"));
       }
 
+      builder.addIfNotDefault(
+          DisplayData.item("keyRange", keyRange.toString()), 
ByteKeyRange.ALL_KEYS.toString());
+
       if (filter != null) {
         builder.add(DisplayData.item("rowFilter", filter.toString())
           .withLabel("Table Row Filter"));
@@ -295,6 +325,7 @@ public class BigtableIO {
       return MoreObjects.toStringHelper(Read.class)
           .add("options", options)
           .add("tableId", tableId)
+          .add("keyRange", keyRange)
           .add("filter", filter)
           .toString();
     }
@@ -307,16 +338,19 @@ public class BigtableIO {
      */
     @Nullable private final BigtableOptions options;
     private final String tableId;
+    private final ByteKeyRange keyRange;
     @Nullable private final RowFilter filter;
     @Nullable private final BigtableService bigtableService;
 
     private Read(
         @Nullable BigtableOptions options,
         String tableId,
+        ByteKeyRange keyRange,
         @Nullable RowFilter filter,
         @Nullable BigtableService bigtableService) {
       this.options = options;
       this.tableId = checkNotNull(tableId, "tableId");
+      this.keyRange = checkNotNull(keyRange, "keyRange");
       this.filter = filter;
       this.bigtableService = bigtableService;
     }
@@ -331,7 +365,7 @@ public class BigtableIO {
      */
     Read withBigtableService(BigtableService bigtableService) {
       checkNotNull(bigtableService, "bigtableService");
-      return new Read(options, tableId, filter, bigtableService);
+      return new Read(options, tableId, keyRange, filter, bigtableService);
     }
 
     /**
@@ -615,7 +649,7 @@ public class BigtableIO {
         String tableId,
         @Nullable RowFilter filter,
         ByteKeyRange range,
-        Long estimatedSizeBytes) {
+        @Nullable Long estimatedSizeBytes) {
       this.service = service;
       this.tableId = tableId;
       this.filter = filter;
@@ -635,7 +669,7 @@ public class BigtableIO {
 
     ////// Private state and internal implementation details //////
     private final BigtableService service;
-    @Nullable private final String tableId;
+    private final String tableId;
     @Nullable private final RowFilter filter;
     private final ByteKeyRange range;
     @Nullable private Long estimatedSizeBytes;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dace48c7/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index d60ede6..f21e6c0 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -28,7 +28,11 @@ import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLabel;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
 import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
@@ -49,6 +53,7 @@ import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
@@ -218,12 +223,8 @@ public class BigtableIOTest {
     final String table = "TEST-EMPTY-TABLE";
     service.createTable(table);
 
-    TestPipeline p = TestPipeline.create();
-    PCollection<Row> rows = p.apply(defaultRead.withTableId(table));
-    PAssert.that(rows).empty();
-
-    p.run();
-    logged.verifyInfo(String.format("Closing reader after reading 0 
records."));
+    runReadTest(defaultRead.withTableId(table), new ArrayList<Row>());
+    logged.verifyInfo("Closing reader after reading 0 records.");
   }
 
   /** Tests reading all rows from a table. */
@@ -233,11 +234,7 @@ public class BigtableIOTest {
     final int numRows = 1001;
     List<Row> testRows = makeTableData(table, numRows);
 
-    TestPipeline p = TestPipeline.create();
-    PCollection<Row> rows = p.apply(defaultRead.withTableId(table));
-    PAssert.that(rows).containsInAnyOrder(testRows);
-
-    p.run();
+    runReadTest(defaultRead.withTableId(table), testRows);
     logged.verifyInfo(String.format("Closing reader after reading %d 
records.", numRows));
   }
 
@@ -256,6 +253,68 @@ public class BigtableIOTest {
     }
   }
 
+  private static List<Row> filterToRange(List<Row> rows, final ByteKeyRange 
range) {
+    return Lists.newArrayList(Iterables.filter(
+        rows,
+        new Predicate<Row>() {
+          @Override
+          public boolean apply(@Nullable Row input) {
+            verifyNotNull(input, "input");
+            return range.containsKey(ByteKey.of(input.getKey()));
+          }
+        }));
+  }
+
+  private static void runReadTest(BigtableIO.Read read, List<Row> expected) {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Row> rows = p.apply(read);
+    PAssert.that(rows).containsInAnyOrder(expected);
+    p.run();
+  }
+
+  /**
+   * Tests reading all rows using key ranges. Tests a prefix [), a suffix (], 
and a restricted
+   * range [] and that some properties hold across them.
+   */
+  @Test
+  public void testReadingWithKeyRange() throws Exception {
+    final String table = "TEST-KEY-RANGE-TABLE";
+    final int numRows = 1001;
+    List<Row> testRows = makeTableData(table, numRows);
+    ByteKey startKey = ByteKey.copyFrom("key000000100".getBytes());
+    ByteKey endKey = ByteKey.copyFrom("key000000300".getBytes());
+
+    // Test prefix: [beginning, startKey).
+    final ByteKeyRange prefixRange = 
ByteKeyRange.ALL_KEYS.withEndKey(startKey);
+    List<Row> prefixRows = filterToRange(testRows, prefixRange);
+    runReadTest(defaultRead.withTableId(table).withKeyRange(prefixRange), 
prefixRows);
+
+    // Test suffix: [startKey, end).
+    final ByteKeyRange suffixRange = 
ByteKeyRange.ALL_KEYS.withStartKey(startKey);
+    List<Row> suffixRows = filterToRange(testRows, suffixRange);
+    runReadTest(defaultRead.withTableId(table).withKeyRange(suffixRange), 
suffixRows);
+
+    // Test restricted range: [startKey, endKey).
+    final ByteKeyRange middleRange = ByteKeyRange.of(startKey, endKey);
+    List<Row> middleRows = filterToRange(testRows, middleRange);
+    runReadTest(defaultRead.withTableId(table).withKeyRange(middleRange), 
middleRows);
+
+    //////// Size and content sanity checks //////////
+
+    // Prefix, suffix, middle should be non-trivial (non-zero,non-all).
+    assertThat(prefixRows, allOf(hasSize(lessThan(numRows)), 
hasSize(greaterThan(0))));
+    assertThat(suffixRows, allOf(hasSize(lessThan(numRows)), 
hasSize(greaterThan(0))));
+    assertThat(middleRows, allOf(hasSize(lessThan(numRows)), 
hasSize(greaterThan(0))));
+
+    // Prefix + suffix should be exactly all rows.
+    List<Row> union = Lists.newArrayList(prefixRows);
+    union.addAll(suffixRows);
+    assertThat("prefix + suffix = total", union, 
containsInAnyOrder(testRows.toArray(new Row[]{})));
+
+    // Suffix should contain the middle.
+    assertThat(suffixRows, hasItems(middleRows.toArray(new Row[]{})));
+  }
+
   /** Tests reading all rows using a filter. */
   @Test
   public void testReadingWithFilter() throws Exception {
@@ -278,11 +337,8 @@ public class BigtableIOTest {
     RowFilter filter =
         
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(regex)).build();
 
-    TestPipeline p = TestPipeline.create();
-    PCollection<Row> rows = 
p.apply(defaultRead.withTableId(table).withRowFilter(filter));
-    PAssert.that(rows).containsInAnyOrder(filteredRows);
-
-    p.run();
+    runReadTest(
+        defaultRead.withTableId(table).withRowFilter(filter), 
Lists.newArrayList(filteredRows));
   }
 
   /**
@@ -408,10 +464,12 @@ public class BigtableIOTest {
         .setRowKeyRegexFilter(ByteString.copyFromUtf8("foo.*"))
         .build();
 
+    ByteKeyRange keyRange = ByteKeyRange.ALL_KEYS.withEndKey(ByteKey.of(0xab, 
0xcd));
     BigtableIO.Read read = BigtableIO.read()
         .withBigtableOptions(BIGTABLE_OPTIONS)
         .withTableId("fooTable")
-        .withRowFilter(rowFilter);
+        .withRowFilter(rowFilter)
+        .withKeyRange(keyRange);
 
     DisplayData displayData = DisplayData.from(read);
 
@@ -419,8 +477,11 @@ public class BigtableIOTest {
         hasKey("tableId"),
         hasLabel("Table ID"),
         hasValue("fooTable"))));
+
     assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString()));
 
+    assertThat(displayData, hasDisplayItem("keyRange", keyRange.toString()));
+
     // BigtableIO adds user-agent to options; assert only on key and not value.
     assertThat(displayData, hasDisplayItem("bigtableOptions"));
   }

Reply via email to