Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-03 Thread Cody Koeninger
>>>>>>>>>>>> 2.Catch that exception and somehow force things to "reset" for
>>>>>>>>>>>> that
>>>>>>>>>>>> partition And how would it handle the offsets already
>>>>>>>>>>>> calculated in the
>>>>>>>>>>>> backlog (if there is one)?
>>>>>>>>>>>>
>>>>>>>>>>>> 3.Track the offsets separately, restart the job by providing
>>>>>>>>>>>> the offsets.
>>>>>>>>>>>>
>>>>>>>>>>>> 4.Or a straightforward approach would be to monitor the log for
>>>>>>>>>>>> this error,
>>>>>>>>>>>> and if it occurs more than X times, kill the job, remove the
>>>>>>>>>>>> checkpoint
>>>>>>>>>>>> directory, and restart.
>>>>>>>>>>>>
>>>>>>>>>>>> ERROR DirectKafkaInputDStream:
>>>>>>>>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets
>>>>>>>>>>>> for
>>>>>>>>>>>> Set([test_stream,5]))
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>>>> kafka.common.NotLeaderForPartitionException
>>>>>>>>>>>>
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>>>>>>>> [Terminated,
>>>>>>>>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed
>>>>>>>>>>>> tasks =
>>>>>>>>>>>> 12112]
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>>>> failure: Task 10
>>>>>>>>>>>> in stage 52.0 failed 4 times, most recent failure: Lost task
>>>>>>>>>>>> 10.3 in stage
>>>>>>>>>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>>>>>>
>>>>>>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>>>>>>> java.lang.InterruptedException
>>>>>>>>>>>>
>>>>>>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>>>
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>>>> failure: Task 7 in
>>>>>>>>>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3
>>>>>>>>>>>> in stage 33.0
>>>>>>>>>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>>>
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>>
>>>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>>>
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -
>>>>>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Cody Koeninger
>>>>>> java.lang.ClassNotFoundException:
>>>>>> kafka.common.NotLeaderForPartitionException
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>
>>>>>>
>>>>>>
>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>
>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>> [Terminated,
>>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>>>>>> 12112]
>>>>>>
>>>>>>
>>>>>>
>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>> Task 10
>>>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
>>>>>> stage
>>>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>
>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>> java.lang.InterruptedException
>>>>>>
>>>>>> Caused by: java.lang.InterruptedException
>>>>>>
>>>>>> java.lang.ClassNotFoundException:
>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>
>>>>>>
>>>>>>
>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>> Task 7 in
>>>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in
>>>>>> stage 33.0
>>>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>>>
>>>>>> java.lang.ClassNotFoundException:
>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>
>>>>>> java.lang.ClassNotFoundException:
>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Dibyendu Bhattacharya
>>>>>>> 4.Or a straightforward approach would be to monitor the log for this
>>>>>>> error,
>>>>>>> and if it occurs more than X times, kill the job, remove the
>>>>>>> checkpoint
>>>>>>> directory, and restart.
>>>>>>>
>>>>>>> ERROR DirectKafkaInputDStream:
>>>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>> Set([test_stream,5]))
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> java.lang.ClassNotFoundException:
>>>>>>> kafka.common.NotLeaderForPartitionException
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>
>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>>> [Terminated,
>>>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks
>>>>>>> =
>>>>>>> 12112]
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>> Task 10
>>>>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
>>>>>>> stage
>>>>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>
>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>> java.lang.InterruptedException
>>>>>>>
>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>
>>>>>>> java.lang.ClassNotFoundException:
>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>> Task 7 in
>>>>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in
>>>>>>> stage 33.0
>>>>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>>>>
>>>>>>> java.lang.ClassNotFoundException:
>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>
>>>>>>> java.lang.ClassNotFoundException:
>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Cody Koeninger
>>>>>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>>>>>> partition And how would it handle the offsets already calculated in
>>>>>>>> the
>>>>>>>> backlog (if there is one)?
>>>>>>>>
>>>>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <c...@koeninger.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> If you're consistently getting offset out of range exceptions,
>>>>>>>>> it's probably because messages are getting deleted before you've 
>>>>>>>>> processed
>>>>>>>>> them.
>>>>>>>>>
>>>>>>>>> The only real way to deal with this is give kafka more retention,
>>>>>>>>> consume faster, or both.
>>>>>>>>>
>>>>>>>>> If you're just looking for a quick "fix" for an infrequent issue,
>>>>>>>>> option 4 is probably easiest.  I wouldn't do that automatically / 
>>>>>>>>> silently,
>>>>>>>>> because you're losing data.
>>>>>>>>>
>>>>>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> So, our Streaming Job fails with the following errors. If you see
>>>>>>>>>> the errors
>>>>>>>>>> below, they are all related to Kafka losing offsets and
>>>>>>>>>> OffsetOutOfRangeException.
>>>>>>>>>>
>>>>>>>>>> What are the options we have other than fixing Kafka? We would
>>>>>>>>>> like to do
>>>>>>>>>> something like the following. How can we achieve 1 and 2 with
>>>>>>>>>> Spark Kafka
>>>>>>>>>> Direct?
>>>>>>>>>>
>>>>>>>>>> 1.Need to see a way to skip some offsets if they are not
>>>>>>>>>> available after the
>>>>>>>>>> max retries are reached..in that case there might be data loss.
>>>>>>>>>>
>>>>>>>>>> 2.Catch that exception and somehow force things to "reset" for
>>>>>>>>>> that
>>>>>>>>>> partition And how would it handle the offsets already calculated
>>>>>>>>>> in the
>>>>>>>>>> backlog (if there is one)?
>>>>>>>>>>
>>>>>>>>>> 3.Track the offsets separately, restart the job by providing the
>>>>>>>>>> offsets.
>>>>>>>>>>
>>>>>>>>>> 4.Or a straightforward approach would be to monitor the log for
>>>>>>>>>> this error,
>>>>>>>>>> and if it occurs more than X times, kill the job, remove the
>>>>>>>>>> checkpoint
>>>>>>>>>> directory, and restart.
>>>>>>>>>>
>>>>>>>>>> ERROR DirectKafkaInputDStream:
>>>>>>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>>>> Set([test_stream,5]))
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>> kafka.common.NotLeaderForPartitionException
>>>>>>>>>>
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>>>>>> [Terminated,
>>>>>>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed
>>>>>>>>>> tasks =
>>>>>>>>>> 12112]
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>> failure: Task 10
>>>>>>>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3
>>>>>>>>>> in stage
>>>>>>>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>>>>
>>>>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>>>>> java.lang.InterruptedException
>>>>>>>>>>
>>>>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>>>>
>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>> failure: Task 7 in
>>>>>>>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in
>>>>>>>>>> stage 33.0
>>>>>>>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>>>>>>>
>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>
>>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>>
>>>>>>>>>> at
>>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Cody Koeninger
>>>>>>>> something like the following. How can we achieve 1 and 2 with Spark
>>>>>>>> Kafka
>>>>>>>> Direct?
>>>>>>>>
>>>>>>>> 1.Need to see a way to skip some offsets if they are not available
>>>>>>>> after the
>>>>>>>> max retries are reached..in that case there might be data loss.
>>>>>>>>
>>>>>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>>>>>> partition And how would it handle the offsets already calculated in
>>>>>>>> the
>>>>>>>> backlog (if there is one)?
>>>>>>>>
>>>>>>>> 3.Track the offsets separately, restart the job by providing the
>>>>>>>> offsets.
>>>>>>>>
>>>>>>>> 4.Or a straightforward approach would be to monitor the log for
>>>>>>>> this error,
>>>>>>>> and if it occurs more than X times, kill the job, remove the
>>>>>>>> checkpoint
>>>>>>>> directory, and restart.
>>>>>>>>
>>>>>>>> ERROR DirectKafkaInputDStream:
>>>>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>> Set([test_stream,5]))
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>> kafka.common.NotLeaderForPartitionException
>>>>>>>>
>>>>>>>> at
>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>>
>>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>>>> [Terminated,
>>>>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed
>>>>>>>> tasks =
>>>>>>>> 12112]
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>> Task 10
>>>>>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3
>>>>>>>> in stage
>>>>>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>>
>>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>>> java.lang.InterruptedException
>>>>>>>>
>>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>>
>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>
>>>>>>>> at
>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>> Task 7 in
>>>>>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in
>>>>>>>> stage 33.0
>>>>>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>>>>>
>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>
>>>>>>>> at
>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>
>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>
>>>>>>>> at
>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-02 Thread Dibyendu Bhattacharya
>>>>>>>> If you're just looking for a quick "fix" for an infrequent issue,
>>>>>>>> option 4 is probably easiest.  I wouldn't do that automatically /
>>>>>>>> silently,
>>>>>>>> because you're losing data.
>>>>>>>>
>>>>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> So, our Streaming Job fails with the following errors. If you see
>>>>>>>>> the errors
>>>>>>>>> below, they are all related to Kafka losing offsets and
>>>>>>>>> OffsetOutOfRangeException.
>>>>>>>>>
>>>>>>>>> What are the options we have other than fixing Kafka? We would
>>>>>>>>> like to do
>>>>>>>>> something like the following. How can we achieve 1 and 2 with
>>>>>>>>> Spark Kafka
>>>>>>>>> Direct?
>>>>>>>>>
>>>>>>>>> 1.Need to see a way to skip some offsets if they are not available
>>>>>>>>> after the
>>>>>>>>> max retries are reached..in that case there might be data loss.
>>>>>>>>>
>>>>>>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>>>>>>> partition And how would it handle the offsets already calculated
>>>>>>>>> in the
>>>>>>>>> backlog (if there is one)?
>>>>>>>>>
>>>>>>>>> 3.Track the offsets separately, restart the job by providing the
>>>>>>>>> offsets.
>>>>>>>>>
>>>>>>>>> 4.Or a straightforward approach would be to monitor the log for
>>>>>>>>> this error,
>>>>>>>>> and if it occurs more than X times, kill the job, remove the
>>>>>>>>> checkpoint
>>>>>>>>> directory, and restart.
>>>>>>>>>
>>>>>>>>> ERROR DirectKafkaInputDStream:
>>>>>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>>> Set([test_stream,5]))
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>> kafka.common.NotLeaderForPartitionException
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>>>
>>>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>>>>> [Terminated,
>>>>>>>>> pool size = 0, active threads = 0, queued tasks = 0, completed
>>>>>>>>> tasks =
>>>>>>>>> 12112]
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>>> Task 10
>>>>>>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3
>>>>>>>>> in stage
>>>>>>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>>>
>>>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>>>> java.lang.InterruptedException
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>>> Task 7 in
>>>>>>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in
>>>>>>>>> stage 33.0
>>>>>>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>
>>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread swetha kasireddy
Hi Cody,

How should we approach Option 2 (see below)? Which portion of the Spark
Kafka Direct code should we look at to handle this issue for our specific
requirements?


2. Catch that exception and somehow force things to "reset" for that
partition. And how would it handle the offsets already calculated in the
backlog (if there is one)?
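
A rough sketch of what a per-partition "reset" in option 2 could amount to, assuming the job keeps its own record of the next offset per partition and can ask the brokers for the earliest and latest offsets still retained. The name `clamp_start_offset` and the plain-integer inputs are hypothetical and are not part of Spark or Kafka; this only illustrates the bookkeeping and how much data a reset would skip.

```python
from typing import NamedTuple


class ResetDecision(NamedTuple):
    start_offset: int  # offset the consumer should resume from
    skipped: int       # messages that can no longer be read (data loss)


def clamp_start_offset(tracked: int, earliest: int, latest: int) -> ResetDecision:
    """Clamp a tracked offset into the range the broker still retains.

    tracked  -- next offset the job intended to read (hypothetical bookkeeping)
    earliest -- oldest offset still available on the broker
    latest   -- offset one past the newest message on the broker
    """
    if earliest > latest:
        raise ValueError("earliest offset cannot exceed latest offset")
    if tracked < earliest:
        # Retention already deleted [tracked, earliest): jump forward, report the gap.
        return ResetDecision(start_offset=earliest, skipped=earliest - tracked)
    if tracked > latest:
        # Tracked offset is ahead of the log (e.g. topic recreated): fall back to latest.
        return ResetDecision(start_offset=latest, skipped=0)
    return ResetDecision(start_offset=tracked, skipped=0)


if __name__ == "__main__":
    # Illustrative numbers only: the job wanted offset 1000, the broker retains 1500..4000.
    print(clamp_start_offset(tracked=1000, earliest=1500, latest=4000))
```

Any non-zero `skipped` value is data that is gone for good, so a reset like this is probably worth logging or alerting on rather than applying quietly.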

On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <c...@koeninger.org> wrote:

> If you're consistently getting offset out of range exceptions, it's
> probably because messages are getting deleted before you've processed them.
>
> The only real way to deal with this is to give Kafka more retention, consume
> faster, or both.
>
> If you're just looking for a quick "fix" for an infrequent issue, option 4
> is probably easiest.  I wouldn't do that automatically / silently, because
> you're losing data.
>
> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> So, our Streaming Job fails with the following errors. If you see the
>> errors
>> below, they are all related to Kafka losing offsets and
>> OffsetOutOfRangeException.
>>
>> What are the options we have other than fixing Kafka? We would like to do
>> something like the following. How can we achieve 1 and 2 with Spark Kafka
>> Direct?
>>
>> 1. Find a way to skip some offsets if they are not available after the
>> max retries are reached. In that case there might be data loss.
>>
>> 2. Catch that exception and somehow force things to "reset" for that
>> partition. And how would it handle the offsets already calculated in the
>> backlog (if there is one)?
>>
>> 3. Track the offsets separately, and restart the job by providing the offsets.
>>
>> 4. Or, a straightforward approach would be to monitor the log for this error,
>> and if it occurs more than X times, kill the job, remove the checkpoint
>> directory, and restart.
>>
>> ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException,
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([test_stream,5]))
>>
>>
>>
>> java.lang.ClassNotFoundException:
>> kafka.common.NotLeaderForPartitionException
>>
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>
>>
>>
>> java.util.concurrent.RejectedExecutionException: Task
>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>> [Terminated,
>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>> 12112]
>>
>>
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 10
>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in stage
>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>
>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>> java.lang.InterruptedException
>>
>> Caused by: java.lang.InterruptedException
>>
>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>
>>
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 7
>> in
>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
>> 33.0
>> (TID 283, 172.16.97.103): UnknownReason
>>
>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>
>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>
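
Option 4 in the quoted message above could be sketched as a small watchdog that counts matching error lines and signals when a threshold is crossed. The error signatures are copied from the log excerpts in this thread; the names `count_errors` and `should_restart` and the threshold value are assumptions. The kill, checkpoint-removal, and restart steps are left out because they depend on the deployment.

```python
import re
from typing import Iterable

# Error signatures copied from the log excerpts in this thread.
ERROR_PATTERN = re.compile(
    r"kafka\.common\.OffsetOutOfRangeException|Couldn't find leader offsets"
)


def count_errors(lines: Iterable[str]) -> int:
    """Count log lines that contain one of the known Kafka error signatures."""
    return sum(1 for line in lines if ERROR_PATTERN.search(line))


def should_restart(lines: Iterable[str], max_errors: int) -> bool:
    """Return True once the error count exceeds max_errors (the "X" in option 4)."""
    return count_errors(lines) > max_errors


if __name__ == "__main__":
    sample = [
        "java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException",
        "INFO: batch finished",  # illustrative non-error line, not from a real log
        "org.apache.spark.SparkException: Couldn't find leader offsets for "
        "Set([test_stream,5])",
    ]
    if should_restart(sample, max_errors=1):
        # Notify an operator instead of restarting silently:
        # removing the checkpoint directory means losing data.
        print("threshold exceeded: operator action needed")
```

Returning a boolean instead of restarting directly keeps the decision with a human, in line with the advice not to do this automatically or silently.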


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread swetha kasireddy
How can we avoid those errors with the receiver-based approach? Suppose we are
OK with at-least-once processing and use the receiver-based approach, which
uses ZooKeeper and does not query Kafka directly. Would these errors (Couldn't
find leader offsets for Set([test_stream,5])) be avoided?

On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger <c...@koeninger.org> wrote:

> KafkaRDD.scala, handleFetchErr
>
> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi Cody,
>>
>> How to look at Option 2(see the following)? Which portion of the code in
>> Spark Kafka Direct to look at to handle this issue specific to our
>> requirements.
>>
>>
>> 2.Catch that exception and somehow force things to "reset" for that
>> partition And how would it handle the offsets already calculated in the
>> backlog (if there is one)?
>>
>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> If you're consistently getting offset out of range exceptions, it's
>>> probably because messages are getting deleted before you've processed them.
>>>
>>> The only real way to deal with this is give kafka more retention,
>>> consume faster, or both.
>>>
>>> If you're just looking for a quick "fix" for an infrequent issue, option
>>> 4 is probably easiest.  I wouldn't do that automatically / silently,
>>> because you're losing data.
>>>
>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasire...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> So, our Streaming Job fails with the following errors. If you see the
>>>> errors
>>>> below, they are all related to Kafka losing offsets and
>>>> OffsetOutOfRangeException.
>>>>
>>>> What are the options we have other than fixing Kafka? We would like to
>>>> do
>>>> something like the following. How can we achieve 1 and 2 with Spark
>>>> Kafka
>>>> Direct?
>>>>
>>>> 1.Need to see a way to skip some offsets if they are not available
>>>> after the
>>>> max retries are reached..in that case there might be data loss.
>>>>
>>>> 2.Catch that exception and somehow force things to "reset" for that
>>>> partition And how would it handle the offsets already calculated in the
>>>> backlog (if there is one)?
>>>>
>>>> 3.Track the offsets separately, restart the job by providing the
>>>> offsets.
>>>>
>>>> 4.Or a straightforward approach would be to monitor the log for this
>>>> error,
>>>> and if it occurs more than X times, kill the job, remove the checkpoint
>>>> directory, and restart.
>>>>
>>>> ERROR DirectKafkaInputDStream:
>>>> ArrayBuffer(kafka.common.UnknownException,
>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>> Set([test_stream,5]))
>>>>
>>>>
>>>>
>>>> java.lang.ClassNotFoundException:
>>>> kafka.common.NotLeaderForPartitionException
>>>>
>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>
>>>>
>>>>
>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>
>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>> [Terminated,
>>>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
>>>> 12112]
>>>>
>>>>
>>>>
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>>> 10
>>>> in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in
>>>> stage
>>>> 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>
>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>> java.lang.InterruptedException
>>>>
>>>> Caused by: java.lang.InterruptedException
>>>>
>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>
>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>
>>>>
>>>>
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>>> 7 in
>>>> stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
>>>> 33.0
>>>> (TID 283, 172.16.97.103): UnknownReason
>>>>
>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>
>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>
>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>
>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread Cody Koeninger
KafkaRDD.scala, handleFetchErr

On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <swethakasire...@gmail.com>
wrote:

> Hi Cody,
>
> How to look at Option 2(see the following)? Which portion of the code in
> Spark Kafka Direct to look at to handle this issue specific to our
> requirements.
>
>
> 2.Catch that exception and somehow force things to "reset" for that
> partition And how would it handle the offsets already calculated in the
> backlog (if there is one)?


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread swetha kasireddy
The following is the Option 2 that I was referring to:

2. Catch that exception and somehow force things to "reset" for that
partition. How would it handle the offsets already calculated in the
backlog (if there is one)?



Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread Dibyendu Bhattacharya


Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-12-01 Thread Cody Koeninger
If you're consistently getting offset out of range exceptions, it's
probably because messages are getting deleted before you've processed them.

The only real way to deal with this is to give Kafka more retention, consume
faster, or both.
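
A rough way to reason about the retention side: the oldest message the job
has not yet consumed is about (lag / produce rate) seconds old, and once that
age reaches the topic's retention the broker may delete it. The sketch below
is only back-of-the-envelope arithmetic under that assumption (time-based
retention, roughly steady produce rate). The function name and its parameters
are made up for illustration and are not part of Spark or Kafka.

```python
def retention_headroom_seconds(lag_messages, produce_rate_per_s, retention_s):
    """Estimate how long before the oldest unconsumed message ages out.

    Assumes time-based retention and a roughly steady produce rate, so the
    oldest unconsumed message is about lag / produce_rate seconds old.
    A result <= 0 suggests messages are already being deleted unread.
    """
    if produce_rate_per_s <= 0:
        raise ValueError("produce_rate_per_s must be positive")
    oldest_unconsumed_age_s = lag_messages / produce_rate_per_s
    return retention_s - oldest_unconsumed_age_s


# Hypothetical numbers: 9M messages behind, 500 msg/s produced, 4h retention.
headroom = retention_headroom_seconds(9_000_000, 500, 4 * 3600)
print(headroom)  # negative: the job is further behind than retention allows
```

If the headroom is small or negative, either side of the subtraction can be
moved: raise retention, or shrink the lag by consuming faster.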

If you're just looking for a quick "fix" for an infrequent issue, option 4
is probably easiest.  I wouldn't do that automatically / silently, because
you're losing data.
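
For what it's worth, the monitoring half of option 4 could be sketched along
these lines. This is an illustration only: should_alert and the sample lines
are hypothetical, and the script deliberately stops at reporting instead of
killing the job or deleting the checkpoint directory, so a person still makes
the call about losing data.

```python
import re

# Pattern taken from the errors quoted in this thread.
ERROR_PATTERN = re.compile(r"OffsetOutOfRangeException")


def should_alert(log_lines, threshold):
    """Return True once the error has appeared more than `threshold` times."""
    hits = sum(1 for line in log_lines if ERROR_PATTERN.search(line))
    return hits > threshold


sample = [
    "java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException",
    "INFO JobScheduler: Finished job",
    "java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException",
]
if should_alert(sample, threshold=1):
    # Only report here; restarting and removing the checkpoint is left to
    # an operator because it means skipping data.
    print("OffsetOutOfRangeException threshold exceeded; manual restart may be needed")
```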



Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-11-30 Thread SRK
Hi,

Our streaming job fails with the following errors. As you can see below,
they are all related to Kafka losing offsets and
OffsetOutOfRangeException.

What options do we have other than fixing Kafka? We would like to do
something like the following. How can we achieve 1 and 2 with Spark Kafka
Direct?

1. Find a way to skip some offsets if they are still unavailable after the
max retries are reached. In that case there might be data loss.

2. Catch that exception and somehow force things to "reset" for that
partition. How would it handle the offsets already calculated in the
backlog (if there is one)?

3. Track the offsets separately and restart the job by providing the offsets.

4. Or, a straightforward approach would be to monitor the log for this error
and, if it occurs more than X times, kill the job, remove the checkpoint
directory, and restart.
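
To make options 1 and 3 a bit more concrete, the bookkeeping might look
roughly like the sketch below: compare the offsets we tracked ourselves with
the earliest offsets still on the broker, start from whichever is later, and
record how many messages get skipped. This is only an assumption about how it
could be done. resolve_start_offsets and the sample numbers are hypothetical,
and fetching the earliest offsets or handing the result back to the job (for
example through the fromOffsets argument of KafkaUtils.createDirectStream) is
left out.

```python
def resolve_start_offsets(saved, earliest):
    """Pick a starting offset per partition and report what would be skipped.

    saved:    {partition: next offset the job wanted to read}
    earliest: {partition: earliest offset still available on the broker}
    Returns (start_offsets, skipped), where skipped counts lost messages.
    """
    start, skipped = {}, {}
    for partition, wanted in saved.items():
        available = earliest[partition]
        if wanted < available:
            # The wanted range was already deleted; jump ahead and note the loss.
            start[partition] = available
            skipped[partition] = available - wanted
        else:
            start[partition] = wanted
    return start, skipped


# Hypothetical offsets for two partitions of one topic.
saved = {0: 1200, 5: 300}
earliest = {0: 1000, 5: 450}
start, skipped = resolve_start_offsets(saved, earliest)
print(start)    # {0: 1200, 5: 450}
print(skipped)  # {5: 150}
```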

ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([test_stream,5]))

java.lang.ClassNotFoundException: kafka.common.NotLeaderForPartitionException
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)

java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8 rejected from java.util.concurrent.ThreadPoolExecutor@543258e0[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 12112]

org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in stage 52.0 (TID 255, 172.16.97.97): UnknownReason

Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
Caused by: java.lang.InterruptedException

java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage 33.0 (TID 283, 172.16.97.103): UnknownReason

java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)

java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
