[BEAM-1272] Align the naming of "generateInitialSplits" and "splitIntoBundles" to better reflect their intention
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62464b5b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62464b5b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62464b5b Branch: refs/heads/master Commit: 62464b5bc8ff191dbbde2f3b1019742dea0287bc Parents: 4124cc6 Author: Etienne Chauchot <[email protected]> Authored: Thu Apr 13 10:55:20 2017 +0200 Committer: Dan Halperin <[email protected]> Committed: Tue Apr 18 12:01:27 2017 -0700 ---------------------------------------------------------------------- .../apex/translation/utils/ValuesSource.java | 2 +- .../apex/examples/UnboundedTextSource.java | 2 +- .../translation/GroupByKeyTranslatorTest.java | 2 +- .../apex/translation/utils/CollectionSource.java | 2 +- .../UnboundedReadFromBoundedSource.java | 7 ++++--- .../direct/BoundedReadEvaluatorFactory.java | 2 +- .../direct/UnboundedReadEvaluatorFactory.java | 2 +- .../direct/BoundedReadEvaluatorFactoryTest.java | 4 ++-- .../beam/runners/direct/DirectRunnerTest.java | 4 ++-- .../UnboundedReadEvaluatorFactoryTest.java | 2 +- .../translation/wrappers/SourceInputFormat.java | 3 ++- .../streaming/io/BoundedSourceWrapper.java | 2 +- .../streaming/io/UnboundedSocketSource.java | 2 +- .../streaming/io/UnboundedSourceWrapper.java | 2 +- .../flink/streaming/TestCountingSource.java | 2 +- .../runners/dataflow/internal/CustomSources.java | 2 +- .../beam/runners/spark/io/MicrobatchSource.java | 5 ++--- .../beam/runners/spark/io/SourceDStream.java | 2 +- .../apache/beam/runners/spark/io/SourceRDD.java | 4 ++-- .../sdk/io/BoundedReadFromUnboundedSource.java | 4 ++-- .../org/apache/beam/sdk/io/BoundedSource.java | 13 +++++++++++-- .../org/apache/beam/sdk/io/CountingSource.java | 2 +- .../org/apache/beam/sdk/io/FileBasedSource.java | 9 +++++---- .../apache/beam/sdk/io/OffsetBasedSource.java | 4 ++-- .../org/apache/beam/sdk/io/UnboundedSource.java | 2 +- .../apache/beam/sdk/testing/SourceTestUtils.java | 
6 +++--- .../org/apache/beam/sdk/io/AvroSourceTest.java | 10 +++++----- .../apache/beam/sdk/io/CountingSourceTest.java | 6 +++--- .../apache/beam/sdk/io/FileBasedSourceTest.java | 8 ++++---- .../beam/sdk/io/OffsetBasedSourceTest.java | 8 ++++---- .../java/org/apache/beam/sdk/io/ReadTest.java | 4 ++-- .../java/org/apache/beam/sdk/io/TextIOTest.java | 16 ++++++++-------- .../org/apache/beam/sdk/io/XmlSourceTest.java | 8 ++++---- .../sdk/runners/dataflow/TestCountingSource.java | 2 +- .../beam/sdk/testing/SourceTestUtilsTest.java | 2 +- .../apache/beam/sdk/transforms/CreateTest.java | 12 ++++++------ .../sdk/io/elasticsearch/ElasticsearchIO.java | 2 +- .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 2 +- .../io/elasticsearch/ElasticsearchIOTest.java | 4 ++-- .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 2 +- .../sdk/io/gcp/bigquery/TransformingSource.java | 4 ++-- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 6 +++--- .../sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 6 +++--- .../beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 10 +++++----- .../beam/sdk/io/gcp/bigtable/BigtableIOTest.java | 8 +++++--- .../io/gcp/pubsub/PubsubUnboundedSourceTest.java | 4 ++-- .../hadoop/inputformat/HadoopInputFormatIO.java | 4 ++-- .../inputformat/HadoopInputFormatIOTest.java | 19 +++++++++++-------- .../org/apache/beam/sdk/io/hbase/HBaseIO.java | 7 +++---- .../apache/beam/sdk/io/hbase/HBaseIOTest.java | 2 +- .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 6 +++--- .../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 4 ++-- .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 2 +- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 4 ++-- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 12 ++++++------ .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 8 ++++---- .../beam/sdk/io/kinesis/KinesisSource.java | 2 +- .../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 4 ++-- .../apache/beam/sdk/io/mongodb/MongoDbIO.java | 2 +- .../beam/sdk/io/mongodb/MongoDBGridFSIOTest.java | 2 +- 
.../java/org/apache/beam/sdk/io/mqtt/MqttIO.java | 2 +- 61 files changed, 156 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java index 8526618..62c92a0 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java @@ -55,7 +55,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi } @Override - public java.util.List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits( + public java.util.List<? 
extends UnboundedSource<T, CheckpointMark>> split( int desiredNumSplits, PipelineOptions options) throws Exception { return Collections.singletonList(this); } http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java index 8132ee5..abe97f6 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java @@ -39,7 +39,7 @@ public class UnboundedTextSource extends UnboundedSource<String, UnboundedSource private static final long serialVersionUID = 1L; @Override - public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits( + public List<? 
extends UnboundedSource<String, CheckpointMark>> split( int desiredNumSplits, PipelineOptions options) throws Exception { return Collections.<UnboundedSource<String, CheckpointMark>>singletonList(this); } http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java index 96963a0..193de71 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java @@ -131,7 +131,7 @@ public class GroupByKeyTranslatorTest { } @Override - public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits( + public List<? 
extends UnboundedSource<String, CheckpointMark>> split( int desiredNumSplits, PipelineOptions options) throws Exception { return Collections.<UnboundedSource<String, CheckpointMark>>singletonList(this); } http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java index c3b35f9..92812b4 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java @@ -47,7 +47,7 @@ public class CollectionSource<T> extends UnboundedSource<T, UnboundedSource.Chec } @Override - public List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits( + public List<? 
extends UnboundedSource<T, CheckpointMark>> split( int desiredNumSplits, PipelineOptions options) throws Exception { return Collections.singletonList(this); } http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java index 6b7bd71..f67af8a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java @@ -61,7 +61,8 @@ import org.slf4j.LoggerFactory; /** * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}. * - * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles}, + * <p>{@link BoundedSource} is read directly without calling + * {@link BoundedSource#split}, * and element timestamps are propagated. While any elements remain, the watermark is the beginning * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. 
@@ -130,7 +131,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle } @Override - public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits( + public List<BoundedToUnboundedSourceAdapter<T>> split( int desiredNumSplits, PipelineOptions options) throws Exception { try { long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits; @@ -140,7 +141,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle return ImmutableList.of(this); } List<? extends BoundedSource<T>> splits = - boundedSource.splitIntoBundles(desiredBundleSize, options); + boundedSource.split(desiredBundleSize, options); if (splits == null) { LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource); return ImmutableList.of(this); http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 5bd6f7e..0c2afe8 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -196,7 +196,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { long estimatedBytes = source.getEstimatedSizeBytes(options); long bytesPerBundle = estimatedBytes / targetParallelism; List<? 
extends BoundedSource<T>> bundles = - source.splitIntoBundles(bytesPerBundle, options); + source.split(bytesPerBundle, options); ImmutableList.Builder<CommittedBundle<BoundedSourceShard<T>>> shards = ImmutableList.builder(); for (BoundedSource<T> bundle : bundles) { http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index 91e7248..d3609f8 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -301,7 +301,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { throws Exception { UnboundedSource<OutputT, ?> source = transform.getTransform().getSource(); List<? extends UnboundedSource<OutputT, ?>> splits = - source.generateInitialSplits(targetParallelism, evaluationContext.getPipelineOptions()); + source.split(targetParallelism, evaluationContext.getPipelineOptions()); UnboundedReadDeduplicator deduplicator = source.requiresDeduping() ? 
UnboundedReadDeduplicator.CachedIdDeduplicator.create() http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index 8361bdc..2b5b46d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -265,7 +265,7 @@ public class BoundedReadEvaluatorFactoryTest { public void boundedSourceInMemoryTransformEvaluatorShardsOfSource() throws Exception { PipelineOptions options = PipelineOptionsFactory.create(); List<? extends BoundedSource<Long>> splits = - source.splitIntoBundles(source.getEstimatedSizeBytes(options) / 2, options); + source.split(source.getEstimatedSizeBytes(options) / 2, options); UncommittedBundle<BoundedSourceShard<Long>> rootBundle = bundleFactory.createRootBundle(); for (BoundedSource<Long> split : splits) { @@ -365,7 +365,7 @@ public class BoundedReadEvaluatorFactoryTest { } @Override - public List<? extends OffsetBasedSource<T>> splitIntoBundles( + public List<? 
extends OffsetBasedSource<T>> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { return ImmutableList.of(this); } http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 3b81f4d..ed19be2 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -549,13 +549,13 @@ public class DirectRunnerTest implements Serializable { } @Override - public List<? extends BoundedSource<T>> splitIntoBundles( + public List<? extends BoundedSource<T>> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { // Must have more than checkState( desiredBundleSizeBytes < getEstimatedSizeBytes(options), "Must split into more than one source"); - return underlying.splitIntoBundles(desiredBundleSizeBytes, options); + return underlying.split(desiredBundleSizeBytes, options); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 8707f31..567ee98 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -450,7 +450,7 @@ public class UnboundedReadEvaluatorFactoryTest { } @Override - public List<? extends UnboundedSource<T, TestCheckpointMark>> generateInitialSplits( + public List<? extends UnboundedSource<T, TestCheckpointMark>> split( int desiredNumSplits, PipelineOptions options) throws Exception { return ImmutableList.of(this); } http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index 443378f..a87472b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -100,7 +100,8 @@ public class SourceInputFormat<T> public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException { try { long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits; - List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes, options); + List<? 
extends Source<T>> shards = + initialSource.split(desiredSizeBytes, options); int numShards = shards.size(); SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards]; for (int i = 0; i < numShards; i++) { http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java index 820a9bd..2ed5024 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java @@ -76,7 +76,7 @@ public class BoundedSourceWrapper<OutputT> // get the splits early. 
we assume that the generated splits are stable, // this is necessary so that the mapping of state to source is correct // when restoring - splitSources = source.splitIntoBundles(desiredBundleSize, pipelineOptions); + splitSources = source.split(desiredBundleSize, pipelineOptions); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java index ed03dda..910a33f 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java @@ -94,7 +94,7 @@ public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.Check } @Override - public List<? extends UnboundedSource<String, CheckpointMarkT>> generateInitialSplits( + public List<? 
extends UnboundedSource<String, CheckpointMarkT>> split( int desiredNumSplits, PipelineOptions options) throws Exception { return Collections.<UnboundedSource<String, CheckpointMarkT>>singletonList(this); http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 2849464..bb9b58a 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -156,7 +156,7 @@ public class UnboundedSourceWrapper< // get the splits early. 
we assume that the generated splits are stable, // this is necessary so that the mapping of state to source is correct // when restoring - splitSources = source.generateInitialSplits(parallelism, pipelineOptions); + splitSources = source.split(parallelism, pipelineOptions); } http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java index 9251d42..3a08088 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java @@ -104,7 +104,7 @@ public class TestCountingSource } @Override - public List<TestCountingSource> generateInitialSplits( + public List<TestCountingSource> split( int desiredNumSplits, PipelineOptions options) { List<TestCountingSource> splits = new ArrayList<>(); int numSplits = allowSplitting ? 
desiredNumSplits : 1; http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java index ffbf153..778ccf3 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java @@ -98,7 +98,7 @@ public class CustomSources { int desiredNumSplits = getDesiredNumUnboundedSourceSplits(options.as(DataflowPipelineOptions.class)); for (UnboundedSource<?, ?> split : - unboundedSource.generateInitialSplits(desiredNumSplits, options)) { + unboundedSource.split(desiredNumSplits, options)) { encodedSplits.add(encodeBase64String(serializeToByteArray(split))); } checkArgument(!encodedSplits.isEmpty(), "UnboundedSources must have at least one split"); http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java index 847de19..7c07920 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java @@ -102,12 +102,11 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo } @Override - public List<? 
extends BoundedSource<T>> - splitIntoBundles(long desiredBundleSizeBytes, + public List<? extends BoundedSource<T>> split(long desiredBundleSizeBytes, PipelineOptions options) throws Exception { List<MicrobatchSource<T, CheckpointMarkT>> result = new ArrayList<>(); List<? extends UnboundedSource<T, CheckpointMarkT>> splits = - source.generateInitialSplits(numInitialSplits, options); + source.split(numInitialSplits, options); int numSplits = splits.size(); long[] numRecords = splitNumRecords(maxNumRecords, numSplits); for (int i = 0; i < numSplits; i++) { http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java index fb6da97..d33529c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java @@ -104,7 +104,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> try { this.numPartitions = createMicrobatchSource() - .splitIntoBundles(initialParallelism, options) + .split(initialParallelism, options) .size(); } catch (Exception e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java index 2f9a827..b99ae10 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java +++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java @@ -105,7 +105,7 @@ public class SourceRDD { + "size of {} bytes.", source, DEFAULT_BUNDLE_SIZE); } try { - List<? extends Source<T>> partitionedSources = source.splitIntoBundles(desiredSizeBytes, + List<? extends Source<T>> partitionedSources = source.split(desiredSizeBytes, runtimeContext.getPipelineOptions()); Partition[] partitions = new SourcePartition[partitionedSources.size()]; for (int i = 0; i < partitionedSources.size(); i++) { @@ -258,7 +258,7 @@ public class SourceRDD { @Override public Partition[] getPartitions() { try { - List<? extends Source<T>> partitionedSources = microbatchSource.splitIntoBundles( + List<? extends Source<T>> partitionedSources = microbatchSource.split( -1 /* ignored */, runtimeContext.getPipelineOptions()); Partition[] partitions = new CheckpointableSourcePartition[partitionedSources.size()]; for (int i = 0; i < partitionedSources.size(); i++) { http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java index d7f1d7b..e54176f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java @@ -186,12 +186,12 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle } @Override - public List<? extends BoundedSource<ValueWithRecordId<T>>> splitIntoBundles( + public List<? 
extends BoundedSource<ValueWithRecordId<T>>> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { List<UnboundedToBoundedSourceAdapter<T>> result = new ArrayList<>(); int numInitialSplits = numInitialSplits(getMaxNumRecords()); List<? extends UnboundedSource<T, ?>> splits = - getSource().generateInitialSplits(numInitialSplits, options); + getSource().split(numInitialSplits, options); int numSplits = splits.size(); long[] numRecords = splitNumRecords(getMaxNumRecords(), numSplits); for (int i = 0; i < numSplits; i++) { http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java index cd389e8..0b19aa2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java @@ -34,7 +34,7 @@ import org.joda.time.Instant; * * <p>The operations are: * <ul> - * <li>Splitting into bundles of given size: {@link #splitIntoBundles}; + * <li>Splitting into sources that read bundles of given size: {@link #split}; * <li>Size estimation: {@link #getEstimatedSizeBytes}; * <li>The accompanying {@link BoundedReader reader} has additional functionality to enable runners * to dynamically adapt based on runtime conditions. @@ -54,10 +54,19 @@ public abstract class BoundedSource<T> extends Source<T> { /** * Splits the source into bundles of approximately {@code desiredBundleSizeBytes}. */ - public abstract List<? extends BoundedSource<T>> splitIntoBundles( + public abstract List<? 
extends BoundedSource<T>> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception; /** + * {@link BoundedSource#split(long, PipelineOptions)} old method name to be used with Dataflow. + */ + @Deprecated + public List<? extends BoundedSource<T>> splitIntoBundles( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception{ + return split(desiredBundleSizeBytes, options); + } + + /** * An estimate of the total size (in bytes) of the data that would be read from this source. * This estimate is in terms of external storage size, before any decompression or other * processing done by the reader. http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java index 4d1305c..73b663d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java @@ -326,7 +326,7 @@ public class CountingSource { * {@code [2, 8, 14, ...)}, and {@code [4, 10, 16, ...)}. */ @Override - public List<? extends UnboundedSource<Long, CountingSource.CounterMark>> generateInitialSplits( + public List<? extends UnboundedSource<Long, CountingSource.CounterMark>> split( int desiredNumSplits, PipelineOptions options) throws Exception { // Using Javadoc example, stride 2 with 3 splits becomes stride 6. 
long newStride = stride * desiredNumSplits; http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index f38743a..95e6078 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -324,15 +324,15 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { @Override public List<? extends FileBasedSource<T>> call() throws Exception { return createForSubrangeOfFile(file, 0, Long.MAX_VALUE) - .splitIntoBundles(desiredBundleSizeBytes, options); + .split(desiredBundleSizeBytes, options); } }); } @Override - public final List<? extends FileBasedSource<T>> splitIntoBundles( + public final List<? extends FileBasedSource<T>> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { - // This implementation of method splitIntoBundles is provided to simplify subclasses. Here we + // This implementation of method split is provided to simplify subclasses. Here we // split a FileBasedSource based on a file pattern to FileBasedSources based on full single // files. For files that can be efficiently seeked, we further split FileBasedSources based on // those files to FileBasedSources based on sub ranges of single files. 
@@ -370,7 +370,8 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { } else { if (isSplittable()) { List<FileBasedSource<T>> splitResults = new ArrayList<>(); - for (OffsetBasedSource<T> split : super.splitIntoBundles(desiredBundleSizeBytes, options)) { + for (OffsetBasedSource<T> split : + super.split(desiredBundleSizeBytes, options)) { splitResults.add((FileBasedSource<T>) split); } return splitResults; http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java index e9a398d..05f0d97 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java @@ -108,7 +108,7 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> { } @Override - public List<? extends OffsetBasedSource<T>> splitIntoBundles( + public List<? extends OffsetBasedSource<T>> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { // Split the range into bundles based on the desiredBundleSizeBytes. Final bundle is adjusted to // make sure that we do not end up with a too small bundle at the end. If the desired bundle @@ -163,7 +163,7 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> { /** * Returns approximately how many bytes of data correspond to a single offset in this source. * Used for translation between this source's range and methods defined in terms of bytes, such - * as {@link #getEstimatedSizeBytes} and {@link #splitIntoBundles}. + * as {@link #getEstimatedSizeBytes} and {@link #split}. * * <p>Defaults to {@code 1} byte, which is the common case for, e.g., file sources. 
*/ http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java index cc1f598..af6a8cc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java @@ -65,7 +65,7 @@ public abstract class UnboundedSource< * as possible, but does not have to match exactly. A low number of splits * will limit the amount of parallelism in the source. */ - public abstract List<? extends UnboundedSource<OutputT, CheckpointMarkT>> generateInitialSplits( + public abstract List<? extends UnboundedSource<OutputT, CheckpointMarkT>> split( int desiredNumSplits, PipelineOptions options) throws Exception; /** http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java index a2a33f3..2ab5b35 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java @@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory; * amount of test coverage with few code. Most notable ones are: * <ul> * <li>{@link #assertSourcesEqualReferenceSource} helps testing that the data read - * by the union of sources produced by {@link BoundedSource#splitIntoBundles} + * by the union of sources produced by {@link BoundedSource#split} * is the same as data read by the original source. 
* <li>If your source implements dynamic work rebalancing, use the * {@code assertSplitAtFraction} family of functions - they test behavior of @@ -685,7 +685,7 @@ public class SourceTestUtils { * * <p>It forwards most methods to the given {@code boundedSource}, except: * <ol> - * <li> {@link BoundedSource#splitIntoBundles} rejects initial splitting + * <li> {@link BoundedSource#split} rejects initial splitting * by returning itself in a list. * <li> {@link BoundedReader#splitAtFraction} rejects dynamic splitting by returning null. * </ol> @@ -708,7 +708,7 @@ public class SourceTestUtils { } @Override - public List<? extends BoundedSource<T>> splitIntoBundles( + public List<? extends BoundedSource<T>> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { return ImmutableList.of(this); } http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java index fb7b27d..78485c7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java @@ -168,7 +168,7 @@ public class AvroSourceTest { AvroSource<FixedRecord> source = AvroSource.from(filename).withSchema(FixedRecord.class); List<? extends BoundedSource<FixedRecord>> splits = - source.splitIntoBundles(file.length() / 3, null); + source.split(file.length() / 3, null); for (BoundedSource<FixedRecord> subSource : splits) { int items = SourceTestUtils.readFromSource(subSource, null).size(); // Shouldn't split while unstarted. @@ -201,7 +201,7 @@ public class AvroSourceTest { } List<? 
extends BoundedSource<FixedRecord>> splits = - source.splitIntoBundles(file.length() / 3, null); + source.split(file.length() / 3, null); for (BoundedSource<FixedRecord> subSource : splits) { try (BoundedSource.BoundedReader<FixedRecord> reader = subSource.createReader(null)) { assertEquals(Double.valueOf(0.0), reader.getFractionConsumed()); @@ -339,7 +339,7 @@ public class AvroSourceTest { int nonEmptySplits; // Split with the minimum bundle size - splits = source.splitIntoBundles(100L, options); + splits = source.split(100L, options); assertTrue(splits.size() > 2); SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); nonEmptySplits = 0; @@ -351,7 +351,7 @@ public class AvroSourceTest { assertTrue(nonEmptySplits > 2); // Split with larger bundle size - splits = source.splitIntoBundles(file.length() / 4, options); + splits = source.split(file.length() / 4, options); assertTrue(splits.size() > 2); SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); nonEmptySplits = 0; @@ -363,7 +363,7 @@ public class AvroSourceTest { assertTrue(nonEmptySplits > 2); // Split with the file length - splits = source.splitIntoBundles(file.length(), options); + splits = source.split(file.length(), options); assertTrue(splits.size() == 1); SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); } http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java index 0e3b07e..8807164 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java @@ -110,7 +110,7 @@ public class CountingSourceTest { BoundedSource<Long> 
initial = CountingSource.upTo(numElements); List<? extends BoundedSource<Long>> splits = - initial.splitIntoBundles(splitSizeBytes, p.getOptions()); + initial.split(splitSizeBytes, p.getOptions()); assertEquals("Expected exact splitting", numSplits, splits.size()); // Assemble all the splits into one flattened PCollection, also verify their sizes. @@ -234,7 +234,7 @@ public class CountingSourceTest { UnboundedSource<Long, ?> initial = CountingSource.unbounded(); List<? extends UnboundedSource<Long, ?>> splits = - initial.generateInitialSplits(numSplits, p.getOptions()); + initial.split(numSplits, p.getOptions()); assertEquals("Expected exact splitting", numSplits, splits.size()); long elementsPerSplit = numElements / numSplits; @@ -262,7 +262,7 @@ public class CountingSourceTest { UnboundedCountingSource initial = CountingSource.createUnbounded().withRate(elementsPerPeriod, period); List<? extends UnboundedSource<Long, ?>> splits = - initial.generateInitialSplits(numSplits, p.getOptions()); + initial.split(numSplits, p.getOptions()); assertEquals("Expected exact splitting", numSplits, splits.size()); long elementsPerSplit = numElements / numSplits; http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java index a889305..94a29da 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java @@ -410,7 +410,7 @@ public class FileBasedSourceTest { TestFileBasedSource source = new TestFileBasedSource(file0.getParent() + "/" + "file*", Long.MAX_VALUE, null); - List<? 
extends BoundedSource<String>> splits = source.splitIntoBundles(Long.MAX_VALUE, null); + List<? extends BoundedSource<String>> splits = source.split(Long.MAX_VALUE, null); assertEquals(numFiles, splits.size()); } @@ -421,7 +421,7 @@ public class FileBasedSourceTest { TestFileBasedSource source = new TestFileBasedSource(missingFilePath, Long.MAX_VALUE, null); thrown.expect(IllegalArgumentException.class); thrown.expectMessage(String.format("Unable to find any files matching %s", missingFilePath)); - source.splitIntoBundles(1234, options); + source.split(1234, options); } @Test @@ -698,7 +698,7 @@ public class FileBasedSourceTest { TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 16, null); - List<? extends BoundedSource<String>> sources = source.splitIntoBundles(32, null); + List<? extends BoundedSource<String>> sources = source.split(32, null); // Not a trivial split. assertTrue(sources.size() > 1); @@ -877,7 +877,7 @@ public class FileBasedSourceTest { TestFileBasedSource source = new TestFileBasedSource(new File(file1.getParent(), "file*").getPath(), 64, null); - List<? extends BoundedSource<String>> sources = source.splitIntoBundles(512, null); + List<? extends BoundedSource<String>> sources = source.split(512, null); // Not a trivial split. 
assertTrue(sources.size() > 1); http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java index a300a9a..25168a3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java @@ -147,7 +147,7 @@ public class OffsetBasedSourceTest { CoarseRangeSource testSource = new CoarseRangeSource(start, end, minBundleSize, 1); long[] boundaries = {0, 150, 300, 450, 600, 750, 900, 1000}; assertSplitsAre( - testSource.splitIntoBundles(150 * testSource.getBytesPerOffset(), null), + testSource.split(150 * testSource.getBytesPerOffset(), null), boundaries); } @@ -159,7 +159,7 @@ public class OffsetBasedSourceTest { CoarseRangeSource testSource = new CoarseRangeSource(start, end, minBundleSize, 1); long[] boundaries = {300, 450, 600, 750, 900, 1000}; assertSplitsAre( - testSource.splitIntoBundles(150 * testSource.getBytesPerOffset(), null), + testSource.split(150 * testSource.getBytesPerOffset(), null), boundaries); } @@ -182,7 +182,7 @@ public class OffsetBasedSourceTest { CoarseRangeSource testSource = new CoarseRangeSource(start, end, minBundleSize, 1); long[] boundaries = {300, 450, 600, 750, 1000}; assertSplitsAre( - testSource.splitIntoBundles(100 * testSource.getBytesPerOffset(), null), + testSource.split(100 * testSource.getBytesPerOffset(), null), boundaries); } @@ -195,7 +195,7 @@ public class OffsetBasedSourceTest { // Last 10 bytes should collapse to the previous bundle. 
long[] boundaries = {0, 110, 220, 330, 440, 550, 660, 770, 880, 1000}; assertSplitsAre( - testSource.splitIntoBundles(110 * testSource.getBytesPerOffset(), null), + testSource.split(110 * testSource.getBytesPerOffset(), null), boundaries); } http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java index 416a086..74acf18 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java @@ -152,7 +152,7 @@ public class ReadTest implements Serializable{ private abstract static class CustomBoundedSource extends BoundedSource<String> { @Override - public List<? extends BoundedSource<String>> splitIntoBundles( + public List<? extends BoundedSource<String>> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { return null; } @@ -186,7 +186,7 @@ public class ReadTest implements Serializable{ private abstract static class CustomUnboundedSource extends UnboundedSource<String, NoOpCheckpointMark> { @Override - public List<? extends UnboundedSource<String, NoOpCheckpointMark>> generateInitialSplits( + public List<? 
extends UnboundedSource<String, NoOpCheckpointMark>> split( int desiredNumSplits, PipelineOptions options) throws Exception { return null; } http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 2e36273..3b6992a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -1118,7 +1118,7 @@ public class TextIOTest { } @Test - public void testInitialSplitIntoBundlesAutoModeTxt() throws Exception { + public void testInitialSplitAutoModeTxt() throws Exception { PipelineOptions options = TestPipeline.testingPipelineOptions(); long desiredBundleSize = 1000; @@ -1127,7 +1127,7 @@ public class TextIOTest { FileBasedSource<String> source = TextIO.Read.from(largeTxt.getPath()).getSource(); List<? extends FileBasedSource<String>> splits = - source.splitIntoBundles(desiredBundleSize, options); + source.split(desiredBundleSize, options); // At least 2 splits and they are equal to reading the whole file. assertThat(splits, hasSize(greaterThan(1))); @@ -1135,7 +1135,7 @@ public class TextIOTest { } @Test - public void testInitialSplitIntoBundlesAutoModeGz() throws Exception { + public void testInitialSplitAutoModeGz() throws Exception { long desiredBundleSize = 1000; PipelineOptions options = TestPipeline.testingPipelineOptions(); @@ -1144,7 +1144,7 @@ public class TextIOTest { FileBasedSource<String> source = TextIO.Read.from(largeGz.getPath()).getSource(); List<? extends FileBasedSource<String>> splits = - source.splitIntoBundles(desiredBundleSize, options); + source.split(desiredBundleSize, options); // Exactly 1 split, even in AUTO mode, since it is a gzip file. 
assertThat(splits, hasSize(equalTo(1))); @@ -1152,7 +1152,7 @@ public class TextIOTest { } @Test - public void testInitialSplitIntoBundlesGzipModeTxt() throws Exception { + public void testInitialSplitGzipModeTxt() throws Exception { PipelineOptions options = TestPipeline.testingPipelineOptions(); long desiredBundleSize = 1000; @@ -1162,7 +1162,7 @@ public class TextIOTest { FileBasedSource<String> source = TextIO.Read.from(largeTxt.getPath()).withCompressionType(GZIP).getSource(); List<? extends FileBasedSource<String>> splits = - source.splitIntoBundles(desiredBundleSize, options); + source.split(desiredBundleSize, options); // Exactly 1 split, even though splittable text file, since using GZIP mode. assertThat(splits, hasSize(equalTo(1))); @@ -1170,7 +1170,7 @@ public class TextIOTest { } @Test - public void testInitialSplitIntoBundlesGzipModeGz() throws Exception { + public void testInitialSplitGzipModeGz() throws Exception { PipelineOptions options = TestPipeline.testingPipelineOptions(); long desiredBundleSize = 1000; @@ -1180,7 +1180,7 @@ public class TextIOTest { FileBasedSource<String> source = TextIO.Read.from(largeGz.getPath()).withCompressionType(GZIP).getSource(); List<? extends FileBasedSource<String>> splits = - source.splitIntoBundles(desiredBundleSize, options); + source.split(desiredBundleSize, options); // Exactly 1 split using .gz extension and using GZIP mode. 
assertThat(splits, hasSize(equalTo(1))); http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java index d6898d5..5f71f30 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java @@ -363,7 +363,7 @@ public class XmlSourceTest { .withRecordElement("train") .withRecordClass(Train.class) .withMinBundleSize(10); - List<? extends FileBasedSource<Train>> splits = source.splitIntoBundles(50, null); + List<? extends FileBasedSource<Train>> splits = source.split(50, null); assertTrue(splits.size() > 2); @@ -686,7 +686,7 @@ public class XmlSourceTest { .withRecordElement("train") .withRecordClass(Train.class) .withMinBundleSize(10); - List<? extends FileBasedSource<Train>> splits = source.splitIntoBundles(100, null); + List<? extends FileBasedSource<Train>> splits = source.split(100, null); assertTrue(splits.size() > 2); @@ -710,7 +710,7 @@ public class XmlSourceTest { .withRecordElement("train") .withRecordClass(Train.class) .withMinBundleSize(10); - List<? extends FileBasedSource<Train>> splits = source.splitIntoBundles(256, null); + List<? extends FileBasedSource<Train>> splits = source.split(256, null); // Not a trivial split assertTrue(splits.size() > 2); @@ -737,7 +737,7 @@ public class XmlSourceTest { .withMinBundleSize(10); List<? extends FileBasedSource<Train>> splits = - fileSource.splitIntoBundles(file.length() / 3, null); + fileSource.split(file.length() / 3, null); for (BoundedSource<Train> splitSource : splits) { int numItems = readEverythingFromReader(splitSource.createReader(null)).size(); // Should not split while unstarted. 
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java index b53d1fc..9fcc3c5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java @@ -104,7 +104,7 @@ public class TestCountingSource } @Override - public List<TestCountingSource> generateInitialSplits( + public List<TestCountingSource> split( int desiredNumSplits, PipelineOptions options) { List<TestCountingSource> splits = new ArrayList<>(); int numSplits = allowSplitting ? desiredNumSplits : 1; http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java index efb385d..62114b0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java @@ -43,7 +43,7 @@ public class SourceTestUtilsTest { PipelineOptions options = PipelineOptionsFactory.create(); BoundedSource<Long> baseSource = CountingSource.upTo(100); BoundedSource<Long> unsplittableSource = SourceTestUtils.toUnsplittableSource(baseSource); - List<?> splits = unsplittableSource.splitIntoBundles(1, options); + List<?> splits = unsplittableSource.split(1, options); assertEquals(splits.size(), 1); assertEquals(splits.get(0), 
unsplittableSource); http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java index 43e4463..8a30476 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java @@ -406,32 +406,32 @@ public class CreateTest { } @Test - public void testSourceSplitIntoBundles() throws Exception { + public void testSourceSplit() throws Exception { CreateSource<Integer> source = CreateSource.fromIterable( ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), BigEndianIntegerCoder.of()); PipelineOptions options = PipelineOptionsFactory.create(); - List<? extends BoundedSource<Integer>> splitSources = source.splitIntoBundles(12, options); + List<? extends BoundedSource<Integer>> splitSources = source.split(12, options); assertThat(splitSources, hasSize(3)); SourceTestUtils.assertSourcesEqualReferenceSource(source, splitSources, options); } @Test - public void testSourceSplitIntoBundlesVoid() throws Exception { + public void testSourceSplitVoid() throws Exception { CreateSource<Void> source = CreateSource.fromIterable( Lists.<Void>newArrayList(null, null, null, null, null), VoidCoder.of()); PipelineOptions options = PipelineOptionsFactory.create(); - List<? extends BoundedSource<Void>> splitSources = source.splitIntoBundles(3, options); + List<? 
extends BoundedSource<Void>> splitSources = source.split(3, options); SourceTestUtils.assertSourcesEqualReferenceSource(source, splitSources, options); } @Test - public void testSourceSplitIntoBundlesEmpty() throws Exception { + public void testSourceSplitEmpty() throws Exception { CreateSource<Integer> source = CreateSource.fromIterable(ImmutableList.<Integer>of(), BigEndianIntegerCoder.of()); PipelineOptions options = PipelineOptionsFactory.create(); - List<? extends BoundedSource<Integer>> splitSources = source.splitIntoBundles(12, options); + List<? extends BoundedSource<Integer>> splitSources = source.split(12, options); SourceTestUtils.assertSourcesEqualReferenceSource(source, splitSources, options); } http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index baf0cc2..8e138ef 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -428,7 +428,7 @@ public class ElasticsearchIO { } @Override - public List<? extends BoundedSource<String>> splitIntoBundles( + public List<? 
extends BoundedSource<String>> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { List<BoundedElasticsearchSource> sources = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java index b5fec17..d968bc2 100644 --- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java +++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java @@ -88,7 +88,7 @@ public class ElasticsearchIOIT { // as many bundles as ES shards and bundle size is shard size long desiredBundleSizeBytes = 0; List<? 
extends BoundedSource<String>> splits = - initialSource.splitIntoBundles(desiredBundleSizeBytes, options); + initialSource.split(desiredBundleSizeBytes, options); SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options); //this is the number of ES shards // (By default, each index in Elasticsearch is allocated 5 primary shards) http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index bca0fe8..260af79 100644 --- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -326,7 +326,7 @@ public class ElasticsearchIOTest implements Serializable { } @Test - public void testSplitIntoBundles() throws Exception { + public void testSplit() throws Exception { ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client()); PipelineOptions options = PipelineOptionsFactory.create(); ElasticsearchIO.Read read = @@ -336,7 +336,7 @@ public class ElasticsearchIOTest implements Serializable { // as many bundles as ES shards and bundle size is shard size int desiredBundleSizeBytes = 0; List<? 
extends BoundedSource<String>> splits = - initialSource.splitIntoBundles(desiredBundleSizeBytes, options); + initialSource.split(desiredBundleSizeBytes, options); SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options); //this is the number of ES shards // (By default, each index in Elasticsearch is allocated 5 primary shards) http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index 746258f..1b90dc3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -81,7 +81,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> { } @Override - public List<BoundedSource<TableRow>> splitIntoBundles( + public List<BoundedSource<TableRow>> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); TableReference tableToExtract = getTableToExtract(bqOptions); http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java index f7d8252..b8e6b39 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java @@ -52,10 +52,10 @@ class TransformingSource<T, V> extends BoundedSource<V> { } @Override - public List<? extends BoundedSource<V>> splitIntoBundles( + public List<? extends BoundedSource<V>> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { return Lists.transform( - boundedSource.splitIntoBundles(desiredBundleSizeBytes, options), + boundedSource.split(desiredBundleSizeBytes, options), new Function<BoundedSource<T>, BoundedSource<V>>() { @Override public BoundedSource<V> apply(BoundedSource<T> input) { http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 89c67a4..28f8878 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -724,7 +724,7 @@ public class BigtableIO { } @Override - public List<BigtableSource> splitIntoBundles( + public List<BigtableSource> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { // Update the desiredBundleSizeBytes in order to limit the // number of splits to maximumNumberOfSplits. @@ -734,11 +734,11 @@ public class BigtableIO { Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes); // Delegate to testable helper. 
- return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys(options)); + return splitBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys(options)); } /** Helper that splits this source into bundles based on Cloud Bigtable sampled row keys. */ - private List<BigtableSource> splitIntoBundlesBasedOnSamples( + private List<BigtableSource> splitBasedOnSamples( long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) { // There are no regions, or no samples available. Just scan the entire range. if (sampleRowKeys.isEmpty()) { http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 9d8763b..0389d4b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -1119,7 +1119,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> } @Override - public List<PubsubSource<T>> generateInitialSplits( + public List<PubsubSource<T>> split( int desiredNumSplits, PipelineOptions options) throws Exception { List<PubsubSource<T>> result = new ArrayList<>(desiredNumSplits); PubsubSource<T> splitSource = this; @@ -1142,8 +1142,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> SubscriptionPath subscription = subscriptionPath; if (subscription == null) { if (checkpoint == null) { - // This reader has never been started and there was no call to #splitIntoBundles; create - // a single random 
subscription, which will be kept in the checkpoint. + // This reader has never been started and there was no call to #split; + // create a single random subscription, which will be kept in the checkpoint. subscription = outer.createRandomSubscription(options); } else { subscription = checkpoint.getSubscription(); http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 2a2bf91..83fd8d9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -1756,7 +1756,7 @@ public class BigQueryIOTest implements Serializable { SourceTestUtils.assertSplitAtFractionBehavior( bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); - List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options); + List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options); assertEquals(1, sources.size()); BoundedSource<TableRow> actual = sources.get(0); assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); @@ -1835,7 +1835,7 @@ public class BigQueryIOTest implements Serializable { SourceTestUtils.assertSplitAtFractionBehavior( bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); - List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options); + List<? 
extends BoundedSource<TableRow>> sources = bqSource.split(100, options); assertEquals(1, sources.size()); BoundedSource<TableRow> actual = sources.get(0); assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); @@ -1917,7 +1917,7 @@ public class BigQueryIOTest implements Serializable { SourceTestUtils.assertSplitAtFractionBehavior( bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); - List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options); + List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options); assertEquals(1, sources.size()); BoundedSource<TableRow> actual = sources.get(0); assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); @@ -1963,7 +1963,7 @@ public class BigQueryIOTest implements Serializable { stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT, options); SourceTestUtils.assertSourcesEqualReferenceSource( - stringSource, stringSource.splitIntoBundles(100, options), options); + stringSource, stringSource.split(100, options), options); } @Test @@ -1994,7 +1994,7 @@ public class BigQueryIOTest implements Serializable { stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); SourceTestUtils.assertSourcesEqualReferenceSource( - stringSource, stringSource.splitIntoBundles(100, options), options); + stringSource, stringSource.split(100, options), options); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index 1c770a2..3653753 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -476,7 +476,7 @@ public class BigtableIOTest { ByteKeyRange.ALL_KEYS, null /*size*/); List<BigtableSource> splits = - source.splitIntoBundles(numRows * bytesPerRow / numSamples, null /* options */); + source.split(numRows * bytesPerRow / numSamples, null /* options */); // Test num splits and split equality. assertThat(splits, hasSize(numSamples)); @@ -503,7 +503,8 @@ public class BigtableIOTest { null /*filter*/, ByteKeyRange.ALL_KEYS, null /*size*/); - List<BigtableSource> splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null); + List<BigtableSource> splits = + source.split(numRows * bytesPerRow / numSplits, null); // Test num splits and split equality. assertThat(splits, hasSize(numSplits)); @@ -528,7 +529,8 @@ public class BigtableIOTest { RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*17.*")).build(); BigtableSource source = new BigtableSource(serviceFactory, table, filter, ByteKeyRange.ALL_KEYS, null /*size*/); - List<BigtableSource> splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null); + List<BigtableSource> splits = + source.split(numRows * bytesPerRow / numSplits, null); // Test num splits and split equality. 
assertThat(splits, hasSize(numSplits)); http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java index d2e88c3..949ba4f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java @@ -324,7 +324,7 @@ public class PubsubUnboundedSourceTest { } @Test - public void noSubscriptionSplitIntoBundlesGeneratesSubscription() throws Exception { + public void noSubscriptionSplitGeneratesSubscription() throws Exception { TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic"); factory = PubsubTestClient.createFactoryForCreateSubscription(); PubsubUnboundedSource<String> source = @@ -343,7 +343,7 @@ public class PubsubUnboundedSourceTest { PipelineOptions options = PipelineOptionsFactory.create(); List<PubsubSource<String>> splits = - (new PubsubSource<>(source)).generateInitialSplits(3, options); + (new PubsubSource<>(source)).split(3, options); // We have at least one returned split assertThat(splits, hasSize(greaterThan(0))); for (PubsubSource<String> split : splits) { http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java 
b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java index d776ea0..93ff108 100644 --- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java +++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java @@ -449,7 +449,7 @@ public class HadoopInputFormatIO { } @Override - public List<BoundedSource<KV<K, V>>> splitIntoBundles(long desiredBundleSizeBytes, + public List<BoundedSource<KV<K, V>>> split(long desiredBundleSizeBytes, PipelineOptions options) throws Exception { // desiredBundleSizeBytes is not being considered as splitting based on this // value is not supported by inputFormat getSplits() method. @@ -485,7 +485,7 @@ public class HadoopInputFormatIO { /** * This is a helper function to compute splits. This method will also calculate size of the * data being read. Note: This method is executed exactly once and the splits are retrieved - * and cached in this. These splits are further used by splitIntoBundles() and + * and cached in this. These splits are further used by split() and * getEstimatedSizeBytes(). 
*/ @VisibleForTesting http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java index 3a4a99d..da70632 100644 --- a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java +++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.hadoop.inputformat.EmployeeInputFormat.NewObjectsE import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.HadoopInputFormatBoundedSource; import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableConfiguration; import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableSplit; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; @@ -518,8 +519,8 @@ public class HadoopInputFormatIOTest { // Validate if estimated size is equal to the size of records. assertEquals(referenceRecords.size(), estimatedSize); List<BoundedSource<KV<Text, Employee>>> boundedSourceList = - hifSource.splitIntoBundles(0, p.getOptions()); - // Validate if splitIntoBundles() has split correctly. + hifSource.split(0, p.getOptions()); + // Validate if split() has split correctly. 
assertEquals(TestEmployeeDataSet.NUMBER_OF_SPLITS, boundedSourceList.size()); List<KV<Text, Employee>> bundleRecords = new ArrayList<>(); for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) { @@ -638,12 +639,14 @@ public class HadoopInputFormatIOTest { } /** - * This test validates behavior of {@link HadoopInputFormatBoundedSource#createReader() - * createReader()} method when {@link HadoopInputFormatBoundedSource#splitIntoBundles() - * splitIntoBundles()} is not called. + * This test validates behavior of + * {@link HadoopInputFormatBoundedSource#createReader(PipelineOptions) + * createReader()} method when + * {@link HadoopInputFormatBoundedSource#split(long, PipelineOptions) + * split()} is not called. */ @Test - public void testCreateReaderIfSplitIntoBundlesNotCalled() throws Exception { + public void testCreateReaderIfSplitNotCalled() throws Exception { HadoopInputFormatBoundedSource<Text, Employee> hifSource = getTestHIFSource( EmployeeInputFormat.class, Text.class, @@ -658,7 +661,7 @@ public class HadoopInputFormatIOTest { /** * This test validates behavior of * {@link HadoopInputFormatBoundedSource#computeSplitsIfNecessary() computeSplits()} when Hadoop - * InputFormat's {@link InputFormat#getSplits() getSplits()} returns empty list. + * InputFormat's {@link InputFormat#getSplits(JobContext)} returns empty list.
*/ @Test public void testComputeSplitsIfGetSplitsReturnsEmptyList() throws Exception { @@ -843,6 +846,6 @@ public class HadoopInputFormatIOTest { inputFormatValueClass, keyCoder, valueCoder); - return boundedSource.splitIntoBundles(0, p.getOptions()); + return boundedSource.split(0, p.getOptions()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index ed191cb..ccdcef6 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -422,10 +422,9 @@ public class HBaseIO { return sources; } - @Override - public List<? extends BoundedSource<Result>> - splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) - throws Exception { + @Override + public List<? 
extends BoundedSource<Result>> split( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { LOG.debug("desiredBundleSize {} bytes", desiredBundleSizeBytes); long estimatedSizeBytes = getEstimatedSizeBytes(options); int numSplits = 1; http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java index ee3369e..c2410ea 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java @@ -197,7 +197,7 @@ public class HBaseIOTest { HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table); HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */); List<? extends BoundedSource<Result>> splits = - source.splitIntoBundles(numRows * bytesPerRow / numRegions, + source.split(numRows * bytesPerRow / numRegions, null /* options */); // Test num splits and split equality. 
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java index b55944b..5cc2097 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java @@ -265,7 +265,7 @@ public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> { // ======================================================================= @Override - public List<? extends BoundedSource<T>> splitIntoBundles( + public List<? extends BoundedSource<T>> split( final long desiredBundleSizeBytes, PipelineOptions options) throws Exception { if (serializableSplit() == null) { @@ -296,8 +296,8 @@ public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> { long size = 0; try { - // If this source represents a split from splitIntoBundles, then return the size of the split, - // rather then the entire input + // If this source represents a split produced by split(), + // then return the size of the split rather than the entire input if (serializableSplit() != null) { return serializableSplit().getSplit().getLength(); } http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java index c821d9d..a964239 100644 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java +++
b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java @@ -159,7 +159,7 @@ public class HDFSFileSourceTest { // Split with a small bundle size (has to be at least size of sync interval) List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source - .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options); + .split(SequenceFile.SYNC_INTERVAL, options); assertTrue(splits.size() > 2); SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); int nonEmptySplits = 0; @@ -184,7 +184,7 @@ public class HDFSFileSourceTest { long originalSize = source.getEstimatedSizeBytes(options); long splitTotalSize = 0; - List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.splitIntoBundles( + List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.split( SequenceFile.SYNC_INTERVAL, options ); for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) { http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 89016ac..104bea4 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -340,7 +340,7 @@ public class JmsIO { } @Override - public List<UnboundedJmsSource> generateInitialSplits( + public List<UnboundedJmsSource> split( int desiredNumSplits, PipelineOptions options) throws Exception { List<UnboundedJmsSource> sources = new ArrayList<>(); if (spec.getTopic() != null) {
