[ https://issues.apache.org/jira/browse/BEAM-5272?focusedWorklogId=148745&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-148745 ]
ASF GitHub Bot logged work on BEAM-5272: ---------------------------------------- Author: ASF GitHub Bot Created on: 27/Sep/18 13:43 Start Date: 27/Sep/18 13:43 Worklog Time Spent: 10m Work Description: kevinsi4508 closed pull request #6308: [BEAM-5272] Randomize the reduced splits in BigtableIO so that multiple workers may not hit the same tablet server URL: https://github.com/apache/beam/pull/6308 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index ae8fe7d04d9..cb5a174713e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -848,18 +848,27 @@ protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) { // Delegate to testable helper. List<BigtableSource> splits = splitBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys(options)); - return reduceSplits(splits, options, MAX_SPLIT_COUNT); + + // Reduce the splits. + List<BigtableSource> reduced = reduceSplits(splits, options, MAX_SPLIT_COUNT); + // Randomize the result before returning an immutable copy of the splits, the default behavior + // may lead to multiple workers hitting the same tablet. + Collections.shuffle(reduced); + return ImmutableList.copyOf(reduced); } + /** + * Returns a mutable list of reduced splits. 
+ */ @VisibleForTesting protected List<BigtableSource> reduceSplits( List<BigtableSource> splits, PipelineOptions options, long maxSplitCounts) throws IOException { int numberToCombine = (int) ((splits.size() + maxSplitCounts - 1) / maxSplitCounts); if (splits.size() < maxSplitCounts || numberToCombine < 2) { - return splits; + return new ArrayList<>(splits); } - ImmutableList.Builder<BigtableSource> reducedSplits = ImmutableList.builder(); + List<BigtableSource> reducedSplits = new ArrayList<>(); List<ByteKeyRange> previousSourceRanges = new ArrayList<ByteKeyRange>(); int counter = 0; long size = 0; @@ -879,7 +888,7 @@ protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) { if (size > 0) { reducedSplits.add(new BigtableSource(config, filter, previousSourceRanges, size)); } - return reducedSplits.build(); + return reducedSplits; } /** diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index 47727e5b8a1..518dc104c4e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -100,7 +100,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; import org.hamcrest.Matchers; -import org.hamcrest.collection.IsIterableContainingInOrder; +import org.hamcrest.collection.IsIterableContainingInAnyOrder; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -680,10 +680,10 @@ public void testReduceSplitsWithSomeNonAdjacentRanges() throws Exception { keyRanges, null /*size*/); - List<BigtableSource> splits = - source.split(numRows * bytesPerRow / numSamples, null /* options */); - - assertThat(splits, hasSize(keyRanges.size())); + List<BigtableSource> 
splits = new ArrayList<>(); + for (ByteKeyRange range : keyRanges) { + splits.add(source.withSingleRange(range)); + } List<BigtableSource> reducedSplits = source.reduceSplits(splits, null, maxSplit); @@ -697,7 +697,8 @@ public void testReduceSplitsWithSomeNonAdjacentRanges() throws Exception { assertThat( actualRangesAfterSplit, - IsIterableContainingInOrder.contains(expectedKeyRangesAfterReducedSplits.toArray())); + IsIterableContainingInAnyOrder.containsInAnyOrder( + expectedKeyRangesAfterReducedSplits.toArray())); } /** Tests reduce split with all non adjacent ranges. */ @@ -730,10 +731,10 @@ public void testReduceSplitsWithAllNonAdjacentRange() throws Exception { keyRanges, null /*size*/); - List<BigtableSource> splits = - source.split(numRows * bytesPerRow / numSamples, null /* options */); - - assertThat(splits, hasSize(keyRanges.size())); + List<BigtableSource> splits = new ArrayList<>(); + for (ByteKeyRange range : keyRanges) { + splits.add(source.withSingleRange(range)); + } List<BigtableSource> reducedSplits = source.reduceSplits(splits, null, maxSplit); @@ -745,8 +746,10 @@ public void testReduceSplitsWithAllNonAdjacentRange() throws Exception { assertAllSourcesHaveSingleRanges(reducedSplits); - //The expected split source ranges are exactly same as original - assertThat(actualRangesAfterSplit, IsIterableContainingInOrder.contains(keyRanges.toArray())); + // The expected split source ranges are exactly same as original + assertThat( + actualRangesAfterSplit, + IsIterableContainingInAnyOrder.containsInAnyOrder(keyRanges.toArray())); } /** Tests reduce Splits with all adjacent ranges. 
*/ @@ -770,10 +773,22 @@ public void tesReduceSplitsWithAdjacentRanges() throws Exception { Arrays.asList(ByteKeyRange.ALL_KEYS), null /*size*/); - List<BigtableSource> splits = - source.split(numRows * bytesPerRow / numSamples, null /* options */); - - assertThat(splits, hasSize(numSamples)); + List<BigtableSource> splits = new ArrayList<>(); + List<ByteKeyRange> keyRanges = + Arrays.asList( + ByteKeyRange.of(ByteKey.EMPTY, createByteKey(1)), + ByteKeyRange.of(createByteKey(1), createByteKey(2)), + ByteKeyRange.of(createByteKey(2), createByteKey(3)), + ByteKeyRange.of(createByteKey(3), createByteKey(4)), + ByteKeyRange.of(createByteKey(4), createByteKey(5)), + ByteKeyRange.of(createByteKey(5), createByteKey(6)), + ByteKeyRange.of(createByteKey(6), createByteKey(7)), + ByteKeyRange.of(createByteKey(7), createByteKey(8)), + ByteKeyRange.of(createByteKey(8), createByteKey(9)), + ByteKeyRange.of(createByteKey(9), ByteKey.EMPTY)); + for (ByteKeyRange range : keyRanges) { + splits.add(source.withSingleRange(range)); + } //Splits Source have ranges [..1][1..2][2..3][3..4][4..5][5..6][6..7][7..8][8..9][9..] //expected reduced Split source ranges are [..4][4..8][8..] @@ -793,7 +808,8 @@ public void tesReduceSplitsWithAdjacentRanges() throws Exception { assertThat( actualRangesAfterSplit, - IsIterableContainingInOrder.contains(expectedKeyRangesAfterReducedSplits.toArray())); + IsIterableContainingInAnyOrder.containsInAnyOrder( + expectedKeyRangesAfterReducedSplits.toArray())); assertAllSourcesHaveSingleAdjacentRanges(reducedSplits); assertSourcesEqualReferenceSource(source, reducedSplits, null /* options */); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: users@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 148745) Time Spent: 1h 20m (was: 1h 10m) > Randomize the reduced splits in BigtableIO so that multiple workers may not > hit the same tablet server > ------------------------------------------------------------------------------------------------------ > > Key: BEAM-5272 > URL: https://issues.apache.org/jira/browse/BEAM-5272 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp > Reporter: Kevin Si > Assignee: Chamikara Jayalath > Priority: Minor > Time Spent: 1h 20m > Remaining Estimate: 0h > > Randomize the reduced splits in BigtableIO so that multiple workers may not > hit the same tablet server. -- This message was sent by Atlassian JIRA (v7.6.3#76005)