[ https://issues.apache.org/jira/browse/BEAM-3925?focusedWorklogId=165248&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165248 ]

ASF GitHub Bot logged work on BEAM-3925:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Nov/18 00:54
            Start Date: 13/Nov/18 00:54
    Worklog Time Spent: 10m 
      Work Description: rangadi closed pull request #6636: [BEAM-3925] [DO NOT MERGE] KafkaIO : Value provider support for reader configuration.
URL: https://github.com/apache/beam/pull/6636
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 31ba72c54ba..86c339666f4 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -49,6 +49,7 @@
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -96,8 +97,9 @@
  * <pre>{@code
  * pipeline
  *   .apply(KafkaIO.<Long, String>read()
- *      .withBootstrapServers("broker_1:9092,broker_2:9092")
- *      .withTopic("my_topic")  // use withTopics(List<String>) to read from multiple topics.
+ *      .withBootstrapServers(StaticValueProvider.of("broker_1:9092,broker_2:9092"))
+ *      .withTopic(StaticValueProvider.of("my_topic"))
+ *      .withNumSplits(10) // Sets source parallelism. Default is runner dependent.
  *      .withKeyDeserializer(LongDeserializer.class)
  *      .withValueDeserializer(StringDeserializer.class)
  *
@@ -244,7 +246,7 @@
    */
   public static <K, V> Read<K, V> read() {
     return new AutoValue_KafkaIO_Read.Builder<K, V>()
-        .setTopics(new ArrayList<>())
+        .setNumSplits(0)
         .setTopicPartitions(new ArrayList<>())
         .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
         .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
@@ -279,7 +281,11 @@
       extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
     abstract Map<String, Object> getConsumerConfig();
 
-    abstract List<String> getTopics();
+    @Nullable
+    abstract ValueProvider<String> getBootstrapServers();
+
+    @Nullable
+    abstract ValueProvider<List<String>> getTopics();
 
     abstract List<TopicPartition> getTopicPartitions();
 
@@ -313,13 +319,17 @@
 
     abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
 
+    abstract int getNumSplits();
+
     abstract Builder<K, V> toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder<K, V> {
       abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
 
-      abstract Builder<K, V> setTopics(List<String> topics);
+      abstract Builder<K, V> setBootstrapServers(ValueProvider<String> bootstrapServers);
+
+      abstract Builder<K, V> setTopics(ValueProvider<List<String>> topics);
 
      abstract Builder<K, V> setTopicPartitions(List<TopicPartition> topicPartitions);
 
@@ -348,13 +358,25 @@
       abstract Builder<K, V> setTimestampPolicyFactory(
           TimestampPolicyFactory<K, V> timestampPolicyFactory);
 
+      abstract Builder<K, V> setNumSplits(int numSplits);
+
       abstract Read<K, V> build();
     }
 
     /** Sets the bootstrap servers for the Kafka consumer. */
     public Read<K, V> withBootstrapServers(String bootstrapServers) {
-      return updateConsumerProperties(
-          ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
+      return withBootstrapServers(StaticValueProvider.of(bootstrapServers));
+    }
+
+    /** Sets the bootstrap servers for the Kafka consumer. */
+    public Read<K, V> withBootstrapServers(ValueProvider<String> bootstrapServers) {
+      return toBuilder().setBootstrapServers(bootstrapServers).build();
+    }
+
+    /** ValueProvider version of {@link #withTopic(String)}. */
+    public Read<K, V> withTopic(ValueProvider<String> topic) {
+      return withTopics(
+          ValueProvider.NestedValueProvider.of(topic, new SingletonListTranslator<>()));
     }
 
     /**
@@ -364,7 +386,7 @@
      * partitions are distributed among the splits.
      */
     public Read<K, V> withTopic(String topic) {
-      return withTopics(ImmutableList.of(topic));
+      return withTopic(StaticValueProvider.of(topic));
     }
 
     /**
@@ -374,9 +396,17 @@
      * partitions are distributed among the splits.
      */
     public Read<K, V> withTopics(List<String> topics) {
+      return withTopics(StaticValueProvider.of(ImmutableList.copyOf(topics)));
+    }
+
+    /**
+     * This is a {@link ValueProvider} version of {@link #withTopics(List)}. When topic names are
+     * not available statically, the number of splits should be provided using
+     * {@link #withNumSplits(int)}.
+     */
+    public Read<K, V> withTopics(ValueProvider<List<String>> topics) {
       checkState(
          getTopicPartitions().isEmpty(), "Only topics or topicPartitions can be set, not both");
-      return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
+      return toBuilder().setTopics(topics).build();
     }
 
     /**
@@ -387,10 +417,23 @@
      * partitions are distributed among the splits.
      */
    public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
-      checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be set, not both");
+      checkState(getTopics() == null, "Only topics or topicPartitions can be set, not both");
       return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
     }
 
+    /**
+     * Sets the number of splits for the reader. Normally the number of splits is based on the
+     * partitions for the input topics and the number of splits suggested by the runner. But in
+     * some cases, input topic names, number of workers, or the partitions may not be available
+     * at job construction time (e.g. while using Dataflow Templates). The {@link UnboundedSource}
+     * API requires a fixed number of splits at job construction time. This allows setting the
+     * number of splits statically.
+     */
+    public Read<K, V> withNumSplits(int numSplits) {
+      checkArgument(numSplits >= 1);
+      return toBuilder().setNumSplits(numSplits).build();
+    }
+
     /**
     * Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
      *
@@ -631,11 +674,9 @@
 
     @Override
     public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
+      checkArgument(getBootstrapServers() != null, "withBootstrapServers() is required");
       checkArgument(
-          getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
-          "withBootstrapServers() is required");
-      checkArgument(
-          getTopics().size() > 0 || getTopicPartitions().size() > 0,
+          getTopics() != null || getTopicPartitions().size() > 0,
           "Either withTopic(), withTopics() or withTopicPartitions() is required");
       checkArgument(getKeyDeserializer() != null, "withKeyDeserializer() is required");
       checkArgument(getValueDeserializer() != null, "withValueDeserializer() is required");
@@ -763,15 +804,21 @@
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      List<String> topics = getTopics();
+      ValueProvider<List<String>> topics = getTopics();
       List<TopicPartition> topicPartitions = getTopicPartitions();
-      if (topics.size() > 0) {
-        builder.add(DisplayData.item("topics", Joiner.on(",").join(topics)).withLabel("Topic/s"));
+      if (topics != null) {
+        if (topics.isAccessible()) {
+          builder.add(
+              DisplayData.item("topics", Joiner.on(",").join(topics.get())).withLabel("Topic/s"));
+        } else {
+          builder.add(DisplayData.item("topics", topics).withLabel("Topic/s"));
+        }
       } else if (topicPartitions.size() > 0) {
         builder.add(
             DisplayData.item("topicPartitions", Joiner.on(",").join(topicPartitions))
                 .withLabel("Topic Partition/s"));
       }
+      builder.add(DisplayData.item(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers()));
       Set<String> ignoredConsumerPropertiesKeys = IGNORED_CONSUMER_PROPERTIES.keySet();
       for (Map.Entry<String, Object> conf : getConsumerConfig().entrySet()) {
         String key = conf.getKey();
@@ -848,6 +895,13 @@ public void populateDisplayData(DisplayData.Builder builder) {
     return config;
   }
 
+  private static class SingletonListTranslator<T> implements SerializableFunction<T, List<T>> {
+    @Override
+    public List<T> apply(T input) {
+      return ImmutableList.of(input);
+    }
+  }
+
   /** Static class, prevent instantiation. */
   private KafkaIO() {}
 
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
index b9c289b3143..f0c9c37700c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
@@ -17,20 +17,22 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.kafka.KafkaIO.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
@@ -51,18 +53,35 @@
    * then assigned to splits in round-robin order.
    */
   @Override
-  public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOptions options)
-      throws Exception {
+  public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOptions options) {
+    int numSplits = spec.getNumSplits() > 0 ? spec.getNumSplits() : desiredNumSplits;
+    return IntStream.range(0, numSplits)
+        .mapToObj(i -> new KafkaUnboundedSource<>(spec.withNumSplits(numSplits), i))
+        .collect(Collectors.toList());
+  }
 
-    List<TopicPartition> partitions = new ArrayList<>(spec.getTopicPartitions());
+  /**
+   * Creates a new source spec with assigned partitions and updated consumer config before starting
+   * the reader at runtime. It fetches partitions from Kafka if partitions are not explicitly
+   * set by the user.
+   */
+  private KafkaIO.Read<K, V> updatedSpecWithAssignedPartitions() {
+
+    // Set bootstrap servers config.
+    KafkaIO.Read<K, V> updatedSpec =
+        spec.updateConsumerProperties(
+            ImmutableMap.of(
+                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.getBootstrapServers().get()));
 
     // (a) fetch partitions for each topic
     // (b) sort by <topic, partition>
     // (c) round-robin assign the partitions to splits
+    List<TopicPartition> partitions = new ArrayList<>(spec.getTopicPartitions());

     if (partitions.isEmpty()) {
-      try (Consumer<?, ?> consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig())) {
-        for (String topic : spec.getTopics()) {
+      try (Consumer<?, ?> consumer =
+          spec.getConsumerFactoryFn().apply(updatedSpec.getConsumerConfig())) {
+        for (String topic : spec.getTopics().get()) {
           for (PartitionInfo p : consumer.partitionsFor(topic)) {
             partitions.add(new TopicPartition(p.topic(), p.partition()));
           }
@@ -70,60 +89,42 @@
       }
     }
 
-    partitions.sort(
-        Comparator.comparing(TopicPartition::topic)
-            .thenComparing(Comparator.comparingInt(TopicPartition::partition)));
+    int numSplits = spec.getNumSplits();

-    checkArgument(desiredNumSplits > 0);
     checkState(
         partitions.size() > 0,
         "Could not find any partitions. Please check Kafka configuration and topic names");
+    checkState(
+        numSplits <= partitions.size(),
+        "Number of splits %s is larger than number of partitions %s. "
+            + "Empty splits are not supported yet. Please set the number of splits explicitly "
+            + "using the withNumSplits() option",
+        numSplits,
+        partitions.size());
 
-    int numSplits = Math.min(desiredNumSplits, partitions.size());
-    List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
-
-    for (int i = 0; i < numSplits; i++) {
-      assignments.add(new ArrayList<>());
-    }
-    for (int i = 0; i < partitions.size(); i++) {
-      assignments.get(i % numSplits).add(partitions.get(i));
-    }
-
-    List<KafkaUnboundedSource<K, V>> result = new ArrayList<>(numSplits);
-
-    for (int i = 0; i < numSplits; i++) {
-      List<TopicPartition> assignedToSplit = assignments.get(i);
+    partitions.sort(
+        Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition));
 
-      LOG.info(
-          "Partitions assigned to split {} (total {}): {}",
-          i,
-          assignedToSplit.size(),
-          Joiner.on(",").join(assignedToSplit));
+    List<TopicPartition> assignedPartitions =
+        partitions
+            .stream()
+            .filter(p -> p.partition() % numSplits == id) // round robin assignment
+            .collect(Collectors.toList());
 
-      result.add(
-          new KafkaUnboundedSource<>(
-              spec.toBuilder()
-                  .setTopics(Collections.emptyList())
-                  .setTopicPartitions(assignedToSplit)
-                  .build(),
-              i));
-    }
+    LOG.info(
+        "Partitions assigned to split {} (total {}): {}",
+        id,
+        assignedPartitions.size(),
+        Joiner.on(",").join(assignedPartitions));
 
-    return result;
+    return updatedSpec.toBuilder().setTopics(null).setTopicPartitions(assignedPartitions).build();
   }
 
   @Override
   public KafkaUnboundedReader<K, V> createReader(
       PipelineOptions options, KafkaCheckpointMark checkpointMark) {
-    if (spec.getTopicPartitions().isEmpty()) {
-      LOG.warn("Looks like generateSplits() is not called. Generate single split.");
-      try {
-        return new KafkaUnboundedReader<>(split(1, options).get(0), checkpointMark);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-    return new KafkaUnboundedReader<>(this, checkpointMark);
+    return new KafkaUnboundedReader<>(
+        new KafkaUnboundedSource<>(updatedSpecWithAssignedPartitions(), id), checkpointMark);
   }
 
   @Override
@@ -147,8 +148,8 @@ public boolean requiresDeduping() {
 
  private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedSource.class);
 
-  private final Read<K, V> spec; // Contains all the relevant configuratiton of the source.
-  private final int id; // split id, mainly for debugging
+  private final Read<K, V> spec; // Contains all the relevant configuration of the source.
+  private final int id; // split id
 
   public KafkaUnboundedSource(Read<K, V> spec, int id) {
     this.spec = spec;
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 7d0e766e67e..70402ec4ae3 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -65,7 +65,6 @@
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.serialization.InstantDeserializer;
@@ -82,7 +81,6 @@
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Distinct;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
@@ -96,7 +94,6 @@
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -760,20 +757,16 @@ public void testUnboundedSourceSplits() throws Exception {
         initial.split(numSplits, p.getOptions());
     assertEquals("Expected exact splitting", numSplits, splits.size());
 
-    long elementsPerSplit = numElements / numSplits;
-    assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
-    PCollectionList<Long> pcollections = PCollectionList.empty(p);
-    for (int i = 0; i < splits.size(); ++i) {
-      pcollections =
-          pcollections.and(
-              p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit))
-                  .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata<>()))
-                  .apply("collection " + i, Values.create()));
-    }
-    PCollection<Long> input = pcollections.apply(Flatten.pCollections());
-
-    addCountingAsserts(input, numElements);
-    p.run();
+    UnboundedSource<KafkaRecord<Integer, Long>, ?> withExplicitSplits =
+        mkKafkaReadTransform(numElements, null)
+            .withKeyDeserializerAndCoder(IntegerDeserializer.class, BigEndianIntegerCoder.of())
+            .withValueDeserializerAndCoder(LongDeserializer.class, BigEndianLongCoder.of())
+            .withNumSplits(numSplits)
+            .makeSource();
+    assertEquals(
+        "Splits should match splits set explicitly",
+        numSplits,
+        withExplicitSplits.split(1, p.getOptions()).size());
   }
 
   /** A timestamp function that uses the given value as the timestamp. */
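
Note: the runtime split assignment added in KafkaUnboundedSource above distributes the sorted
partition list with the filter p.partition() % numSplits == id. The following standalone sketch
(illustration only, not part of this PR) shows how that round-robin rule spreads partition
numbers across splits; the class name and the example counts are made up for the illustration.

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RoundRobinAssignmentSketch {
  public static void main(String[] args) {
    int numPartitions = 7; // e.g. a topic with 7 partitions
    int numSplits = 3;     // e.g. withNumSplits(3)
    for (int split = 0; split < numSplits; split++) {
      final int id = split;
      // Same modulo rule as the filter in updatedSpecWithAssignedPartitions().
      List<Integer> assigned =
          IntStream.range(0, numPartitions)
              .filter(p -> p % numSplits == id)
              .boxed()
              .collect(Collectors.toList());
      // Prints: split 0 -> [0, 3, 6], split 1 -> [1, 4], split 2 -> [2, 5]
      System.out.println("split " + id + " -> " + assigned);
    }
  }
}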


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 165248)
    Time Spent: 7h  (was: 6h 50m)

> Allow ValueProvider for KafkaIO
> -------------------------------
>
>                 Key: BEAM-3925
>                 URL: https://issues.apache.org/jira/browse/BEAM-3925
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Sameer Abhyankar
>            Assignee: Pramod Upamanyu
>            Priority: Major
>          Time Spent: 7h
>  Remaining Estimate: 0h
>
> Add ValueProvider support for the various methods in KafkaIO. This would 
> allow us to use KafkaIO in reusable pipeline templates.
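
As a rough sketch of the template scenario described above, the ValueProvider overloads and
withNumSplits() added by the PR could be wired to pipeline options roughly as follows. The
KafkaTemplateOptions interface and its option names are hypothetical, not part of the PR;
only the KafkaIO setters shown come from the change itself.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaTemplateExample {

  /** Hypothetical template options; values are resolved only when the template is executed. */
  public interface KafkaTemplateOptions extends PipelineOptions {
    ValueProvider<String> getBootstrapServers();
    void setBootstrapServers(ValueProvider<String> value);

    ValueProvider<String> getTopic();
    void setTopic(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    KafkaTemplateOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTemplateOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    pipeline.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers(options.getBootstrapServers()) // resolved at run time
            .withTopic(options.getTopic())                       // resolved at run time
            .withNumSplits(10) // fixed at construction time, since partitions are unknown
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata());

    pipeline.run();
  }
}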



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
