[ 
https://issues.apache.org/jira/browse/BEAM-5272?focusedWorklogId=148879&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-148879
 ]

ASF GitHub Bot logged work on BEAM-5272:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Sep/18 20:17
            Start Date: 27/Sep/18 20:17
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #6503: [BEAM-5272] Randomize 
the reduced splits in BigtableIO so that multiple workers may not hit the same 
tablet server
URL: https://github.com/apache/beam/pull/6503
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index edad185323c..755d889b491 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -848,18 +848,25 @@ protected BigtableSource withEstimatedSizeBytes(Long 
estimatedSizeBytes) {
       // Delegate to testable helper.
       List<BigtableSource> splits =
           splitBasedOnSamples(desiredBundleSizeBytes, 
getSampleRowKeys(options));
-      return reduceSplits(splits, options, MAX_SPLIT_COUNT);
+
+      // Reduce the splits.
+      List<BigtableSource> reduced = reduceSplits(splits, options, 
MAX_SPLIT_COUNT);
+      // Randomize the result before returning an immutable copy of the 
splits, the default behavior
+      // may lead to multiple workers hitting the same tablet.
+      Collections.shuffle(reduced);
+      return ImmutableList.copyOf(reduced);
     }
 
+    /** Returns a mutable list of reduced splits. */
     @VisibleForTesting
     protected List<BigtableSource> reduceSplits(
         List<BigtableSource> splits, PipelineOptions options, long 
maxSplitCounts)
         throws IOException {
       int numberToCombine = (int) ((splits.size() + maxSplitCounts - 1) / 
maxSplitCounts);
       if (splits.size() < maxSplitCounts || numberToCombine < 2) {
-        return splits;
+        return new ArrayList<>(splits);
       }
-      ImmutableList.Builder<BigtableSource> reducedSplits = 
ImmutableList.builder();
+      List<BigtableSource> reducedSplits = new ArrayList<>();
       List<ByteKeyRange> previousSourceRanges = new ArrayList<ByteKeyRange>();
       int counter = 0;
       long size = 0;
@@ -879,7 +886,7 @@ protected BigtableSource withEstimatedSizeBytes(Long 
estimatedSizeBytes) {
       if (size > 0) {
         reducedSplits.add(new BigtableSource(config, filter, 
previousSourceRanges, size));
       }
-      return reducedSplits.build();
+      return reducedSplits;
     }
 
     /**
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index cadb908be5a..54a2fee99b0 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -102,7 +102,7 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Matchers;
-import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -736,10 +736,10 @@ public void testReduceSplitsWithSomeNonAdjacentRanges() 
throws Exception {
             keyRanges,
             null /*size*/);
 
-    List<BigtableSource> splits =
-        source.split(numRows * bytesPerRow / numSamples, null /* options */);
-
-    assertThat(splits, hasSize(keyRanges.size()));
+    List<BigtableSource> splits = new ArrayList<>();
+    for (ByteKeyRange range : keyRanges) {
+      splits.add(source.withSingleRange(range));
+    }
 
     List<BigtableSource> reducedSplits = source.reduceSplits(splits, null, 
maxSplit);
 
@@ -753,7 +753,8 @@ public void testReduceSplitsWithSomeNonAdjacentRanges() 
throws Exception {
 
     assertThat(
         actualRangesAfterSplit,
-        
IsIterableContainingInOrder.contains(expectedKeyRangesAfterReducedSplits.toArray()));
+        IsIterableContainingInAnyOrder.containsInAnyOrder(
+            expectedKeyRangesAfterReducedSplits.toArray()));
   }
 
   /** Tests reduce split with all non adjacent ranges. */
@@ -786,10 +787,10 @@ public void testReduceSplitsWithAllNonAdjacentRange() 
throws Exception {
             keyRanges,
             null /*size*/);
 
-    List<BigtableSource> splits =
-        source.split(numRows * bytesPerRow / numSamples, null /* options */);
-
-    assertThat(splits, hasSize(keyRanges.size()));
+    List<BigtableSource> splits = new ArrayList<>();
+    for (ByteKeyRange range : keyRanges) {
+      splits.add(source.withSingleRange(range));
+    }
 
     List<BigtableSource> reducedSplits = source.reduceSplits(splits, null, 
maxSplit);
 
@@ -801,8 +802,10 @@ public void testReduceSplitsWithAllNonAdjacentRange() 
throws Exception {
 
     assertAllSourcesHaveSingleRanges(reducedSplits);
 
-    //The expected split source ranges are exactly same as original
-    assertThat(actualRangesAfterSplit, 
IsIterableContainingInOrder.contains(keyRanges.toArray()));
+    // The expected split source ranges are exactly same as original
+    assertThat(
+        actualRangesAfterSplit,
+        
IsIterableContainingInAnyOrder.containsInAnyOrder(keyRanges.toArray()));
   }
 
   /** Tests reduce Splits with all adjacent ranges. */
@@ -826,10 +829,22 @@ public void tesReduceSplitsWithAdjacentRanges() throws 
Exception {
             Arrays.asList(ByteKeyRange.ALL_KEYS),
             null /*size*/);
 
-    List<BigtableSource> splits =
-        source.split(numRows * bytesPerRow / numSamples, null /* options */);
-
-    assertThat(splits, hasSize(numSamples));
+    List<BigtableSource> splits = new ArrayList<>();
+    List<ByteKeyRange> keyRanges =
+        Arrays.asList(
+            ByteKeyRange.of(ByteKey.EMPTY, createByteKey(1)),
+            ByteKeyRange.of(createByteKey(1), createByteKey(2)),
+            ByteKeyRange.of(createByteKey(2), createByteKey(3)),
+            ByteKeyRange.of(createByteKey(3), createByteKey(4)),
+            ByteKeyRange.of(createByteKey(4), createByteKey(5)),
+            ByteKeyRange.of(createByteKey(5), createByteKey(6)),
+            ByteKeyRange.of(createByteKey(6), createByteKey(7)),
+            ByteKeyRange.of(createByteKey(7), createByteKey(8)),
+            ByteKeyRange.of(createByteKey(8), createByteKey(9)),
+            ByteKeyRange.of(createByteKey(9), ByteKey.EMPTY));
+    for (ByteKeyRange range : keyRanges) {
+      splits.add(source.withSingleRange(range));
+    }
 
     //Splits Source have ranges 
[..1][1..2][2..3][3..4][4..5][5..6][6..7][7..8][8..9][9..]
     //expected reduced Split source ranges are [..4][4..8][8..]
@@ -849,7 +864,8 @@ public void tesReduceSplitsWithAdjacentRanges() throws 
Exception {
 
     assertThat(
         actualRangesAfterSplit,
-        
IsIterableContainingInOrder.contains(expectedKeyRangesAfterReducedSplits.toArray()));
+        IsIterableContainingInAnyOrder.containsInAnyOrder(
+            expectedKeyRangesAfterReducedSplits.toArray()));
     assertAllSourcesHaveSingleAdjacentRanges(reducedSplits);
     assertSourcesEqualReferenceSource(source, reducedSplits, null /* options 
*/);
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 148879)
    Time Spent: 1.5h  (was: 1h 20m)

> Randomize the reduced splits in BigtableIO so that multiple workers may not 
> hit the same tablet server
> ------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-5272
>                 URL: https://issues.apache.org/jira/browse/BEAM-5272
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Kevin Si
>            Assignee: Chamikara Jayalath
>            Priority: Minor
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Randomize the reduced splits in BigtableIO so that multiple workers may not 
> hit the same tablet server.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to