[ 
https://issues.apache.org/jira/browse/BEAM-3154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16329498#comment-16329498
 ] 

ASF GitHub Bot commented on BEAM-3154:
--------------------------------------

jkff closed pull request #4312: [BEAM-3154] Support Multiple KeyRanges when 
reading from BigTable
URL: https://github.com/apache/beam/pull/4312
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a fork-based pull request
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 4199b28833c..71c041556a3 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -29,6 +29,7 @@
 import com.google.bigtable.v2.SampleRowKeysResponse;
 import com.google.cloud.bigtable.config.BigtableOptions;
 import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
@@ -36,6 +37,7 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -77,8 +79,8 @@
  *
  * <p>To configure a Cloud Bigtable source, you must supply a table id, a 
project id, an instance
  * id and optionally a {@link BigtableOptions} to provide more specific 
connection configuration.
- * By default, {@link BigtableIO.Read} will read all rows in the table. The 
row range to be read
- * can optionally be restricted using {@link BigtableIO.Read#withKeyRange}, 
and a {@link RowFilter}
+ * By default, {@link BigtableIO.Read} will read all rows in the table. The 
row ranges to be read
+ * can optionally be restricted using {@link BigtableIO.Read#withKeyRanges}, 
and a {@link RowFilter}
  * can be specified using {@link BigtableIO.Read#withRowFilter}. For example:
  *
  * <pre>{@code
@@ -189,7 +191,7 @@ public static Write write() {
 
     /** Returns the range of keys that will be read from the table. */
     @Nullable
-    public abstract ByteKeyRange getKeyRange();
+    public abstract List<ByteKeyRange> getKeyRanges();
 
     /** Returns the table being read from. */
     @Nullable
@@ -217,7 +219,7 @@ static Read create() {
 
       return new AutoValue_BigtableIO_Read.Builder()
         .setBigtableConfig(config)
-        .setKeyRange(ByteKeyRange.ALL_KEYS)
+        .setKeyRanges(Arrays.asList(ByteKeyRange.ALL_KEYS))
         .build();
     }
 
@@ -228,7 +230,7 @@ static Read create() {
 
       abstract Builder setRowFilter(RowFilter filter);
 
-      abstract Builder setKeyRange(ByteKeyRange keyRange);
+      abstract Builder setKeyRanges(List<ByteKeyRange> keyRange);
 
       abstract Read build();
     }
@@ -334,7 +336,22 @@ public Read withRowFilter(RowFilter filter) {
      */
     public Read withKeyRange(ByteKeyRange keyRange) {
       checkArgument(keyRange != null, "keyRange can not be null");
-      return toBuilder().setKeyRange(keyRange).build();
+      return toBuilder().setKeyRanges(Arrays.asList(keyRange)).build();
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Read} that will read only rows in the 
specified ranges.
+     * Ranges must not overlap.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withKeyRanges(List<ByteKeyRange> keyRanges) {
+      checkArgument(keyRanges != null, "keyRanges can not be null");
+      checkArgument(!keyRanges.isEmpty(), "keyRanges can not be empty");
+      for (ByteKeyRange range : keyRanges) {
+        checkArgument(range != null, "keyRanges cannot hold null range");
+      }
+      return toBuilder().setKeyRanges(keyRanges).build();
     }
 
     /**
@@ -376,7 +393,7 @@ Read withBigtableService(BigtableService bigtableService) {
             public BigtableService apply(PipelineOptions options) {
               return getBigtableConfig().getBigtableService(options);
             }
-          }, getTableId(), getRowFilter(), getKeyRange(), null);
+          }, getTableId(), getRowFilter(), getKeyRanges(), null);
       return 
input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
     }
 
@@ -390,8 +407,11 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       super.populateDisplayData(builder);
       getBigtableConfig().populateDisplayData(builder);
 
-      builder.addIfNotDefault(
-          DisplayData.item("keyRange", getKeyRange().toString()), 
ByteKeyRange.ALL_KEYS.toString());
+      List<ByteKeyRange> keyRanges = getKeyRanges();
+      for (int i = 0; i < keyRanges.size() && i < 5; i++) {
+        builder.addIfNotDefault(DisplayData.item("keyRange " + i, 
keyRanges.get(i).toString()),
+            ByteKeyRange.ALL_KEYS.toString());
+      }
 
       if (getRowFilter() != null) {
         builder.add(DisplayData.item("rowFilter", getRowFilter().toString())
@@ -401,11 +421,12 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
 
     @Override
     public String toString() {
-      return MoreObjects.toStringHelper(Read.class)
-        .add("config", getBigtableConfig())
-        .add("keyRange", getKeyRange())
-        .add("filter", getRowFilter())
-        .toString();
+      ToStringHelper helper = MoreObjects.toStringHelper(Read.class)
+          .add("config", getBigtableConfig());
+      for (int i = 0; i < getKeyRanges().size(); i++) {
+        helper.add("keyRange " + i, getKeyRanges().get(i));
+      }
+      return helper.add("filter", getRowFilter()).toString();
     }
   }
 
@@ -738,12 +759,12 @@ public BigtableSource(
         SerializableFunction<PipelineOptions, BigtableService> serviceFactory,
         String tableId,
         @Nullable RowFilter filter,
-        ByteKeyRange range,
+        List<ByteKeyRange> ranges,
         @Nullable Long estimatedSizeBytes) {
       this.serviceFactory = serviceFactory;
       this.tableId = tableId;
       this.filter = filter;
-      this.range = range;
+      this.ranges = ranges;
       this.estimatedSizeBytes = estimatedSizeBytes;
     }
 
@@ -752,7 +773,7 @@ public String toString() {
       return MoreObjects.toStringHelper(BigtableSource.class)
           .add("tableId", tableId)
           .add("filter", filter)
-          .add("range", range)
+          .add("ranges", ranges)
           .add("estimatedSizeBytes", estimatedSizeBytes)
           .toString();
     }
@@ -761,25 +782,22 @@ public String toString() {
     private final SerializableFunction<PipelineOptions, BigtableService> 
serviceFactory;
     private final String tableId;
     @Nullable private final RowFilter filter;
-    private final ByteKeyRange range;
+    private final List<ByteKeyRange> ranges;
     @Nullable private Long estimatedSizeBytes;
     @Nullable private transient List<SampleRowKeysResponse> sampleRowKeys;
 
-    protected BigtableSource withStartKey(ByteKey startKey) {
-      checkArgument(startKey != null, "startKey can not be null");
-      return new BigtableSource(
-          serviceFactory, tableId, filter, range.withStartKey(startKey), 
estimatedSizeBytes);
-    }
-
-    protected BigtableSource withEndKey(ByteKey endKey) {
-      checkArgument(endKey != null, "endKey can not be null");
-      return new BigtableSource(
-          serviceFactory, tableId, filter, range.withEndKey(endKey), 
estimatedSizeBytes);
+    /**
+     * Creates a new {@link BigtableSource} with just one {@link ByteKeyRange}.
+     */
+    protected BigtableSource withSingleRange(ByteKeyRange range) {
+      checkArgument(range != null, "range can not be null");
+      return new BigtableSource(serviceFactory, tableId, filter,
+          Arrays.asList(range), estimatedSizeBytes);
     }
 
     protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
       checkArgument(estimatedSizeBytes != null, "estimatedSizeBytes can not be 
null");
-      return new BigtableSource(serviceFactory, tableId, filter, range, 
estimatedSizeBytes);
+      return new BigtableSource(serviceFactory, tableId, filter, ranges, 
estimatedSizeBytes);
     }
 
     /**
@@ -814,13 +832,25 @@ protected BigtableSource withEstimatedSizeBytes(Long 
estimatedSizeBytes) {
         LOG.info("Not splitting source {} because no sample row keys are 
available.", this);
         return Collections.singletonList(this);
       }
-
       LOG.info(
           "About to split into bundles of size {} with sampleRowKeys length {} 
first element {}",
           desiredBundleSizeBytes,
           sampleRowKeys.size(),
           sampleRowKeys.get(0));
 
+      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
+      for (ByteKeyRange range : ranges) {
+        splits.addAll(splitRangeBasedOnSamples(desiredBundleSizeBytes, 
sampleRowKeys, range));
+      }
+      return splits.build();
+    }
+
+    /** Helper that splits a {@code ByteKeyRange} into bundles based on
+     *  Cloud Bigtable sampled row keys.
+     */
+    private List<BigtableSource> splitRangeBasedOnSamples(long 
desiredBundleSizeBytes,
+        List<SampleRowKeysResponse> sampleRowKeys, ByteKeyRange range) {
+
       // Loop through all sampled responses and generate splits from the ones 
that overlap the
       // scan range. The main complication is that we must track the end range 
of the previous
       // sample to generate good ranges.
@@ -877,7 +907,7 @@ protected BigtableSource withEstimatedSizeBytes(Long 
estimatedSizeBytes) {
       //  2. we want to scan to the end (endKey is empty) or farther 
(lastEndKey < endKey).
       if (!lastEndKey.isEmpty()
           && (range.getEndKey().isEmpty() || 
lastEndKey.compareTo(range.getEndKey()) < 0)) {
-        
splits.add(this.withStartKey(lastEndKey).withEndKey(range.getEndKey()));
+        splits.add(this.withSingleRange(ByteKeyRange.of(lastEndKey, 
range.getEndKey())));
       }
 
       List<BigtableSource> ret = splits.build();
@@ -896,7 +926,7 @@ public long getEstimatedSizeBytes(PipelineOptions options) 
throws IOException {
 
     /**
      * Computes the estimated size in bytes based on the total size of all 
samples that overlap
-     * the key range this source will scan.
+     * the key ranges this source will scan.
      */
     private long 
getEstimatedSizeBytesBasedOnSamples(List<SampleRowKeysResponse> samples) {
       long estimatedSizeBytes = 0;
@@ -912,8 +942,15 @@ private long 
getEstimatedSizeBytesBasedOnSamples(List<SampleRowKeysResponse> sam
           // Skip an empty region.
           lastOffset = currentOffset;
           continue;
-        } else if (range.overlaps(ByteKeyRange.of(currentStartKey, 
currentEndKey))) {
-          estimatedSizeBytes += currentOffset - lastOffset;
+        } else {
+          for (ByteKeyRange range : ranges) {
+            if (range.overlaps(ByteKeyRange.of(currentStartKey, 
currentEndKey))) {
+              estimatedSizeBytes += currentOffset - lastOffset;
+              // We don't want to double our estimated size if two ranges 
overlap this sample
+              // region, so exit early.
+              break;
+            }
+          }
         }
         currentStartKey = currentEndKey;
         lastOffset = currentOffset;
@@ -959,7 +996,7 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
           desiredBundleSizeBytes);
       if (sampleSizeBytes <= desiredBundleSizeBytes) {
         return Collections.singletonList(
-            
this.withStartKey(range.getStartKey()).withEndKey(range.getEndKey()));
+            this.withSingleRange(ByteKeyRange.of(range.getStartKey(), 
range.getEndKey())));
       }
 
       checkArgument(
@@ -978,16 +1015,15 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
         ByteKey next = keys.next();
         splits.add(
             this
-                .withStartKey(prev)
-                .withEndKey(next)
+                .withSingleRange(ByteKeyRange.of(prev, next))
                 .withEstimatedSizeBytes(sampleSizeBytes / splitCount));
         prev = next;
       }
       return splits.build();
     }
 
-    public ByteKeyRange getRange() {
-      return range;
+    public List<ByteKeyRange> getRanges() {
+      return ranges;
     }
 
     public RowFilter getRowFilter() {
@@ -1009,9 +1045,10 @@ public String getTableId() {
     private long recordsReturned;
 
     public BigtableReader(BigtableSource source, BigtableService service) {
+      checkArgument(source.getRanges().size() == 1, "source must have exactly 
one key range");
       this.source = source;
       this.service = service;
-      rangeTracker = ByteKeyRangeTracker.of(source.getRange());
+      rangeTracker = ByteKeyRangeTracker.of(source.getRanges().get(0));
     }
 
     @Override
@@ -1072,11 +1109,12 @@ public final long getSplitPointsConsumed() {
     @Nullable
     public final synchronized BigtableSource splitAtFraction(double fraction) {
       ByteKey splitKey;
+      ByteKeyRange range = rangeTracker.getRange();
       try {
-        splitKey = rangeTracker.getRange().interpolateKey(fraction);
+        splitKey = range.interpolateKey(fraction);
       } catch (RuntimeException e) {
         LOG.info(
-            "{}: Failed to interpolate key for fraction {}.", 
rangeTracker.getRange(), fraction, e);
+            "{}: Failed to interpolate key for fraction {}.", range, fraction, 
e);
         return null;
       }
       LOG.info(
@@ -1084,8 +1122,8 @@ public final synchronized BigtableSource 
splitAtFraction(double fraction) {
       BigtableSource primary;
       BigtableSource residual;
       try {
-         primary = source.withEndKey(splitKey);
-         residual =  source.withStartKey(splitKey);
+         primary = source.withSingleRange(ByteKeyRange.of(range.getStartKey(), 
splitKey));
+         residual = source.withSingleRange(ByteKeyRange.of(splitKey, 
range.getEndKey()));
       } catch (RuntimeException e) {
         LOG.info(
             "{}: Interpolating for fraction {} yielded invalid split key {}.",
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 78f721f1b78..06c459bb447 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -43,6 +43,7 @@
 import java.util.List;
 import java.util.NoSuchElementException;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.values.KV;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,14 +107,15 @@ public BigtableReaderImpl(BigtableSession session, 
BigtableSource source) {
 
     @Override
     public boolean start() throws IOException {
-      RowRange range =
-          RowRange.newBuilder()
-              
.setStartKeyClosed(ByteString.copyFrom(source.getRange().getStartKey().getValue()))
-              
.setEndKeyOpen(ByteString.copyFrom(source.getRange().getEndKey().getValue()))
-              .build();
-      RowSet rowSet = RowSet.newBuilder()
-          .addRowRanges(range)
-          .build();
+      RowSet.Builder rowSetBuilder = RowSet.newBuilder();
+      for (ByteKeyRange sourceRange : source.getRanges()) {
+        rowSetBuilder = rowSetBuilder.addRowRanges(
+            RowRange.newBuilder()
+                
.setStartKeyClosed(ByteString.copyFrom(sourceRange.getStartKey().getValue()))
+                
.setEndKeyOpen(ByteString.copyFrom(sourceRange.getEndKey().getValue())));
+      }
+      RowSet rowSet = rowSetBuilder.build();
+
       ReadRowsRequest.Builder requestB =
           ReadRowsRequest.newBuilder()
               .setRows(rowSet)
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index da4b65ce5b7..4b878a466b5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -63,6 +63,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -393,19 +394,28 @@ public boolean apply(@Nullable ByteString input) {
   }
 
   private static List<Row> filterToRange(List<Row> rows, final ByteKeyRange 
range) {
+    return filterToRanges(rows, ImmutableList.of(range));
+  }
+
+  private static List<Row> filterToRanges(List<Row> rows, final 
List<ByteKeyRange> ranges) {
     return Lists.newArrayList(Iterables.filter(
         rows,
         new Predicate<Row>() {
           @Override
           public boolean apply(@Nullable Row input) {
             verifyNotNull(input, "input");
-            return range.containsKey(makeByteKey(input.getKey()));
+            for (ByteKeyRange range : ranges) {
+              if (range.containsKey(makeByteKey(input.getKey()))) {
+                return true;
+              }
+            }
+            return false;
           }
         }));
   }
 
   private void runReadTest(BigtableIO.Read read, List<Row> expected) {
-    PCollection<Row> rows = p.apply(read.getTableId() + "_" + 
read.getKeyRange(), read);
+    PCollection<Row> rows = p.apply(read.getTableId() + "_" + 
read.getKeyRanges(), read);
     PAssert.that(rows).containsInAnyOrder(expected);
     p.run();
   }
@@ -454,6 +464,34 @@ public void testReadingWithKeyRange() throws Exception {
     assertThat(suffixRows, hasItems(middleRows.toArray(new Row[]{})));
   }
 
+  /**
+   * Tests reading three key ranges with one read.
+   */
+  @Test
+  public void testReadingWithKeyRanges() throws Exception {
+    final String table = "TEST-KEY-RANGE-TABLE";
+    final int numRows = 11;
+    List<Row> testRows = makeTableData(table, numRows);
+    ByteKey startKey1 = ByteKey.copyFrom("key000000001".getBytes());
+    ByteKey endKey1 = ByteKey.copyFrom("key000000003".getBytes());
+    ByteKey startKey2 = ByteKey.copyFrom("key000000004".getBytes());
+    ByteKey endKey2 = ByteKey.copyFrom("key000000007".getBytes());
+    ByteKey startKey3 = ByteKey.copyFrom("key000000008".getBytes());
+    ByteKey endKey3 = ByteKey.copyFrom("key000000009".getBytes());
+
+    service.setupSampleRowKeys(table, numRows / 10, "key000000001".length());
+
+    final ByteKeyRange range1 = ByteKeyRange.of(startKey1, endKey1);
+    final ByteKeyRange range2 = ByteKeyRange.of(startKey2, endKey2);
+    final ByteKeyRange range3 = ByteKeyRange.of(startKey3, endKey3);
+    List<ByteKeyRange> ranges = ImmutableList.of(range1, range2, range3);
+    List<Row> rangeRows = filterToRanges(testRows, ranges);
+    runReadTest(defaultRead.withTableId(table).withKeyRanges(ranges), 
rangeRows);
+
+    // range rows should be non-trivial (non-zero,non-all).
+    assertThat(rangeRows, allOf(hasSize(lessThan(numRows)), 
hasSize(greaterThan(0))));
+  }
+
   /** Tests reading all rows using a filter. */
   @Test
   public void testReadingWithFilter() throws Exception {
@@ -494,7 +532,12 @@ public void testReadingSplitAtFractionExhaustive() throws 
Exception {
     service.setupSampleRowKeys(table, numSamples, bytesPerRow);
 
     BigtableSource source =
-        new BigtableSource(serviceFactory, table, null, 
service.getTableRange(table), null);
+        new BigtableSource(
+            serviceFactory,
+            table,
+            null,
+            Arrays.asList(service.getTableRange(table)),
+            null);
     assertSplitAtFractionExhaustive(source, null);
   }
 
@@ -511,7 +554,11 @@ public void testReadingSplitAtFraction() throws Exception {
     service.setupSampleRowKeys(table, numSamples, bytesPerRow);
 
     BigtableSource source =
-        new BigtableSource(serviceFactory, table, null, 
service.getTableRange(table), null);
+        new BigtableSource(serviceFactory,
+            table,
+            null,
+            Arrays.asList(service.getTableRange(table)),
+            null);
     // With 0 items read, all split requests will fail.
     assertSplitAtFractionFails(source, 0, 0.1, null /* options */);
     assertSplitAtFractionFails(source, 0, 1.0, null /* options */);
@@ -544,7 +591,7 @@ public void testReadingWithSplits() throws Exception {
         new BigtableSource(serviceFactory,
             table,
             null /*filter*/,
-            ByteKeyRange.ALL_KEYS,
+            Arrays.asList(ByteKeyRange.ALL_KEYS),
             null /*size*/);
     List<BigtableSource> splits =
         source.split(numRows * bytesPerRow / numSamples, null /* options */);
@@ -554,6 +601,50 @@ public void testReadingWithSplits() throws Exception {
     assertSourcesEqualReferenceSource(source, splits, null /* options */);
   }
 
+  /** Tests reading all rows from a split table with several key ranges. */
+  @Test
+  public void testReadingWithSplitsWithSeveralKeyRanges() throws Exception {
+    final String table = "TEST-MANY-ROWS-SPLITS-TABLE-MULTIPLE-RANGES";
+    final int numRows = 1500;
+    final int numSamples = 10;
+    // Two more splits are generated because of the split keys at 500 and 1000.
+    // E.g. the split [450, 600) becomes [450, 500) and [500, 600).
+    final int numSplits = 12;
+    final long bytesPerRow = 100L;
+
+    // Set up test table data and sample row keys for size estimation and 
splitting.
+    makeTableData(table, numRows);
+    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+    ByteKey splitKey1 = ByteKey.copyFrom("key000000500".getBytes());
+    ByteKey splitKey2 = ByteKey.copyFrom("key000001000".getBytes());
+
+    ByteKeyRange tableRange = service.getTableRange(table);
+    List<ByteKeyRange> keyRanges = Arrays.asList(
+        tableRange.withEndKey(splitKey1),
+        tableRange.withStartKey(splitKey1).withEndKey(splitKey2),
+        tableRange.withStartKey(splitKey2));
+    // Generate source and split it.
+    BigtableSource source =
+        new BigtableSource(serviceFactory,
+            table,
+            null /*filter*/,
+            keyRanges,
+            null /*size*/);
+    BigtableSource referenceSource =
+        new BigtableSource(serviceFactory,
+            table,
+            null /*filter*/,
+            ImmutableList.of(service.getTableRange(table)),
+            null /*size*/);
+    List<BigtableSource> splits = // 10,000
+        source.split(numRows * bytesPerRow / numSamples, null /* options */);
+
+    // Test num splits and split equality.
+    assertThat(splits, hasSize(numSplits));
+    assertSourcesEqualReferenceSource(referenceSource, splits, null /* options 
*/);
+  }
+
   /** Tests reading all rows from a sub-split table. */
   @Test
   public void testReadingWithSubSplits() throws Exception {
@@ -572,7 +663,7 @@ public void testReadingWithSubSplits() throws Exception {
         new BigtableSource(serviceFactory,
         table,
         null /*filter*/,
-        ByteKeyRange.ALL_KEYS,
+        Arrays.asList(ByteKeyRange.ALL_KEYS),
         null /*size*/);
     List<BigtableSource> splits = source.split(numRows * bytesPerRow / 
numSplits, null);
 
@@ -581,6 +672,54 @@ public void testReadingWithSubSplits() throws Exception {
     assertSourcesEqualReferenceSource(source, splits, null /* options */);
   }
 
+  /** Tests reading all rows from a sub-split table with several key ranges. */
+  @Test
+  public void testReadingWithSubSplitsWithSeveralKeyRanges() throws Exception {
+    final String table = "TEST-MANY-ROWS-SPLITS-TABLE-MULTIPLE-RANGES";
+    final int numRows = 1000;
+    final int numSamples = 10;
+    final int numSplits = 20;
+    // We expect 24 splits instead of 20 due to the multiple ranges. For a key 
of 330 separating
+    // the multiple ranges, first the [300, 330) range is subsplit into two 
(since numSplits is
+    // twice numSamples), so we get [300, 315) and [315, 330). Then, the [330, 
400) range is also
+    // split into two, resulting in [330, 365) and [365, 400). These ranges 
would instead be
+    // [300, 350) and [350, 400) if this source was one range. Thus, each 
extra range adds two
+    // resulting splits.
+    final int expectedNumSplits = 24;
+    final long bytesPerRow = 100L;
+
+    // Set up test table data and sample row keys for size estimation and 
splitting.
+    makeTableData(table, numRows);
+    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+    ByteKey splitKey1 = ByteKey.copyFrom("key000000330".getBytes());
+    ByteKey splitKey2 = ByteKey.copyFrom("key000000730".getBytes());
+
+    ByteKeyRange tableRange = service.getTableRange(table);
+    List<ByteKeyRange> keyRanges = Arrays.asList(
+        tableRange.withEndKey(splitKey1),
+        tableRange.withStartKey(splitKey1).withEndKey(splitKey2),
+        tableRange.withStartKey(splitKey2));
+    // Generate source and split it.
+    BigtableSource source =
+        new BigtableSource(serviceFactory,
+            table,
+            null /*filter*/,
+            keyRanges,
+            null /*size*/);
+    BigtableSource referenceSource =
+        new BigtableSource(serviceFactory,
+            table,
+            null /*filter*/,
+            ImmutableList.of(service.getTableRange(table)),
+            null /*size*/);
+    List<BigtableSource> splits = source.split(numRows * bytesPerRow / 
numSplits, null);
+
+    // Test num splits and split equality.
+    assertThat(splits, hasSize(expectedNumSplits));
+    assertSourcesEqualReferenceSource(referenceSource, splits, null /* options 
*/);
+  }
+
   /** Tests reading all rows from a sub-split table. */
   @Test
   public void testReadingWithFilterAndSubSplits() throws Exception {
@@ -598,7 +737,12 @@ public void testReadingWithFilterAndSubSplits() throws 
Exception {
     RowFilter filter =
         
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*17.*")).build();
     BigtableSource source =
-        new BigtableSource(serviceFactory, table, filter, 
ByteKeyRange.ALL_KEYS, null /*size*/);
+        new BigtableSource(
+            serviceFactory,
+            table,
+            filter,
+            Arrays.asList(ByteKeyRange.ALL_KEYS),
+            null /*size*/);
     List<BigtableSource> splits = source.split(numRows * bytesPerRow / 
numSplits, null);
 
     // Test num splits and split equality.
@@ -628,7 +772,7 @@ public void testReadingDisplayData() {
 
     assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString()));
 
-    assertThat(displayData, hasDisplayItem("keyRange", keyRange.toString()));
+    assertThat(displayData, hasDisplayItem("keyRange 0", keyRange.toString()));
 
     // BigtableIO adds user-agent to options; assert only on key and not value.
     assertThat(displayData, hasDisplayItem("bigtableOptions"));
@@ -759,7 +903,7 @@ public void testGetSplitPointsConsumed() throws Exception {
     makeTableData(table, numRows);
 
     BigtableSource source =
-        new BigtableSource(serviceFactory, table, null, ByteKeyRange.ALL_KEYS, 
null);
+        new BigtableSource(serviceFactory, table, null, 
Arrays.asList(ByteKeyRange.ALL_KEYS), null);
 
     BoundedReader<Row> reader = 
source.createReader(TestPipeline.testingPipelineOptions());
 
@@ -986,7 +1130,7 @@ public boolean advance() {
       while (rows.hasNext()) {
         entry = rows.next();
         if (!filter.apply(entry.getKey())
-            || !source.getRange().containsKey(makeByteKey(entry.getKey()))) {
+            || !rangesContainsKey(source.getRanges(), 
makeByteKey(entry.getKey()))) {
           // Does not match row filter or does not match source range. Skip.
           entry = null;
           continue;
@@ -1006,6 +1150,15 @@ public boolean advance() {
       return true;
     }
 
+    private boolean rangesContainsKey(List<ByteKeyRange> ranges, ByteKey key){
+      for (ByteKeyRange range : ranges) {
+        if (range.containsKey(key)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
     @Override
     public Row getCurrentRow() {
       if (currentRow == null) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support multiple KeyRanges when reading from BigTable
> -----------------------------------------------------
>
>                 Key: BEAM-3154
>                 URL: https://issues.apache.org/jira/browse/BEAM-3154
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-gcp
>            Reporter: Ryan Niemocienski
>            Assignee: Solomon Duskis
>            Priority: Minor
>
> BigtableIO.Read currently supports reading only one KeyRange from Bigtable. It 
> would be nice to read multiple ranges from Bigtable in one read. Thoughts on 
> the feasibility of this before I dig into it?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to