Repository: beam Updated Branches: refs/heads/master 4124cc687 -> 3101e69c4
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 8740204..7edda1a 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -214,7 +214,7 @@ public class JmsIOTest { PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); int desiredNumSplits = 5; JmsIO.UnboundedJmsSource initialSource = new JmsIO.UnboundedJmsSource(read); - List<JmsIO.UnboundedJmsSource> splits = initialSource.generateInitialSplits(desiredNumSplits, + List<JmsIO.UnboundedJmsSource> splits = initialSource.split(desiredNumSplits, pipelineOptions); // in the case of a queue, we have concurrent consumers by default, so the initial number // splits is equal to the desired number of splits @@ -227,7 +227,7 @@ public class JmsIOTest { PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); int desiredNumSplits = 5; JmsIO.UnboundedJmsSource initialSource = new JmsIO.UnboundedJmsSource(read); - List<JmsIO.UnboundedJmsSource> splits = initialSource.generateInitialSplits(desiredNumSplits, + List<JmsIO.UnboundedJmsSource> splits = initialSource.split(desiredNumSplits, pipelineOptions); // in the case of a topic, we can have only an unique subscriber on the topic per pipeline // else it means we can have duplicate messages (all subscribers on the topic receive every http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 69d82bc..7feb8d0 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -153,7 +153,7 @@ import org.slf4j.LoggerFactory; * <h3>Partition Assignment and Checkpointing</h3> * The Kafka partitions are evenly distributed among splits (workers). * Checkpointing is fully supported and each split can resume from previous checkpoint. See - * {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for more details on + * {@link UnboundedKafkaSource#split(int, PipelineOptions)} for more details on * splits and checkpoint support. * * <p>When the pipeline starts for the first time without any checkpoint, the source starts @@ -311,7 +311,7 @@ public class KafkaIO { /** * Returns a new {@link Read} that reads from the topic. - * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description + * See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description * of how the partitions are distributed among the splits. */ public Read<K, V> withTopic(String topic) { @@ -321,7 +321,7 @@ public class KafkaIO { /** * Returns a new {@link Read} that reads from the topics. All the partitions from each * of the topics are read. - * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description + * See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description * of how the partitions are distributed among the splits. */ public Read<K, V> withTopics(List<String> topics) { @@ -333,7 +333,7 @@ public class KafkaIO { /** * Returns a new {@link Read} that reads from the partitions. This allows reading only a subset * of partitions for one or more topics when (if ever) needed. - * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description + * See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description * of how the partitions are distributed among the splits. */ public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) { @@ -626,7 +626,7 @@ public class KafkaIO { * {@code <topic, partition>} and then assigned to splits in round-robin order. */ @Override - public List<UnboundedKafkaSource<K, V>> generateInitialSplits( + public List<UnboundedKafkaSource<K, V>> split( int desiredNumSplits, PipelineOptions options) throws Exception { List<TopicPartition> partitions = new ArrayList<>(spec.getTopicPartitions()); @@ -698,7 +698,7 @@ public class KafkaIO { LOG.warn("Looks like generateSplits() is not called. Generate single split."); try { return new UnboundedKafkaReader<K, V>( - generateInitialSplits(1, options).get(0), checkpointMark); + split(1, options).get(0), checkpointMark); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index ecbc71d..2b11162 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -389,7 +389,7 @@ public class KafkaIOTest { UnboundedSource<KafkaRecord<Integer, Long>, ?> initial = mkKafkaReadTransform(numElements, null).makeSource(); List<? extends UnboundedSource<KafkaRecord<Integer, Long>, ?>> splits = - initial.generateInitialSplits(numSplits, p.getOptions()); + initial.split(numSplits, p.getOptions()); assertEquals("Expected exact splitting", numSplits, splits.size()); long elementsPerSplit = numElements / numSplits; @@ -446,7 +446,7 @@ public class KafkaIOTest { UnboundedSource<KafkaRecord<Integer, Long>, KafkaCheckpointMark> source = mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) .makeSource() - .generateInitialSplits(1, PipelineOptionsFactory.create()) + .split(1, PipelineOptionsFactory.create()) .get(0); UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null); @@ -486,7 +486,7 @@ public class KafkaIOTest { UnboundedSource<KafkaRecord<Integer, Long>, KafkaCheckpointMark> source = mkKafkaReadTransform(initialNumElements, new ValueAsTimestampFn()) .makeSource() - .generateInitialSplits(1, PipelineOptionsFactory.create()) + .split(1, PipelineOptionsFactory.create()) .get(0); UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null); @@ -515,7 +515,7 @@ public class KafkaIOTest { .withMaxNumRecords(numElements) .withTimestampFn(new ValueAsTimestampFn()) .makeSource() - .generateInitialSplits(1, PipelineOptionsFactory.create()) + .split(1, PipelineOptionsFactory.create()) .get(0); reader = source.createReader(null, mark); http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java index 45e0b51..7e67d07 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java @@ -56,7 +56,7 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi * {@code desiredNumSplits} partitions. Each partition is then a split. */ @Override - public List<KinesisSource> generateInitialSplits(int desiredNumSplits, + public List<KinesisSource> split(int desiredNumSplits, PipelineOptions options) throws Exception { KinesisReaderCheckpoint checkpoint = initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis)); http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index e193d29..940d875 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -381,8 +381,8 @@ public class MongoDbGridFSIO { } @Override - public List<? extends BoundedSource<ObjectId>> splitIntoBundles(long desiredBundleSizeBytes, - PipelineOptions options) throws Exception { + public List<? extends BoundedSource<ObjectId>> split( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { Mongo mongo = spec.connectionConfiguration().setupMongo(); try { GridFS gridfs = spec.connectionConfiguration().setupGridFS(mongo); http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java index 09b8505..2b7fb0a 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java @@ -225,7 +225,7 @@ public class MongoDbIO { } @Override - public List<BoundedSource<Document>> splitIntoBundles(long desiredBundleSizeBytes, + public List<BoundedSource<Document>> split(long desiredBundleSizeBytes, PipelineOptions options) { MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri())); MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database()); http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java index 71718e3..8e7f03b 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java @@ -267,7 +267,7 @@ public class MongoDBGridFSIOTest implements Serializable { // make sure 2 files can fit in long desiredBundleSizeBytes = (src.getEstimatedSizeBytes(options) * 2L) / 5L + 1000; - List<? extends BoundedSource<ObjectId>> splits = src.splitIntoBundles( + List<? extends BoundedSource<ObjectId>> splits = src.split( desiredBundleSizeBytes, options); int expectedNbSplits = 3; http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index 820b265..1df445f 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -365,7 +365,7 @@ public class MqttIO { } @Override - public List<UnboundedMqttSource> generateInitialSplits(int desiredNumSplits, + public List<UnboundedMqttSource> split(int desiredNumSplits, PipelineOptions options) { // MQTT is based on a pub/sub pattern // so, if we create several subscribers on the same topic, they all will receive the same
