[ https://issues.apache.org/jira/browse/BEAM-10881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
yws updated BEAM-10881:
-----------------------
Description:
*My code:*
{code:java}
Read<byte[], byte[]> kafkaRead = KafkaIO.<byte[], byte[]>read()
    .withBootstrapServers(brokers)
    .withConsumerConfigUpdates(properties)
    .withProcessingTime()
    .withTopic(topic)
    .withKeyDeserializer(ByteArrayDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class);
{code}
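For context, here is a minimal sketch of how a Read configured this way is typically attached to a pipeline and launched on the Spark runner (--runner=SparkRunner). The driver class, the broker and topic values, and the consumer property below are illustrative assumptions, not details from this report:
{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Hypothetical driver (assumed, not from the report) around the Read above.
public class KafkaOnSparkRepro {
  public static void main(String[] args) {
    String brokers = "localhost:9092";   // assumption: real brokers not given
    String topic = "input-topic";        // assumption: real topic not given
    Map<String, Object> properties = new HashMap<>();
    properties.put("auto.offset.reset", "latest"); // assumption

    Pipeline pipeline =
        Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // The Read from the report; yields a PCollection<KafkaRecord<byte[], byte[]>>.
    pipeline.apply(
        KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers(brokers)
            .withConsumerConfigUpdates(properties)
            .withProcessingTime()
            .withTopic(topic)
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class));

    pipeline.run().waitUntilFinish();
  }
}
{code}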
*After running on the Spark runner for about 2 hours, it throws a ConcurrentModificationException. The stack trace is as follows:*
{noformat}
java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
    at java.util.ArrayList$Itr.next(ArrayList.java:851)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.next(Iterators.java:418)
    at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:150)
    at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
    at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
    at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
    at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
    at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
    at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
    at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1172)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1163)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1098)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1163)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:889)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:442)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1386)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:448)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{noformat}
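The failing frame is org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance iterating an ArrayList through a vendored Guava iterator wrapper. ArrayList iterators are fail-fast (next() checks the list's modCount), so this exception shape means the backing list was structurally modified while it was being iterated, presumably by another thread. Below is a minimal standalone sketch, plain Java and deliberately not Beam code, that reproduces the same exception class under the same conditions; the class and variable names are invented for illustration:
{code:java}
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

// Standalone illustration (NOT Beam code): one thread mutates an ArrayList
// while another consumes its fail-fast iterator, which is the failure mode
// the stack trace above reports.
public class CmeRepro {
  public static void main(String[] args) throws InterruptedException {
    List<String> partitions = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      partitions.add("topic-" + i);
    }

    // Writer: keeps appending, the way a concurrent update to a shared,
    // non-thread-safe list would.
    Thread writer = new Thread(() -> {
      for (int i = 1000; i < 2_000_000; i++) {
        partitions.add("topic-" + i);
      }
    });
    writer.start();

    // Reader: iterates the same list concurrently; ArrayList's iterator
    // checks modCount on every next() and throws once it has changed.
    try {
      for (Iterator<String> it = partitions.iterator(); it.hasNext(); ) {
        it.next();
      }
      System.out.println("No exception this run; the race is timing-dependent.");
    } catch (ConcurrentModificationException e) {
      e.printStackTrace(); // same exception class as in this report
    }
    writer.join();
  }
}
{code}
Generic mitigations for this pattern are a thread-safe structure such as java.util.concurrent.CopyOnWriteArrayList, copying the list before iterating, or synchronizing reader and writer on the same lock; whether any of these is the right fix inside KafkaUnboundedReader would need to be confirmed against the Beam source.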
was:
*My code:*
{code:java}
Read<byte[], byte[]> kafkaRead = KafkaIO.<byte[], byte[]>read()
    .withBootstrapServers(brokers)
    .withConsumerConfigUpdates(properties)
    .withProcessingTime()
    .withTopic(topic)
    .withKeyDeserializer(ByteArrayDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class);
{code}
*After running on the Spark runner for about 2 hours, it throws a ConcurrentModificationException. The stack trace is as follows:*
{noformat}
java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
    at java.util.ArrayList$Itr.next(ArrayList.java:851)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.next(Iterators.java:418)
    at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:150)
    at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
    at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:223)
    at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:168)
    at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
    at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
    at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
    at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1172)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1163)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1098)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1163)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:889)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:442)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1386)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:448)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{noformat}
> Spark runner KafkaIO throws ConcurrentModificationException after running for 2 hours
> --------------------------------------------------------------------------------------
>
> Key: BEAM-10881
> URL: https://issues.apache.org/jira/browse/BEAM-10881
> Project: Beam
> Issue Type: Bug
> Components: beam-community
> Affects Versions: 2.23.0
> Environment: spark-runner
> Reporter: yws
> Assignee: Aizhamal Nurmamat kyzy
> Priority: P0
>
> *My code:*
> Read<byte[], byte[]> kafkaRead = KafkaIO.<byte[], byte[]>read()
> .withBootstrapServers(brokers)
> .withConsumerConfigUpdates(properties)
> .withProcessingTime()
> .withTopic(topic)
> .withKeyDeserializer(ByteArrayDeserializer.class)
> .withValueDeserializer(ByteArrayDeserializer.class);
>
> *After running on the Spark runner for about 2 hours, it throws a
> ConcurrentModificationException. The stack trace is as follows:*
>
> java.util.ConcurrentModificationException
>     at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
>     at java.util.ArrayList$Itr.next(ArrayList.java:851)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$2.next(Iterators.java:418)
>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:150)
>     at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:245)
>     at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advance(MicrobatchSource.java:232)
>     at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:177)
>     at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:107)
>     at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
>     at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>     at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1172)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1163)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1098)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1163)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:889)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:121)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:442)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1386)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:448)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)