Nicholas Verbeck created BEAM-3259:
--------------------------------------

             Summary: KafkaIO.read fails job upon broker death
                 Key: BEAM-3259
                 URL: https://issues.apache.org/jira/browse/BEAM-3259
             Project: Beam
          Issue Type: Bug
          Components: sdk-java-extensions
    Affects Versions: 2.1.0
            Reporter: Nicholas Verbeck
            Assignee: Reuven Lax


KafkaIO.read() causes the job/pipeline to fail when a broker falls out of the Kafka cluster. I'd expect the job to keep running by sourcing the failed broker's data from one of the partition replicas.
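
For context, a minimal sketch of the kind of KafkaIO.read() pipeline that hits this on the Spark runner (the topic name, broker addresses, and String deserializers below are placeholders, not taken from the failing job):
{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Unbounded read from a replicated topic; when a broker drops out of the cluster,
    // the expectation is that the reader fails over to a partition replica instead of
    // failing the whole job.
    p.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("broker-1:9092,broker-2:9092") // placeholder brokers
            .withTopic("example-topic")                          // placeholder topic
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata());

    p.run().waitUntilFinish();
  }
}
{code}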


Stacktrace of exception thrown:
{code:java}
17/11/27 19:41:16 WARN TaskSetManager: Lost task 8.0 in stage 269649.0 (TID 4044336, 96.118.131.19): org.apache.beam.runners.spark.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: checkpointed partition PARTITION-38 and assigned partition PARTITION-39 don't match
        at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
        at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
        at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
        at org.apache.beam.runners.spark.io.MicrobatchSource.getOrCreateReader(MicrobatchSource.java:131)
        at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:154)
        at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:105)
        at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
        at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
        at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
        at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
        at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:155)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: checkpointed partition PARTITION-38 and assigned partition PARTITION-39 don't match
        at shadded.com.google.common.base.Preconditions.checkState(Preconditions.java:737)
        at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.<init>(KafkaIO.java:1000)
        at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.createReader(KafkaIO.java:826)
        at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.createReader(KafkaIO.java:1)
        at org.apache.beam.runners.spark.io.MicrobatchSource$ReaderLoader.call(MicrobatchSource.java:312)
        at org.apache.beam.runners.spark.io.MicrobatchSource$ReaderLoader.call(MicrobatchSource.java:299)
        at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
        at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
        at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
        at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
        at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
        ... 28 more
{code}
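
Per the trace, the IllegalStateException is raised from a Preconditions.checkState in the UnboundedKafkaReader constructor when a checkpointed partition does not match the partition assigned to the newly created reader. As a rough, self-contained illustration of that failure shape (this is NOT the Beam source, just a sketch of the kind of check involved):
{code:java}
import java.util.Arrays;
import java.util.List;

public class PartitionCheckSketch {

  // Sketch: on restore, each checkpointed partition is compared against the partition
  // assigned to the new reader; any mismatch is treated as fatal rather than reassigned.
  static void checkAssignments(List<String> checkpointed, List<String> assigned) {
    for (int i = 0; i < checkpointed.size(); i++) {
      String cp = checkpointed.get(i);
      String as = assigned.get(i);
      if (!cp.equals(as)) {
        // Same failure mode as the IllegalStateException in the stack trace above.
        throw new IllegalStateException(String.format(
            "checkpointed partition %s and assigned partition %s don't match", cp, as));
      }
    }
  }

  public static void main(String[] args) {
    // After a broker drops out, the assignment seen by the restarted reader can shift,
    // so the stored checkpoint no longer lines up with the new assignment.
    checkAssignments(Arrays.asList("PARTITION-38"), Arrays.asList("PARTITION-39"));
  }
}
{code}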



