Nicholas Verbeck created BEAM-3259:
--------------------------------------
Summary: KafkaIO.read fails job upon broker death
Key: BEAM-3259
URL: https://issues.apache.org/jira/browse/BEAM-3259
Project: Beam
Issue Type: Bug
Components: sdk-java-extensions
Affects Versions: 2.1.0
Reporter: Nicholas Verbeck
Assignee: Reuven Lax
KafkaIO.read() causes the job/pipeline to fail when a broker drops out of
the Kafka cluster. I'd expect the job to keep running by reading the failed
broker's data from one of the partition replicas.
Stacktrace of exception thrown:
{code:java}
17/11/27 19:41:16 WARN TaskSetManager: Lost task 8.0 in stage 269649.0 (TID 4044336, 96.118.131.19): org.apache.beam.runners.spark.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: checkpointed partition PARTITION-38 and assigned partition PARTITION-39 don't match
    at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
    at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
    at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
    at org.apache.beam.runners.spark.io.MicrobatchSource.getOrCreateReader(MicrobatchSource.java:131)
    at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:154)
    at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:105)
    at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
    at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
    at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:155)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: checkpointed partition PARTITION-38 and assigned partition PARTITION-39 don't match
    at shadded.com.google.common.base.Preconditions.checkState(Preconditions.java:737)
    at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.<init>(KafkaIO.java:1000)
    at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.createReader(KafkaIO.java:826)
    at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.createReader(KafkaIO.java:1)
    at org.apache.beam.runners.spark.io.MicrobatchSource$ReaderLoader.call(MicrobatchSource.java:312)
    at org.apache.beam.runners.spark.io.MicrobatchSource$ReaderLoader.call(MicrobatchSource.java:299)
    at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
    at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
    at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
    at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
    at org.apache.beam.runners.spark.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
    ... 28 more
{code}
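For anyone trying to reason about the message above, here is a rough, self-contained sketch of one way a "checkpointed vs. assigned" mismatch could arise. It is not KafkaIO's code. It assumes, purely for illustration, that partitions are sorted and dealt out round-robin to splits, and that the partition list visible at restore time is missing one entry compared with checkpoint time. This ticket does not confirm that this is what happened. The class PartitionAssignmentSketch and its methods assign and verify are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: a hypothetical model, not the KafkaIO implementation.
final class PartitionAssignmentSketch {

  // Assumption: partitions are sorted, then dealt round-robin to splits.
  static List<Integer> assign(List<Integer> partitions, int numSplits, int splitIndex) {
    List<Integer> sorted = new ArrayList<>(partitions);
    Collections.sort(sorted);
    List<Integer> assigned = new ArrayList<>();
    for (int i = 0; i < sorted.size(); i++) {
      if (i % numSplits == splitIndex) {
        assigned.add(sorted.get(i));
      }
    }
    return assigned;
  }

  // Position-by-position comparison of the partitions recorded in a
  // checkpoint against the partitions a split is assigned on restore.
  static void verify(List<Integer> checkpointed, List<Integer> assigned) {
    int common = Math.min(checkpointed.size(), assigned.size());
    for (int i = 0; i < common; i++) {
      if (!checkpointed.get(i).equals(assigned.get(i))) {
        throw new IllegalStateException(
            "checkpointed partition " + checkpointed.get(i)
                + " and assigned partition " + assigned.get(i) + " don't match");
      }
    }
    if (checkpointed.size() != assigned.size()) {
      throw new IllegalStateException("checkpointed and assigned partition counts differ");
    }
  }

  public static void main(String[] args) {
    // Partition list at checkpoint time (made-up numbers).
    List<Integer> atCheckpoint = Arrays.asList(36, 37, 38, 39, 40, 41);
    // Hypothetical list at restore time, with partition 37 no longer listed.
    List<Integer> atRestore = Arrays.asList(36, 38, 39, 40, 41);

    List<Integer> checkpointed = assign(atCheckpoint, 2, 0); // [36, 38, 40]
    List<Integer> assigned = assign(atRestore, 2, 0);        // [36, 39, 41]
    try {
      verify(checkpointed, assigned);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

In this toy model, dropping a single partition shifts every later index, so the same split ends up with different partitions than its checkpoint recorded. Whether the real failure follows this path would need to be checked against the KafkaIO and Spark runner sources.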