[
https://issues.apache.org/jira/browse/SPARK-22163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon updated SPARK-22163:
---------------------------------
Comment: was deleted
(was: Please stop doing this in JIRA ... I think
https://issues.apache.org/jira/browse/SPARK-22163?focusedCommentId=16195830&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16195830
explains enough. )
> Design Issue of Spark Streaming that Causes Random Run-time Exception
> ---------------------------------------------------------------------
>
> Key: SPARK-22163
> URL: https://issues.apache.org/jira/browse/SPARK-22163
> Project: Spark
> Issue Type: Bug
> Components: DStreams, Structured Streaming
> Affects Versions: 2.2.0
> Environment: Spark Streaming
> Kafka
> Linux
> Reporter: Michael N
>
> The application objects can contain Lists and can be modified dynamically as
> well. However, the Spark Streaming framework asynchronously serializes the
> application's objects as the application runs. Therefore, it causes random
> run-time exceptions on the List when the Spark Streaming framework happens to
> serialize the application's objects while the application modifies a List in
> its own object.
> In fact, there are multiple bugs reported about
> Caused by: java.util.ConcurrentModificationException
> at java.util.ArrayList.writeObject
> that are permutations of the same root cause. So the design issue of the Spark
> Streaming framework is that it does this serialization asynchronously.
> Instead, it should either
> 1. Do this serialization synchronously. This is preferred to eliminate the
> issue completely. Or
> 2. Allow it to be configured per application whether to do this serialization
> synchronously or asynchronously, depending on the nature of each application.
> Also, Spark documentation should describe the conditions that trigger Spark
> to do this type of serialization asynchronously, so the applications can work
> around them until the fix is provided.
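The failure mode described above can be reproduced without Spark or threads. The following is only a minimal sketch, not Spark's code: `SerializationRaceDemo` and `MutatingElement` are hypothetical names, and the element that changes its owning list from inside its own `writeObject` merely stands in for a second thread modifying the list while `ArrayList.writeObject` is running.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

final class SerializationRaceDemo {

    /** Hypothetical element that structurally modifies its owning list while being written. */
    static final class MutatingElement implements Serializable {
        private static final long serialVersionUID = 1L;
        // transient: the back-reference itself is not part of the serialized form
        private final transient List<Object> owner;

        MutatingElement(List<Object> owner) {
            this.owner = owner;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            // Simulates "another thread" changing the list in the middle of serialization.
            owner.add("added during serialization");
        }
    }

    /** Returns true if serializing the list fails with ConcurrentModificationException. */
    static boolean serializationFails() {
        List<Object> list = new ArrayList<>();
        list.add(new MutatingElement(list));
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(list);
            return false;
        } catch (ConcurrentModificationException e) {
            // ArrayList.writeObject detected that the list changed while it was being written.
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("CME raised: " + serializationFails());
    }
}
```

In a real streaming job the modification comes from a different thread, so the exception depends on timing, which is consistent with the random occurrences reported here.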
> ===
> Vadim Semenov and Steve Loughran, per your inquiries in ticket
> https://issues.apache.org/jira/browse/SPARK-21999, I am posting the reply
> here because this issue involves Spark's design and not necessarily its code
> implementation.
> —
> My application does not spin up its own thread. All the threads are
> controlled by Spark.
> Batch interval = 5 seconds
> Batch #3
> 1. Driver - Spark Thread #1 - starts batch #3 and blocks until all slave
> threads are done with this batch
> 2. Slave A - Spark Thread #2. Say it takes 10 seconds to complete
> 3. Slave B - Spark Thread #3. Say it takes 1 minute to complete
> 4. Both thread #1 for the driver and thread #2 for Slave A do not jump ahead
> and process batch #4. Instead, they wait for thread #3 until it is done. =>
> So there is already synchronization among the threads within the same batch.
> Also, batch to batch is synchronous.
> 5. After Spark Thread #3 is done, the driver does other processing to finish
> the current batch. In my case, it updates a list of objects.
> The above steps repeat for the next batch #4 and subsequent batches.
> Based on the exception stack trace, it looks like in step 5, Spark has
> another thread #4 that serializes application objects asynchronously. So it
> causes random occurrences of ConcurrentModificationException, because the
> list of objects is being changed by Spark's own thread #1 for the driver.
> So the issue is not that my application "is modifying a collection
> asynchronously w.r.t. Spark" as Sean kept claiming. Instead, it is Spark's
> asynchronous operations among its own different threads within the same batch
> that cause this issue.
> Since Spark controls all the threads and their synchronization, it is a Spark
> design issue that the lack of synchronization between threads #1 and #4
> triggers ConcurrentModificationException. That is the root cause of
> this issue.
> Further, even if the application does not modify its list of objects, in step
> 5 the driver could be modifying multiple primitive values, say two integers.
> In thread #1 the driver could have updated integer X but not yet integer Y
> when Spark's thread #4 asynchronously serializes the application objects. So
> the persisted serialized data does not match the actual data. This results in
> a permutation of this issue: a false-positive condition in which no exception
> is raised but the serialized checkpoint data is only partially correct.
> One solution for both issues is to modify Spark's design and allow the
> serialization of application objects by Spark's thread #4 to be configurable
> per application to be either asynchronous or synchronous with Spark's thread
> #1. That way, it is up to individual applications to decide based on the
> nature of their business requirements and needed throughput.
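Until something like that is configurable, an application could approximate the synchronous behaviour itself. The sketch below is one possible workaround, not anything Spark provides: it assumes the checkpoint is written with standard Java serialization, and `GuardedAppState`, `rotateAndBump`, and the counters `x` and `y` are hypothetical names. All updates and the custom `writeObject` hold the same lock, so a serializer on another thread sees the state either before or after a complete update, never halfway through.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class GuardedAppState implements Serializable {
    private static final long serialVersionUID = 1L;

    private final List<String> items = new ArrayList<>();
    private int x; // hypothetical counters that must always change together
    private int y;

    synchronized void add(String item) {
        items.add(item);
    }

    /** One logical update: rotate the list and bump both counters under the lock. */
    synchronized void rotateAndBump() {
        if (!items.isEmpty()) {
            items.add(items.remove(0));
        }
        x++;
        y++;
    }

    synchronized List<String> snapshot() {
        return new ArrayList<>(items);
    }

    synchronized boolean countersMatch() {
        return x == y;
    }

    /** Serialization takes the same lock as the updates, so it cannot interleave with them. */
    private synchronized void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
    }

    /** Helper for illustration only: serialize and deserialize in memory. */
    static GuardedAppState roundTrip(GuardedAppState state) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(state);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (GuardedAppState) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

This only helps if every piece of mutable state reachable from the serialized object is modified exclusively through the holder's synchronized methods. It also makes the updating thread wait while a serialization is in progress, which is the throughput trade-off mentioned above.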
> ===
> The code is listed below. Due to the asynchronous nature of Spark's thread
> operations and different hardware, the issue relating to this ticket occurs
> randomly. So you may need to tweak the batch duration.
> {code}
> package test;
> import java.util.HashMap;
> import java.util.HashSet;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.Iterator;
> import java.util.List;
> import java.util.Map;
> import java.util.Set;
> import java.util.regex.Pattern;
> import scala.Tuple2;
> import kafka.serializer.StringDecoder;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.function.*;
> import org.apache.spark.streaming.api.java.*;
> import org.apache.spark.streaming.kafka.KafkaUtils;
> import org.apache.spark.streaming.Durations;
> /**
> * Consumes messages from one or more topics in Kafka and does wordcount.
> * Usage: JavaDirectKafkaWordCount <brokers> <topics>
> * <brokers> is a list of one or more Kafka brokers
> * <topics> is a list of one or more kafka topics to consume from
> *
> * Example:
> * $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port \
> * topic1,topic2
> */
> // , VoidFunction<Iterator<Tuple2<String, Integer>>>
> public final class JavaDirectKafkaWordCount_Extended implements
> VoidFunction<JavaPairRDD<String, Integer>> {
>
> private static final Pattern SPACE = Pattern.compile(" ");
>
> private List<String> appStringList;
> public static void main(String[] args) throws Exception {
> try {
> if (args.length < 2) {
> System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
> " <brokers> is a list of one or more Kafka brokers\n" +
> " <topics> is a list of one or more kafka topics to consume from\n\n");
> System.exit(1);
> }
>
> JavaDirectKafkaWordCount_Extended javaDirectKafkaWordCount =
> new JavaDirectKafkaWordCount_Extended();
> javaDirectKafkaWordCount.setupStreamApp(args);
> } catch (Throwable exc) {
> exc.printStackTrace();
> }
> }
>
> private void setupStreamApp (String[] args) throws InterruptedException {
>
> // StreamingExamples.setStreamingLogLevels();
> String brokers = args[0];
> String topics = args[1];
> // create list of string with dummy values.
> appStringList = new ArrayList<>();
> for (int i = 0; i < 1000; ) {
> appStringList.add("a-"+ i++);
> }
>
> SparkConf sparkConf = new
> SparkConf().setAppName("JavaDirectKafkaWordCount");
>
> // Create context with a 10-second batch interval
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Durations.seconds(10));
> jssc.checkpoint("./test-checkpoint/");
>
> Set<String> topicsSet = new
> HashSet<>(Arrays.asList(topics.split(",")));
> Map<String, String> kafkaParams = new HashMap<>();
> kafkaParams.put("metadata.broker.list", brokers);
>
> // Create direct kafka stream with brokers and topics
> JavaPairInputDStream<String, String> messages =
> KafkaUtils.createDirectStream(
> jssc,
> String.class,
> String.class,
> StringDecoder.class,
> StringDecoder.class,
> kafkaParams,
> topicsSet
> );
>
> // Get the lines, split them into words, count the words and print
> JavaDStream<String> lines = messages.map(new Function<Tuple2<String,
> String>, String>() {
> /**
> *
> */
> private static final long serialVersionUID = 3769940753726592424L;
> @Override
> public String call(Tuple2<String, String> tuple2) {
> return tuple2._2();
> }
> });
>
> JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String,
> String>() {
> @Override
> public Iterator<String> call(String x) {
> return Arrays.asList(SPACE.split(x)).iterator();
> }
> });
>
> JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
> new PairFunction<String, String, Integer>() {
> @Override
> public Tuple2<String, Integer> call(String s) {
> return new Tuple2<>(s, 1);
> }
> }).reduceByKey(
> new Function2<Integer, Integer, Integer>() {
> @Override
> public Integer call(Integer i1, Integer i2) {
> return i1 + i2;
> }
> });
>
> wordCounts.foreachRDD(this);
>
> // Start the computation
> jssc.start();
> jssc.awaitTermination();
> }
> @Override
> public void call(JavaPairRDD<String, Integer> dataStream) throws
> Exception {
> System.out.println("start foreachRDD");
>
> dataStream.foreachPartition(new VoidFunction<Iterator<Tuple2<String,
> Integer>>> () {
> // Assuming that there are two slave threads, this foreachPartition code
> // corresponds to Steps 2 and 3 for threads #2 and #3, as described in the
> // high-level sequence in the textual description above the code.
> @Override
> public void call(Iterator<Tuple2<String, Integer>> tuples) throws
> Exception {
> if (tuples == null || !tuples.hasNext()) {
> return;
> }
>
> while (tuples.hasNext()) {
> // The step below is not related to the issue. It is used just to
> // simulate some operation in the slave threads for completeness.
> System.out.println(tuples.next()._1);
> }
> }
> });
> /* ===>
> * The steps below correspond to Step 5 of the high-level sequence and to
> * Spark's thread #1, as described in the textual description above the code.
> *
> * These steps are where ConcurrentModificationException occurs randomly, as
> * explained in Step 5 of the textual description of this ticket.
> * For the purpose of this test, these steps update the list by simply
> * rotating the entries.
> *
> * Based on the stack trace, Spark has another thread, i.e., thread #4, that
> * asynchronously serializes the application objects during the next three
> * operations. So it would randomly encounter ConcurrentModificationException
> * because Spark's thread #4 tries to serialize appStringList while Spark's
> * thread #1 is modifying the same list.
> */
> String tmp = appStringList.get(0);
> appStringList.remove(0);
> appStringList.add(tmp);
> System.out.println("end foreachRDD");
> }
> }
> {code}
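For the three-step rotation at the end of the code above, another application-side option is to never mutate a list that may be in the middle of serialization, and to publish a fresh list instead. This is a sketch under assumptions, not a confirmed fix for this ticket: `CopyOnRotate` is a hypothetical class, and it assumes only one thread performs the rotation, as in Step 5 of the description.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class CopyOnRotate implements Serializable {
    private static final long serialVersionUID = 1L;

    // The reference is replaced on every update; a published list is never modified.
    private volatile List<String> appStringList;

    CopyOnRotate(List<String> initial) {
        this.appStringList = Collections.unmodifiableList(new ArrayList<>(initial));
    }

    /** Builds a rotated copy and swaps the reference in a single write. */
    void rotate() {
        List<String> current = appStringList;
        if (current.size() < 2) {
            return; // nothing to rotate
        }
        List<String> next = new ArrayList<>(current.size());
        next.addAll(current.subList(1, current.size()));
        next.add(current.get(0));
        appStringList = Collections.unmodifiableList(next);
    }

    List<String> current() {
        return appStringList;
    }
}
```

A serializer running on another thread then sees either the old list or the new one in full. The cost is one list copy per batch, and this does not by itself keep several separate fields consistent with each other.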