If your complaint is about offsets being committed that you didn't
expect, auto commit being false on executors shouldn't have anything
to do with that.  Executors shouldn't be auto-committing; that's why
it's being overridden, and the WARN line you quoted is expected
behavior.  Commits happen from the driver, via commitAsync.

What you've said and the code you posted aren't really enough to
explain what your issue is, e.g.:

- is this line
  // save the rdd to Cassandra database
  actually a blocking call, i.e. does it throw on the driver before
  commitAsync runs if the write fails?

- are you sure the save inside foreachRDD isn't being retried and
  succeeding the second time around, so that no exception ever reaches
  your catch block, etc.
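
For example, here's a minimal sketch of the pattern I'd expect,
assuming the spark-cassandra-connector (its saveToCassandra runs a
Spark job and blocks until it finishes, throwing on the driver if the
job fails); the Event case class, parse function, and keyspace/table
names below are placeholders, not your actual schema:

    import com.datastax.spark.connector._
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    case class Event(id: String, payload: String)   // hypothetical schema
    def parse(v: String): Event = Event(v, v)       // hypothetical parser

    stream1.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      try {
        // blocks until every task has finished; a failed job throws
        // here, so the commit below is never reached for a failed batch
        rdd.map(r => parse(r.value)).saveToCassandra("my_keyspace", "my_table")

        // offsets are committed only after the save succeeded
        stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      } catch {
        case ex: Exception =>
          println(s"save failed, not committing offsets: $ex")
      }
    }

While debugging, setting spark.task.maxFailures to 1 will make the
first task failure fail the whole job instead of retrying, which makes
it easier to see whether retries are masking the error.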

On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
<deshpandesh...@gmail.com> wrote:
> Hello All,
> I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11.
>
> I am setting enable.auto.commit to false, and I want to commit the
> offsets manually after my output operation is successful, so when an
> exception is raised during the processing I do not want the offsets
> to be committed. But it looks like the offsets are committed anyway
> even when the exception is raised, and so I am losing data.
> In my logs I see:  WARN  overriding enable.auto.commit to false for
> executor.  But I don't want it to be overridden. Please help.
>
> My code looks like this:
>
>     val kafkaParams = Map[String, Object](
>       "bootstrap.servers" -> brokers,
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[StringDeserializer],
>       "group.id" -> "Group1",
>       "auto.offset.reset" -> offsetresetparameter,
>       "enable.auto.commit" -> (false: java.lang.Boolean)
>     )
>
>     val myTopics = Array("topic1")
>     val stream1 = KafkaUtils.createDirectStream[String, String](
>       ssc,
>       PreferConsistent,
>       Subscribe[String, String](myTopics, kafkaParams)
>     )
>
>     stream1.foreachRDD { (rdd, time) =>
>         val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>         try {
>             //save the rdd to Cassandra database
>
>           stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>         } catch {
>           case ex: Exception => {
>             println(ex.toString + "!!!!!! Bad Data, Unable to persist into table !!!!!" + errorOffsetRangesToString(offsetRanges))
>           }
>         }
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
