[
https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=165248&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165248
]
ASF GitHub Bot logged work on BEAM-3925:
----------------------------------------
Author: ASF GitHub Bot
Created on: 13/Nov/18 00:54
Start Date: 13/Nov/18 00:54
Worklog Time Spent: 10m
Work Description: rangadi closed pull request #6636: [BEAM-3925] [DO NOT
MERGE] KafkaIO : Value provider support for reader configuration.
URL: https://github.com/apache/beam/pull/6636
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 31ba72c54ba..86c339666f4 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -49,6 +49,7 @@
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
@@ -96,8 +97,9 @@
* <pre>{@code
* pipeline
* .apply(KafkaIO.<Long, String>read()
- * .withBootstrapServers("broker_1:9092,broker_2:9092")
- * .withTopic("my_topic") // use withTopics(List<String>) to read from
multiple topics.
+ *
.withBootstrapServers(StaticValueProvider.of("broker_1:9092,broker_2:9092"))
+ * .withTopic(StaticValueProvider.of("my_topic"))
+ * .withNumSplits(10) // Sets source parallelism. Default is runner
dependent.
* .withKeyDeserializer(LongDeserializer.class)
* .withValueDeserializer(StringDeserializer.class)
*
@@ -244,7 +246,7 @@
*/
public static <K, V> Read<K, V> read() {
return new AutoValue_KafkaIO_Read.Builder<K, V>()
- .setTopics(new ArrayList<>())
+ .setNumSplits(0)
.setTopicPartitions(new ArrayList<>())
.setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
.setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
@@ -279,7 +281,11 @@
extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
abstract Map<String, Object> getConsumerConfig();
- abstract List<String> getTopics();
+ @Nullable
+ abstract ValueProvider<String> getBootstrapServers();
+
+ @Nullable
+ abstract ValueProvider<List<String>> getTopics();
abstract List<TopicPartition> getTopicPartitions();
@@ -313,13 +319,17 @@
abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
+ abstract int getNumSplits();
+
abstract Builder<K, V> toBuilder();
@AutoValue.Builder
abstract static class Builder<K, V> {
abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
- abstract Builder<K, V> setTopics(List<String> topics);
+ abstract Builder<K, V> setBootstrapServers(ValueProvider<String> bootstrapServers);
+
+ abstract Builder<K, V> setTopics(ValueProvider<List<String>> topics);
abstract Builder<K, V> setTopicPartitions(List<TopicPartition> topicPartitions);
@@ -348,13 +358,25 @@
abstract Builder<K, V> setTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory);
+ abstract Builder<K, V> setNumSplits(int numSplits);
+
abstract Read<K, V> build();
}
/** Sets the bootstrap servers for the Kafka consumer. */
public Read<K, V> withBootstrapServers(String bootstrapServers) {
- return updateConsumerProperties(
- ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
+ return withBootstrapServers(StaticValueProvider.of(bootstrapServers));
+ }
+
+ /** Sets the bootstrap servers for the Kafka consumer. */
+ public Read<K, V> withBootstrapServers(ValueProvider<String> bootstrapServers) {
+ return toBuilder().setBootstrapServers(bootstrapServers).build();
+ }
+
+ /** ValueProvider version of {@link #withTopic(String)}. */
+ public Read<K, V> withTopic(ValueProvider<String> topic) {
+ return withTopics(
+ ValueProvider.NestedValueProvider.of(topic, new SingletonListTranslator<>()));
}
/**
@@ -364,7 +386,7 @@
* partitions are distributed among the splits.
*/
public Read<K, V> withTopic(String topic) {
- return withTopics(ImmutableList.of(topic));
+ return withTopic(StaticValueProvider.of(topic));
}
/**
@@ -374,9 +396,17 @@
* partitions are distributed among the splits.
*/
public Read<K, V> withTopics(List<String> topics) {
+ return withTopics(StaticValueProvider.of(ImmutableList.copyOf(topics)));
+ }
+
+ /**
+ * This is a {@link ValueProvider} version of {@link #withTopics(List)}. When topic names are
+ * not available statically, the number of splits should be provided using
+ * {@link #withNumSplits(int)}.
+ */
+ public Read<K, V> withTopics(ValueProvider<List<String>> topics) {
checkState(
getTopicPartitions().isEmpty(), "Only topics or topicPartitions can be set, not both");
- return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
+ return toBuilder().setTopics(topics).build();
}
/**
@@ -387,10 +417,23 @@
* partitions are distributed among the splits.
*/
public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
- checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be set, not both");
+ checkState(getTopics() == null, "Only topics or topicPartitions can be set, not both");
return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
}
+ /**
+ * Sets the number of splits for the reader. Normally the number of splits is based on the
+ * partitions of the input topics and the number of splits suggested by the runner. But in
+ * some cases, input topic names, the number of workers, or the partitions may not be
+ * available at job construction time (e.g. while using Dataflow Templates). The {@link
+ * UnboundedSource} API requires a fixed number of splits at job construction time. This
+ * allows setting the number of splits statically.
+ */
+ public Read<K, V> withNumSplits(int numSplits) {
+ checkArgument(numSplits >= 1);
+ return toBuilder().setNumSplits(numSplits).build();
+ }
+
/**
* Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
*
@@ -631,11 +674,9 @@
@Override
public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
+ checkArgument(getBootstrapServers() != null, "withBootstrapServers() is required");
checkArgument(
- getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
- "withBootstrapServers() is required");
- checkArgument(
- getTopics().size() > 0 || getTopicPartitions().size() > 0,
+ getTopics() != null || getTopicPartitions().size() > 0,
"Either withTopic(), withTopics() or withTopicPartitions() is required");
checkArgument(getKeyDeserializer() != null, "withKeyDeserializer() is required");
checkArgument(getValueDeserializer() != null, "withValueDeserializer() is required");
@@ -763,15 +804,21 @@
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- List<String> topics = getTopics();
+ ValueProvider<List<String>> topics = getTopics();
List<TopicPartition> topicPartitions = getTopicPartitions();
- if (topics.size() > 0) {
- builder.add(DisplayData.item("topics",
Joiner.on(",").join(topics)).withLabel("Topic/s"));
+ if (topics != null) {
+ if (topics.isAccessible()) {
+ builder.add(
+ DisplayData.item("topics",
Joiner.on(",").join(topics.get())).withLabel("Topic/s"));
+ } else {
+ builder.add(DisplayData.item("topics", topics).withLabel("Topic/s"));
+ }
} else if (topicPartitions.size() > 0) {
builder.add(
DisplayData.item("topicPartitions",
Joiner.on(",").join(topicPartitions))
.withLabel("Topic Partition/s"));
}
+ builder.add(DisplayData.item(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers()));
Set<String> ignoredConsumerPropertiesKeys = IGNORED_CONSUMER_PROPERTIES.keySet();
for (Map.Entry<String, Object> conf : getConsumerConfig().entrySet()) {
String key = conf.getKey();
@@ -848,6 +895,13 @@ public void populateDisplayData(DisplayData.Builder builder) {
return config;
}
+ private static class SingletonListTranslator<T> implements SerializableFunction<T, List<T>> {
+ @Override
+ public List<T> apply(T input) {
+ return ImmutableList.of(input);
+ }
+ }
+
/** Static class, prevent instantiation. */
private KafkaIO() {}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
index b9c289b3143..f0c9c37700c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
@@ -17,20 +17,22 @@
*/
package org.apache.beam.sdk.io.kafka;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaIO.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
@@ -51,18 +53,35 @@
* then assigned to splits in round-robin order.
*/
@Override
- public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOptions options)
- throws Exception {
+ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOptions options) {
+ int numSplits = spec.getNumSplits() > 0 ? spec.getNumSplits() : desiredNumSplits;
+ return IntStream.range(0, numSplits)
+ .mapToObj(i -> new KafkaUnboundedSource<>(spec.withNumSplits(numSplits), i))
+ .collect(Collectors.toList());
+ }
- List<TopicPartition> partitions = new ArrayList<>(spec.getTopicPartitions());
+ /**
+ * Creates a new source spec with assigned partitions and updated consumer config before
+ * starting the reader at runtime. It fetches partitions from Kafka if partitions are not
+ * explicitly set by the user.
+ */
+ private KafkaIO.Read<K, V> updatedSpecWithAssignedPartitions() {
+
+ // Set bootstrap servers config.
+ KafkaIO.Read<K, V> updatedSpec =
+ spec.updateConsumerProperties(
+ ImmutableMap.of(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.getBootstrapServers().get()));
// (a) fetch partitions for each topic
// (b) sort by <topic, partition>
// (c) round-robin assign the partitions to splits
+ List<TopicPartition> partitions = new ArrayList<>(spec.getTopicPartitions());
if (partitions.isEmpty()) {
- try (Consumer<?, ?> consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig())) {
- for (String topic : spec.getTopics()) {
+ try (Consumer<?, ?> consumer =
+ spec.getConsumerFactoryFn().apply(updatedSpec.getConsumerConfig())) {
+ for (String topic : spec.getTopics().get()) {
for (PartitionInfo p : consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
}
@@ -70,60 +89,42 @@
}
}
- partitions.sort(
- Comparator.comparing(TopicPartition::topic)
- .thenComparing(Comparator.comparingInt(TopicPartition::partition)));
+ int numSplits = spec.getNumSplits();
- checkArgument(desiredNumSplits > 0);
checkState(
partitions.size() > 0,
"Could not find any partitions. Please check Kafka configuration and topic names");
+ checkState(
+ numSplits <= partitions.size(),
+ "Number of splits %s is larger than number of partitions %s. "
+ + "Empty splits are not supported yet. Please set the number of splits explicitly "
+ + "using the 'withNumSplits()' option",
+ numSplits,
+ partitions.size());
- int numSplits = Math.min(desiredNumSplits, partitions.size());
- List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
-
- for (int i = 0; i < numSplits; i++) {
- assignments.add(new ArrayList<>());
- }
- for (int i = 0; i < partitions.size(); i++) {
- assignments.get(i % numSplits).add(partitions.get(i));
- }
-
- List<KafkaUnboundedSource<K, V>> result = new ArrayList<>(numSplits);
-
- for (int i = 0; i < numSplits; i++) {
- List<TopicPartition> assignedToSplit = assignments.get(i);
+ partitions.sort(
+ Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition));
- LOG.info(
- "Partitions assigned to split {} (total {}): {}",
- i,
- assignedToSplit.size(),
- Joiner.on(",").join(assignedToSplit));
+ List<TopicPartition> assignedPartitions =
+ partitions
+ .stream()
+ .filter(p -> p.partition() % numSplits == id) // round robin assignment
+ .collect(Collectors.toList());
- result.add(
- new KafkaUnboundedSource<>(
- spec.toBuilder()
- .setTopics(Collections.emptyList())
- .setTopicPartitions(assignedToSplit)
- .build(),
- i));
- }
+ LOG.info(
+ "Partitions assigned to split {} (total {}): {}",
+ id,
+ assignedPartitions.size(),
+ Joiner.on(",").join(assignedPartitions));
- return result;
+ return updatedSpec.toBuilder().setTopics(null).setTopicPartitions(assignedPartitions).build();
}
@Override
public KafkaUnboundedReader<K, V> createReader(
PipelineOptions options, KafkaCheckpointMark checkpointMark) {
- if (spec.getTopicPartitions().isEmpty()) {
- LOG.warn("Looks like generateSplits() is not called. Generate single
split.");
- try {
- return new KafkaUnboundedReader<>(split(1, options).get(0),
checkpointMark);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- return new KafkaUnboundedReader<>(this, checkpointMark);
+ return new KafkaUnboundedReader<>(
+ new KafkaUnboundedSource<>(updatedSpecWithAssignedPartitions(), id), checkpointMark);
}
@Override
@@ -147,8 +148,8 @@ public boolean requiresDeduping() {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaUnboundedSource.class);
- private final Read<K, V> spec; // Contains all the relevant configuratiton of the source.
- private final int id; // split id, mainly for debugging
+ private final Read<K, V> spec; // Contains all the relevant configuration of the source.
+ private final int id; // split id
public KafkaUnboundedSource(Read<K, V> spec, int id) {
this.spec = spec;
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 7d0e766e67e..70402ec4ae3 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -65,7 +65,6 @@
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.kafka.serialization.InstantDeserializer;
@@ -82,7 +81,6 @@
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
@@ -96,7 +94,6 @@
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -760,20 +757,16 @@ public void testUnboundedSourceSplits() throws Exception {
initial.split(numSplits, p.getOptions());
assertEquals("Expected exact splitting", numSplits, splits.size());
- long elementsPerSplit = numElements / numSplits;
- assertEquals("Expected even splits", numElements, elementsPerSplit *
numSplits);
- PCollectionList<Long> pcollections = PCollectionList.empty(p);
- for (int i = 0; i < splits.size(); ++i) {
- pcollections =
- pcollections.and(
- p.apply("split" + i,
Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit))
- .apply("Remove Metadata " + i, ParDo.of(new
RemoveKafkaMetadata<>()))
- .apply("collection " + i, Values.create()));
- }
- PCollection<Long> input = pcollections.apply(Flatten.pCollections());
-
- addCountingAsserts(input, numElements);
- p.run();
+ UnboundedSource<KafkaRecord<Integer, Long>, ?> withExplicitSplits =
+ mkKafkaReadTransform(numElements, null)
+ .withKeyDeserializerAndCoder(IntegerDeserializer.class, BigEndianIntegerCoder.of())
+ .withValueDeserializerAndCoder(LongDeserializer.class, BigEndianLongCoder.of())
+ .withNumSplits(numSplits)
+ .makeSource();
+ assertEquals(
+ "Splits should match splits set explicitly",
+ numSplits,
+ withExplicitSplits.split(1, p.getOptions()).size());
}
/** A timestamp function that uses the given value as the timestamp. */
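For readers following this change, here is a minimal sketch of how a template-style pipeline could use the ValueProvider-based reader configuration proposed in the diff above. The KafkaTemplatePipeline class, the KafkaTemplateOptions interface, its option names, and the fixed split count of 10 are hypothetical illustrations, not part of the PR; the only methods taken from the diff are withBootstrapServers(ValueProvider), withTopic(ValueProvider), and withNumSplits(int), and since this PR was closed without merging, those overloads may not exist in a released KafkaIO.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaTemplatePipeline {

  /** Hypothetical runtime options for a reusable template; not part of the PR. */
  public interface KafkaTemplateOptions extends PipelineOptions {
    ValueProvider<String> getBootstrapServers();

    void setBootstrapServers(ValueProvider<String> value);

    ValueProvider<String> getTopic();

    void setTopic(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    KafkaTemplateOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTemplateOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    pipeline.apply(
        KafkaIO.<Long, String>read()
            // Broker and topic are ValueProviders, resolved when the template is executed.
            .withBootstrapServers(options.getBootstrapServers())
            .withTopic(options.getTopic())
            // Partitions are unknown at construction time, so fix the number of splits up front.
            .withNumSplits(10)
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));

    pipeline.run();
  }
}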
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 165248)
Time Spent: 7h (was: 6h 50m)
> Allow ValueProvider for KafkaIO
> -------------------------------
>
> Key: BEAM-3925
> URL: https://issues.apache.org/jira/browse/BEAM-3925
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Sameer Abhyankar
> Assignee: Pramod Upamanyu
> Priority: Major
> Time Spent: 7h
> Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would
> allow us to use KafkaIO in reusable pipeline templates.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)