[ 
https://issues.apache.org/jira/browse/BEAM-3246?focusedWorklogId=101242&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101242
 ]

ASF GitHub Bot logged work on BEAM-3246:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/18 18:08
            Start Date: 11/May/18 18:08
    Worklog Time Spent: 10m 
      Work Description: chamikaramj closed pull request #4517: [BEAM-3246] 
Bigtable: Merge splits if they exceed 15K
URL: https://github.com/apache/beam/pull/4517
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 4e602699e58..2e67648bb58 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -34,6 +34,7 @@
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -836,6 +837,8 @@ protected BigtableSource withEstimatedSizeBytes(Long 
estimatedSizeBytes) {
       return config.getBigtableService(pipelineOptions).getSampleRowKeys(this);
     }
 
+    private static final long MAX_SPLIT_COUNT = 15_360L;
+
     @Override
     public List<BigtableSource> split(
         long desiredBundleSizeBytes, PipelineOptions options) throws Exception 
{
@@ -847,7 +850,103 @@ protected BigtableSource withEstimatedSizeBytes(Long 
estimatedSizeBytes) {
           Math.max(sizeEstimate / maximumNumberOfSplits, 
desiredBundleSizeBytes);
 
       // Delegate to testable helper.
-      return splitBasedOnSamples(desiredBundleSizeBytes, 
getSampleRowKeys(options));
+      List<BigtableSource> splits =
+          splitBasedOnSamples(desiredBundleSizeBytes, 
getSampleRowKeys(options));
+      return reduceSplits(splits, options, MAX_SPLIT_COUNT);
+    }
+
+    @VisibleForTesting
+    protected List<BigtableSource> reduceSplits(
+        List<BigtableSource> splits, PipelineOptions options, long 
maxSplitCounts)
+        throws IOException {
+      int numberToCombine =
+          (int) ((splits.size() + maxSplitCounts - 1) / maxSplitCounts);
+      if (splits.size() < maxSplitCounts || numberToCombine < 2) {
+        return splits;
+      }
+      ImmutableList.Builder<BigtableSource> reducedSplits = 
ImmutableList.builder();
+      List<ByteKeyRange> previousSourceRanges = new ArrayList<ByteKeyRange>();
+      int counter = 0;
+      long size = 0;
+      for (BigtableSource source : splits) {
+        if (counter == numberToCombine
+            || !checkRangeAdjacency(previousSourceRanges, source.getRanges())) 
{
+          reducedSplits.add(
+              new BigtableSource(
+                  config,
+                  filter,
+                  previousSourceRanges,
+                  size));
+          counter = 0;
+          size = 0;
+          previousSourceRanges = new ArrayList<ByteKeyRange>();
+        }
+        previousSourceRanges.addAll(source.getRanges());
+        previousSourceRanges = mergeRanges(previousSourceRanges);
+        size += source.getEstimatedSizeBytes(options);
+        counter++;
+      }
+      if (size > 0) {
+        reducedSplits.add(
+            new BigtableSource(
+                config,
+                filter,
+                previousSourceRanges,
+                size));
+      }
+      return reducedSplits.build();
+    }
+
+    /** Helper to validate range Adjacency.
+     * Ranges are considered adjacent if [1..100][100..200][200..300]
+     **/
+    private static boolean checkRangeAdjacency(List<ByteKeyRange> ranges,
+        List<ByteKeyRange> otherRanges) {
+      checkArgument(ranges != null || otherRanges != null, "Both ranges cannot 
be null.");
+      ImmutableList.Builder<ByteKeyRange> mergedRanges = 
ImmutableList.builder();
+      if (ranges != null) {
+        mergedRanges.addAll(ranges);
+      }
+      if (otherRanges != null) {
+        mergedRanges.addAll(otherRanges);
+      }
+      return checkRangeAdjacency(mergedRanges.build());
+    }
+
+    /** Helper to validate range Adjacency.
+     * Ranges are considered adjacent if [1..100][100..200][200..300]
+     **/
+    private static boolean checkRangeAdjacency(List<ByteKeyRange> ranges) {
+      int index = 0;
+      if (ranges.size() < 2) {
+        return true;
+      }
+      ByteKey lastEndKey = ranges.get(index++).getEndKey();
+      while (index < ranges.size()) {
+        ByteKeyRange currentKeyRange = ranges.get(index++);
+        if (!lastEndKey.equals(currentKeyRange.getStartKey())) {
+          return false;
+        }
+        lastEndKey = currentKeyRange.getEndKey();
+      }
+      return true;
+    }
+
+    /** Helper to combine/merge ByteKeyRange
+     * Ranges should only be merged if they are adjacent
+     * ex. [1..100][100..200][200..300] will result in [1..300]
+     * Note: this method will not check for adjacency see {@link 
#checkRangeAdjacency(List)}
+     **/
+    private static List<ByteKeyRange> mergeRanges(List<ByteKeyRange> ranges) {
+      List<ByteKeyRange> response = new ArrayList<ByteKeyRange>();
+      if (ranges.size() < 2) {
+        response.add(ranges.get(0));
+      } else {
+        response.add(ByteKeyRange.of(
+            ranges.get(0).getStartKey(),
+            ranges.get(ranges.size() - 1).getEndKey()));
+      }
+      return response;
     }
 
     /** Helper that splits this source into bundles based on Cloud Bigtable 
sampled row keys. */
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 223588e9e8e..89cfeed04df 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -99,6 +99,7 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Matchers;
+import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -595,6 +596,180 @@ public void testReadingWithSplits() throws Exception {
     assertSourcesEqualReferenceSource(source, splits, null /* options */);
   }
 
+  private void assertAllSourcesHaveSingleAdjacentRanges(List<BigtableSource> 
sources) {
+    if (sources.size() > 0) {
+      assertThat(sources.get(0).getRanges(), hasSize(1));
+      for (int i = 1; i < sources.size(); i++) {
+        assertThat(sources.get(i).getRanges(), hasSize(1));
+        ByteKey lastEndKey = sources.get(i - 1).getRanges().get(0).getEndKey();
+        ByteKey currentStartKey = 
sources.get(i).getRanges().get(0).getStartKey();
+        assertEquals(lastEndKey, currentStartKey);
+      }
+    }
+  }
+
+  private void assertAllSourcesHaveSingleRanges(List<BigtableSource> sources) {
+    for (BigtableSource source : sources) {
+      assertThat(source.getRanges(), hasSize(1));
+    }
+  }
+
+  private ByteKey createByteKey(int key) {
+    return ByteKey.copyFrom(String.format("key%09d", key).getBytes());
+  }
+
+  /** Tests reduce splits with few non adjacent ranges. */
+  @Test
+  public void testReduceSplitsWithSomeNonAdjacentRanges() throws Exception {
+    final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
+    final int numRows = 10;
+    final int numSamples = 10;
+    final long bytesPerRow = 100L;
+    final int maxSplit = 3;
+
+    // Set up test table data and sample row keys for size estimation and 
splitting.
+    makeTableData(table, numRows);
+    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+    //Construct few non contiguous key ranges 
[..1][1..2][3..4][4..5][6..7][8..9]
+    List<ByteKeyRange> keyRanges = Arrays.asList(
+        ByteKeyRange.of(ByteKey.EMPTY, createByteKey(1)),
+        ByteKeyRange.of(createByteKey(1), createByteKey(2)),
+        ByteKeyRange.of(createByteKey(3), createByteKey(4)),
+        ByteKeyRange.of(createByteKey(4), createByteKey(5)),
+        ByteKeyRange.of(createByteKey(6), createByteKey(7)),
+        ByteKeyRange.of(createByteKey(8), createByteKey(9)));
+
+    //Expected ranges after split and reduction by maxSplitCount is 
[..2][3..5][6..7][8..9]
+    List<ByteKeyRange> expectedKeyRangesAfterReducedSplits = Arrays.asList(
+        ByteKeyRange.of(ByteKey.EMPTY, createByteKey(2)),
+        ByteKeyRange.of(createByteKey(3), createByteKey(5)),
+        ByteKeyRange.of(createByteKey(6), createByteKey(7)),
+        ByteKeyRange.of(createByteKey(8), createByteKey(9)));
+
+    // Generate source and split it.
+    BigtableSource source =
+        new 
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
+            null /*filter*/,
+            keyRanges,
+            null /*size*/);
+
+    List<BigtableSource> splits =
+        source.split(numRows * bytesPerRow / numSamples, null /* options */);
+
+    assertThat(splits, hasSize(keyRanges.size()));
+
+    List<BigtableSource> reducedSplits =
+        source.reduceSplits(splits, null, maxSplit);
+
+    List<ByteKeyRange> actualRangesAfterSplit = new ArrayList<ByteKeyRange>();
+
+    for (BigtableSource splitSource : reducedSplits) {
+      actualRangesAfterSplit.addAll(splitSource.getRanges());
+    }
+
+    assertAllSourcesHaveSingleRanges(reducedSplits);
+
+    assertThat(actualRangesAfterSplit,
+        
IsIterableContainingInOrder.contains(expectedKeyRangesAfterReducedSplits.toArray()));
+  }
+
+  /** Tests reduce split with all non adjacent ranges. */
+  @Test
+  public void testReduceSplitsWithAllNonAdjacentRange() throws Exception {
+    final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
+    final int numRows = 10;
+    final int numSamples = 10;
+    final long bytesPerRow = 100L;
+    final int maxSplit = 3;
+
+    // Set up test table data and sample row keys for size estimation and 
splitting.
+    makeTableData(table, numRows);
+    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+    //Construct non contiguous key ranges [..1][2..3][4..5][6..7][8..9]
+    List<ByteKeyRange> keyRanges = Arrays.asList(
+        ByteKeyRange.of(ByteKey.EMPTY, createByteKey(1)),
+        ByteKeyRange.of(createByteKey(2), createByteKey(3)),
+        ByteKeyRange.of(createByteKey(4), createByteKey(5)),
+        ByteKeyRange.of(createByteKey(6), createByteKey(7)),
+        ByteKeyRange.of(createByteKey(8), createByteKey(9)));
+
+    // Generate source and split it.
+    BigtableSource source =
+        new 
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
+            null /*filter*/,
+            keyRanges,
+            null /*size*/);
+
+    List<BigtableSource> splits =
+        source.split(numRows * bytesPerRow / numSamples, null /* options */);
+
+    assertThat(splits, hasSize(keyRanges.size()));
+
+    List<BigtableSource> reducedSplits =
+        source.reduceSplits(splits, null, maxSplit);
+
+    List<ByteKeyRange> actualRangesAfterSplit = new ArrayList<ByteKeyRange>();
+
+    for (BigtableSource splitSource : reducedSplits) {
+      actualRangesAfterSplit.addAll(splitSource.getRanges());
+    }
+
+    assertAllSourcesHaveSingleRanges(reducedSplits);
+
+    //The expected split source ranges are exactly same as original
+    assertThat(actualRangesAfterSplit,
+        IsIterableContainingInOrder.contains(keyRanges.toArray()));
+  }
+
+  /** Tests reduce splits with all adjacent ranges. */
+  @Test
+  public void testReduceSplitsWithAdjacentRanges() throws Exception {
+    final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
+    final int numRows = 10;
+    final int numSamples = 10;
+    final long bytesPerRow = 100L;
+    final int maxSplit = 3;
+
+    // Set up test table data and sample row keys for size estimation and 
splitting.
+    makeTableData(table, numRows);
+    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+    // Generate source and split it.
+    BigtableSource source =
+        new 
BigtableSource(config.withTableId(ValueProvider.StaticValueProvider.of(table)),
+            null /*filter*/,
+            Arrays.asList(ByteKeyRange.ALL_KEYS),
+            null /*size*/);
+
+    List<BigtableSource> splits =
+        source.split(numRows * bytesPerRow / numSamples, null /* options */);
+
+    assertThat(splits, hasSize(numSamples));
+
+    //Splits Source have ranges 
[..1][1..2][2..3][3..4][4..5][5..6][6..7][7..8][8..9][9..]
+    //expected reduced Split source ranges are [..4][4..8][8..]
+    List<ByteKeyRange> expectedKeyRangesAfterReducedSplits = Arrays.asList(
+        ByteKeyRange.of(ByteKey.EMPTY, createByteKey(4)),
+        ByteKeyRange.of(createByteKey(4), createByteKey(8)),
+        ByteKeyRange.of(createByteKey(8), ByteKey.EMPTY));
+
+    List<BigtableSource> reducedSplits =
+        source.reduceSplits(splits, null, maxSplit);
+
+    List<ByteKeyRange> actualRangesAfterSplit = new ArrayList<ByteKeyRange>();
+
+    for (BigtableSource splitSource : reducedSplits) {
+      actualRangesAfterSplit.addAll(splitSource.getRanges());
+    }
+
+    assertThat(actualRangesAfterSplit,
+        
IsIterableContainingInOrder.contains(expectedKeyRangesAfterReducedSplits.toArray()));
+    assertAllSourcesHaveSingleAdjacentRanges(reducedSplits);
+    assertSourcesEqualReferenceSource(source, reducedSplits, null /* options 
*/);
+  }
+
   /** Tests reading all rows from a split table with several key ranges. */
   @Test
   public void testReadingWithSplitsWithSeveralKeyRanges() throws Exception {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 101242)
    Time Spent: 6h 10m  (was: 6h)

> BigtableIO should merge splits if they exceed 15K
> -------------------------------------------------
>
>                 Key: BEAM-3246
>                 URL: https://issues.apache.org/jira/browse/BEAM-3246
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>            Reporter: Solomon Duskis
>            Assignee: Solomon Duskis
>            Priority: Major
>          Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> A customer hit a problem with a large number of splits.  CloudBigtableIO fixes 
> that here 
> https://github.com/GoogleCloudPlatform/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java#L241
> BigtableIO should have similar logic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to