[
https://issues.apache.org/jira/browse/BEAM-3398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305823#comment-16305823
]
Nicholas Verbeck commented on BEAM-3398:
----------------------------------------
Not sure where I'd be able to add logging without modifying the Beam code
itself, since this is the coder on the output from KafkaIO.read(). I'll see
about doing that, though.
On another note, this appears to coincide with events I've been trying to
trace down that result in a backlog of batches being produced and most likely
being serialized to disk. So this could be an issue with the actual writing of
the data to HDFS and reading it back.
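One way to add that logging without patching Beam might be to wrap the coder in a delegating coder that inspects each element before handing it off. The sketch below uses a simplified, hypothetical SimpleCoder interface rather than Beam's real org.apache.beam.sdk.coders.Coder (which encodes to an OutputStream and can throw IOException), but the wrap-and-delegate idea is the same: intercept encode(), log or reject the bad element, then delegate.

```java
import java.util.function.Function;

// Hypothetical stand-in for Beam's Coder; simplified for illustration.
interface SimpleCoder<T> {
    byte[] encode(T value);
}

// Delegating coder: surfaces null elements with context before the
// underlying coder (e.g. KafkaRecordCoder) fails with a bare NPE.
class LoggingCoder<T> implements SimpleCoder<T> {
    private final SimpleCoder<T> delegate;

    LoggingCoder(SimpleCoder<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public byte[] encode(T value) {
        if (value == null) {
            // Log before rethrowing so the failing element is traceable.
            System.err.println("LoggingCoder: null element reached encode()");
            throw new NullPointerException("null element passed to coder");
        }
        return delegate.encode(value);
    }
}

public class Main {
    public static void main(String[] args) {
        SimpleCoder<String> base = s -> s.getBytes();
        SimpleCoder<String> logged = new LoggingCoder<>(base);
        System.out.println(logged.encode("ok").length);
    }
}
```

With Beam's actual API, the same wrapper would extend Coder<KafkaRecord<K, V>> and be attached via setCoder() on the PCollection produced by KafkaIO.read(); whether the nulls originate there or in the Spark runner's state serialization is still the open question.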
> KafkaRecordCoder.encode throws NullPointerException
> ---------------------------------------------------
>
> Key: BEAM-3398
> URL: https://issues.apache.org/jira/browse/BEAM-3398
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-extensions
> Affects Versions: 2.2.0
> Reporter: Nicholas Verbeck
> Assignee: Reuven Lax
>
> The pipeline dies due to NullPointerExceptions produced within the
> KafkaRecordCoder. It had been running stably and died after too many
> executor deaths. That seems to happen a lot, and it's unclear why the
> null is reaching the coder in the first place.
> {code:java}
> 17/12/28 19:53:13 INFO AppInfoParser: Kafka version : 0.10.1.0
> 17/12/28 19:53:13 INFO AppInfoParser: Kafka commitId : 3402a74efb23d1d4
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521104
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521121
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521123
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521127
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521131
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521133
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521137
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521139
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521162
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521171
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521174
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521178
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521180
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521190
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521192
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521196
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521198
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521211
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521215
> 17/12/28 19:53:13 INFO KafkaIO: Reader-10: first record offset 40521217
> 17/12/28 19:53:13 ERROR Executor: Exception in task 10.0 in stage 558.0 (TID 8461)
> java.lang.NullPointerException
> at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:62)
> at org.apache.beam.sdk.io.kafka.KafkaRecordCoder.encode(KafkaRecordCoder.java:36)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:652)
> at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:599)
> at org.apache.beam.runners.spark.coders.CoderHelpers.toByteArray(CoderHelpers.java:55)
> at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:175)
> at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:105)
> at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
> at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
> at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:155)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)