[ https://issues.apache.org/jira/browse/BEAM-727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17127859#comment-17127859 ]

josson paul kalapparambath commented on BEAM-727:
-------------------------------------------------

[~iemejia] [~jkff]

I am planning to implement a KafkaIO read that handles the dynamic addition of Kafka partitions.

Below is the pseudocode:
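{code:java}
class ReadAll<K, V> extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {

    @Override
    public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
        // Periodically polls for the partitions of the configured topics.
        // Type parameters: <InputT = read request, OutputT = discovered partition,
        // KeyT = deduplication key>. Keying on the full TopicPartition keeps equal
        // partition numbers of different topics apart.
        Watch.Growth<Read<K, V>, TopicPartition, TopicPartition> growthFn =
            Watch.growthOf(
                    Contextful.of(new KafkaNewPartitonPolFn(), Requirements.empty()),
                    new ExtractOnlyPartiton())
                .withPollInterval(getPollingInterval());

        return input
            // getRead() is a placeholder for however the Read request is built.
            .apply("Convert to read request", Create.of(getRead()))
            .apply("WatchForNewPartitions", growthFn)
            .apply(Values.create())
            .apply("Kafka partition consumer", ParDo.of(new KafkaPartitionConsumer<K, V>()));
    }

    private static class KafkaPartitionConsumer<K, V>
            extends DoFn<TopicPartition, KafkaRecord<K, V>> {
        // Kafka reader for a single TopicPartition
    }
}
{code}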
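The "WatchForNewPartitions" step relies on Beam's Watch transform returning each output only once per input, deduplicated with the output key function (ExtractOnlyPartiton here). Below is a dependency-free model of that deduplication, not Beam's implementation. PartitionId stands in for Kafka's TopicPartition, and PartitionDeduplicator is a hypothetical name. Because the key is the (topic, partition) pair rather than the partition number alone, partition 0 of a second topic is not mistaken for partition 0 of the first.

{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for Kafka's TopicPartition so the sketch has no dependencies.
final class PartitionId {
    final String topic;
    final int partition;

    PartitionId(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof PartitionId)) return false;
        PartitionId other = (PartitionId) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Remembers every partition already emitted and returns only unseen ones.
final class PartitionDeduplicator {
    private final Set<PartitionId> seen = new HashSet<>();

    // Takes the full partition list from one poll and returns only the
    // partitions not returned by any earlier poll, in poll order.
    List<PartitionId> newPartitions(List<PartitionId> polled) {
        List<PartitionId> fresh = new ArrayList<>();
        for (PartitionId p : polled) {
            if (seen.add(p)) { // add() returns false if p was already seen
                fresh.add(p);
            }
        }
        return fresh;
    }
}
{code}

Each poll can return the complete partition list; only the additions come back out.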
 

I have a few questions:
 1) How do I make sure that 'growthFn' emits only the newly added Kafka partitions? The polling happens in the KafkaNewPartitonPolFn class.
 2) In the KafkaPartitionConsumer class, do I have to take care of checkpointing the Kafka offsets myself, so that after a pipeline failure the Kafka data is read from where it stopped?
 3) If the number of Kafka partitions matches the parallelism of the pipeline, how do I make sure that the KafkaPartitionConsumer instances are distributed across the parallel instances? Is this taken care of automatically by the runner? We are using the Flink runner.
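For question 2, the state that has to survive a failure is, per partition, the next offset to read. The model below shows that idea with standard-library types only: an offset advances only after its record is processed, a checkpoint is a copy of the offset map, and a restarted reader resumes from the last checkpoint. It is a sketch under those assumptions, not how KafkaIO or the Flink runner store checkpoints; OffsetTracker and the "topic-partition" string keys are hypothetical.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: next offset to read, per "topic-partition" key.
final class OffsetTracker {
    private final Map<String, Long> nextOffset = new HashMap<>();

    OffsetTracker() {}

    // Restores state from a previously taken checkpoint.
    OffsetTracker(Map<String, Long> checkpoint) {
        nextOffset.putAll(checkpoint);
    }

    // Offset where reading should (re)start; 0 if the partition was never read.
    long startOffset(String partition) {
        return nextOffset.getOrDefault(partition, 0L);
    }

    // Call once the record at 'offset' has been fully processed.
    void markProcessed(String partition, long offset) {
        nextOffset.merge(partition, offset + 1, Long::max);
    }

    // A copy of the current offsets, i.e. what would be persisted in a checkpoint.
    Map<String, Long> checkpoint() {
        return new HashMap<>(nextOffset);
    }
}
{code}

Records processed after the last checkpoint are read again after a restart, so this pattern gives at-least-once reading unless downstream writes are idempotent or transactional.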

> KafkaIO should support dynamic addition of Kafka partitions to assigned topics.
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-727
>                 URL: https://issues.apache.org/jira/browse/BEAM-727
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>            Reporter: Amit Sela
>            Priority: P3
>
> Kafka topics may add partitions dynamically (doesn't require Kafka to 
> restart, or halt a topic), and the KafkaIO should probably support this.
> *Note:* 
> Consistently assigning partitions should be taken into account, specifically 
> for the case of reading from multiple topics, where one (or more) of the 
> topics added partitions while the pipeline is running.
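The note about consistent assignment can be made concrete with a small, dependency-free comparison. Assigning partitions round-robin over the sorted (topic, partition) list moves existing partitions between workers whenever a topic earlier in the sort order grows, whereas an assignment computed from each (topic, partition) pair alone leaves existing partitions where they were. PartitionAssignment and its methods are hypothetical names for illustration, not KafkaIO's actual split logic.

{code:java}
import java.util.Map;
import java.util.TreeMap;

final class PartitionAssignment {
    // Round-robin over partitions ordered by topic name, then partition number:
    // a partition's position in that order decides its worker.
    static Map<String, Integer> roundRobin(Map<String, Integer> partitionCounts, int workers) {
        Map<String, Integer> assignment = new TreeMap<>();
        int index = 0;
        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionCounts).entrySet()) {
            for (int p = 0; p < e.getValue(); p++) {
                assignment.put(e.getKey() + "-" + p, index % workers);
                index++;
            }
        }
        return assignment;
    }

    // Worker depends only on the (topic, partition) pair itself, so adding
    // partitions never moves an existing partition to another worker.
    static Map<String, Integer> byHash(Map<String, Integer> partitionCounts, int workers) {
        Map<String, Integer> assignment = new TreeMap<>();
        for (Map.Entry<String, Integer> e : partitionCounts.entrySet()) {
            for (int p = 0; p < e.getValue(); p++) {
                String key = e.getKey() + "-" + p;
                assignment.put(key, Math.floorMod(key.hashCode(), workers));
            }
        }
        return assignment;
    }

    // Number of partitions present in both assignments whose worker changed.
    static int moved(Map<String, Integer> before, Map<String, Integer> after) {
        int count = 0;
        for (Map.Entry<String, Integer> e : before.entrySet()) {
            Integer now = after.get(e.getKey());
            if (now != null && !now.equals(e.getValue())) count++;
        }
        return count;
    }
}
{code}

With topics {a: 2 partitions, b: 2 partitions} on two workers, adding a third partition to topic a moves both partitions of b under roundRobin and none under byHash. The trade-off is balance: hashing can place several partitions on the same worker.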



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
