- If you're seeing repeated attempts to process the same message, you
should be able to look in the UI or logs and see that a task has
failed.  Figure out why that task failed before chasing other things.

- You're not using the latest version; the latest version is for Spark
2.0.  There are two versions of the connector for Spark 2.0: one for
Kafka 0.8 or higher, and one for Kafka 0.10 or higher (see the
dependency sketch below).
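
For Spark 2.0 with Maven, the coordinates look roughly like this (a
sketch assuming Scala 2.11 and Spark 2.0.0; use
spark-streaming-kafka-0-8_2.11 instead if you need the 0.8 API):

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.0.0</version>
</dependency>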

- Committing individual messages to Kafka doesn't make any sense;
Spark Streaming deals with batches.  If you're doing any aggregations
that involve shuffling, there isn't even a guarantee that you'll
process messages in order for a given topicpartition (see the commit
sketch below).
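
With the 0.10 integration, what you can commit is the offset ranges
for a batch, after that batch's output has completed.  A minimal
sketch, assuming stream is the JavaInputDStream returned by the 0.10
createDirectStream (set up as in the snippet further below; the
imports come from org.apache.spark.streaming.kafka010):

stream.foreachRDD(rdd -> {
  // one OffsetRange per topicpartition in this batch
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // ... process the whole batch here, however long that takes ...

  // only after the batch's output has succeeded, commit the offsets
  // back to Kafka; commitAsync is asynchronous and gives at-least-once
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});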

- Auto commit has no effect for the 0.8 version of createDirectStream.
Turning it on for the 0.10 version of createDirectStream is a really
bad idea: it will give you undefined delivery semantics, because the
commit to Kafka is unrelated to whether the batch processed
successfully.  If you're on 0.10, leave it off and commit offsets
yourself, as sketched above.
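
For reference, a minimal 0.10 setup sketch; bootstrap.servers,
group.id, and the topic name are placeholders, jsc is your
JavaStreamingContext, and the imports come from
org.apache.spark.streaming.kafka010 and the Kafka clients jar, plus
the usual java.util ones:

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "example-group");
kafkaParams.put("enable.auto.commit", false);  // commit manually, as above

JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        jsc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(
            Arrays.asList("mytopic"), kafkaParams));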

If you're unclear on how the Kafka integration works, see

https://github.com/koeninger/kafka-exactly-once

On Thu, Sep 8, 2016 at 4:44 PM, Cheng Yi <phillipchen...@gmail.com> wrote:
> I am using the latest streaming Kafka connector:
> <groupId>org.apache.spark</groupId>
> <artifactId>spark-streaming-kafka_2.11</artifactId>
> <version>1.6.2</version>
>
> I am facing a problem where a message is delivered twice to my consumers.
> The two deliveries are 10+ seconds apart, and it looks like this is caused
> by my lengthy message processing (about 60 seconds). I tried to solve this,
> but I am still stuck.
>
> 1. It looks like the Kafka streaming connector supports Kafka v0.8 and
> maybe v0.9, but not v0.10.
>
> JavaPairInputDStream<String, String> ds = KafkaUtils.createDirectStream(
>     jsc, String.class, String.class, StringDecoder.class, StringDecoder.class,
>     kafkaParams, topicsSet);
>
> 2. After I get a message from the Kafka stream via the consumer, how can I
> commit the message without finishing the whole processing (the whole
> processing might take minutes)? It looks like I can't get the consumer from
> KafkaUtils to execute the Kafka commit API.
>
> 3. If I can't do a manual commit, then I need to tell the Kafka consumer to
> allow a longer session or to auto commit. For v0.8 or v0.9, I have tried
> passing the following properties to KafkaUtils:
>
> kafkaParams.put("auto.commit.enable", "true");
> kafkaParams.put("auto.commit.interval.ms", "1000");
> kafkaParams.put("zookeeper.session.timeout.ms", "60000");
> kafkaParams.put("zookeeper.connection.timeout.ms", "60000");
>
> Still not working.
> Help is greatly appreciated!