[ https://issues.apache.org/jira/browse/SPARK-22163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-22163:
------------------------------
    Description: 
The application's objects can contain Lists that are modified dynamically while 
the application runs. However, the Spark Streaming framework serializes the 
application's objects asynchronously as the application runs. As a result, a 
random run-time exception is thrown on a List whenever the framework happens to 
serialize the application's objects while the application is modifying that List 
in one of its own objects.

In fact, multiple bugs have been reported about

Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList.writeObject

that are permutations of the same root cause. The design issue of the Spark 
Streaming framework is that it does this serialization asynchronously. Instead, 
it should either

1. Do this serialization synchronously. This is preferred because it eliminates 
the issue completely. Or

2. Allow each application to configure whether this serialization is done 
synchronously or asynchronously, depending on the nature of the application.

Also, the Spark documentation should describe the conditions that trigger Spark 
to perform this type of serialization asynchronously, so applications can work 
around them until a fix is provided.
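
For reference, the race itself can be reproduced with plain JDK classes, outside 
Spark. The standalone sketch below (not part of the reported application) 
serializes an ArrayList while another thread keeps rotating it, and typically 
fails with the same ConcurrentModificationException from ArrayList.writeObject:

{code}
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

// Standalone sketch, not part of the application below: one thread keeps
// rotating an ArrayList (like the driver, thread #1) while another thread
// serializes the same list (like Spark's checkpoint writer, thread #4).
public class ConcurrentSerializationSketch {
  public static void main(String[] args) throws Exception {
    final List<String> list = new ArrayList<>();
    for (int i = 0; i < 100000; i++) {
      list.add("a-" + i);
    }

    Thread mutator = new Thread(() -> {
      while (!Thread.currentThread().isInterrupted()) {
        String head = list.get(0);   // same rotate pattern as in the repro code
        list.remove(0);
        list.add(head);
      }
    });
    mutator.start();

    try {
      for (int i = 0; i < 1000; i++) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
          // ArrayList.writeObject checks modCount and throws
          // ConcurrentModificationException if the list was modified meanwhile.
          oos.writeObject(list);
        }
      }
    } finally {
      mutator.interrupt();
    }
  }
}
{code}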

===

Vadim Semenov and Steve Loughran, per your inquiries in ticket 
https://issues.apache.org/jira/browse/SPARK-21999, I am posting the reply here 
because this issue involves Spark's design and not necessarily its code 
implementation.

—

My application does not spin up its own thread. All the threads are controlled 
by Spark.

Batch interval = 5 seconds

Batch #3
1. Driver - Spark Thread #1 - starts batch #3 and blocks until all slave 
threads are done with this batch
2. Slave A - Spark Thread #2. Say it takes 10 seconds to complete.
3. Slave B - Spark Thread #3. Say it takes 1 minute to complete.

4. Both thread #1 for the driver and thread #2 for Slave A do not jump ahead 
and process batch #4. Instead, they wait until thread #3 is done. => So there 
is already synchronization among the threads within the same batch, and batch 
to batch is synchronous.

5. After Spark Thread #3 is done, the driver does other processing to finish 
the current batch. In my case, it updates a list of objects.

The above steps repeat for the next batch #4 and subsequent batches.

Based on the exception stack trace, it looks like in step 5 Spark has another 
thread, #4, that serializes the application objects asynchronously. That causes 
random occurrences of ConcurrentModificationException, because the list of 
objects is being changed at the same time by Spark's own thread #1 for the driver.

So the issue is not that my application "is modifying a collection 
asynchronously w.r.t. Spark", as Sean kept claiming. Instead, it is Spark's 
asynchronous operations among its own threads within the same batch that cause 
this issue.

Since Spark controls all the threads and their synchronization, it is a Spark 
design issue that the lack of synchronization between threads #1 and #4 
triggers ConcurrentModificationException. That is the root cause of this issue.

Further, even if the application does not modify its list of objects, in step 5 
the driver could be modifying multiple plain fields, say two integers. In 
thread #1 the driver could have updated integer X, but not yet integer Y, when 
Spark's thread #4 asynchronously serializes the application objects. The 
persisted serialized data then does not match the actual data. This resulted in 
a permutation of this issue, a false-positive condition where the serialized 
checkpoint data is only partially correct.
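
That second failure mode can also be illustrated with plain JDK serialization. 
In the standalone sketch below (again not the application's code), each pair of 
increments restores the invariant x == y in the live object, yet a concurrent 
serializer can persist a snapshot where x != y:

{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Standalone sketch: the mutating thread updates x, then y, so the pair is not
// atomic; a snapshot serialized between the two updates can have x != y.
public class TornSnapshotSketch {
  static class State implements Serializable {
    private static final long serialVersionUID = 1L;
    int x;
    int y;
  }

  public static void main(String[] args) throws Exception {
    final State state = new State();

    Thread driverLike = new Thread(() -> {
      while (!Thread.currentThread().isInterrupted()) {
        state.x++;   // update X ...
        state.y++;   // ... then Y
      }
    });
    driverLike.start();

    try {
      for (int i = 0; i < 100000; i++) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(buf)) {
          oos.writeObject(state);   // "checkpoint" taken while updates are in flight
        }
        State snapshot = (State) new ObjectInputStream(
            new ByteArrayInputStream(buf.toByteArray())).readObject();
        if (snapshot.x != snapshot.y) {
          System.out.println("torn snapshot: x=" + snapshot.x + ", y=" + snapshot.y);
          break;
        }
      }
    } finally {
      driverLike.interrupt();
    }
  }
}
{code}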

One solution for both issues is to modify Spark's design so that the 
serialization of application objects by Spark's thread #4 is configurable per 
application as either asynchronous or synchronous with Spark's thread #1. That 
way, each application can decide based on the nature of its business 
requirements and required throughput.
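
Until such a configuration option exists, one possible application-side 
workaround (my own assumption, not something the Spark documentation 
prescribes) is to avoid mutating a checkpointed collection in place: either use 
a collection whose serialization takes a snapshot, such as CopyOnWriteArrayList, 
or build a modified copy and publish it with a single reference swap so that a 
concurrent serializer only ever sees a fully built list. A minimal sketch of 
the copy-and-swap variant, using hypothetical names:

{code}
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical holder for the application's list; rotate() replaces the in-place
// get(0)/remove(0)/add(tmp) pattern used in the reproduction code below.
public class RotatingListHolder implements Serializable {
  private static final long serialVersionUID = 1L;

  // volatile, so a thread that serializes this object sees either the old or the
  // new list reference, each of which is fully built and never mutated afterwards.
  private volatile List<String> appStringList =
      new ArrayList<>(Arrays.asList("a-0", "a-1", "a-2"));

  public void rotate() {
    List<String> next = new ArrayList<>(appStringList);  // copy ...
    next.add(next.remove(0));                            // ... rotate the copy ...
    appStringList = next;                                // ... publish in one assignment
  }

  public List<String> current() {
    return appStringList;
  }
}
{code}

Whether the extra copying is acceptable depends on the list size and the batch 
rate, which is why a per-application option for synchronous serialization still 
seems preferable.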

===

The code is listed below. Due to the asynchronous nature of Spark's thread 
operations and differences in hardware, the issue described in this ticket 
occurs randomly, so you may need to tweak the batch duration.

{code}
package test;

import java.util.HashMap;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import scala.Tuple2;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: JavaDirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port \
 *      topic1,topic2
 */
//  ,    VoidFunction<Iterator<Tuple2<String, Integer>>>
public final class JavaDirectKafkaWordCount_Extended implements VoidFunction<JavaPairRDD<String, Integer>> {
   
  private static final Pattern SPACE = Pattern.compile(" ");
 
  private List<String> appStringList;

  public static void main(String[] args) throws Exception {
      try {
        if (args.length < 2) {
            System.err.println("Usage: JavaDirectKafkaWordCount <brokers> 
<topics>\n" +
                "  <brokers> is a list of one or more Kafka brokers\n" +
                "  <topics> is a list of one or more kafka topics to consume 
from\n\n");
            System.exit(1);           
          }
       
        JavaDirectKafkaWordCount_Extended javaDirectKafkaWordCount = new JavaDirectKafkaWordCount_Extended();
        javaDirectKafkaWordCount.setupStreamApp(args);
      } catch (Throwable exc) {
          exc.printStackTrace();
      }
  }
 
  private void setupStreamApp (String[] args) throws InterruptedException {     
    // StreamingExamples.setStreamingLogLevels();

        String brokers = args[0];
        String topics = args[1];

        // create list of string with dummy values.
        appStringList = new ArrayList<>();
        for (int i = 0; i < 1000; ) {
            appStringList.add("a-"+ i++);           
        }
   
        SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
       
        // Create context with a 10 second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
        jssc.checkpoint("./test-checkpoint/");
   
        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);
   
        // Create direct kafka stream with brokers and topics
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
        );
   
        // Get the lines, split them into words, count the words and print
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 3769940753726592424L;

            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
       
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
              @Override
              public Iterator<String> call(String x) {
                return Arrays.asList(SPACE.split(x)).iterator();
              }
        });
       
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
              new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                  return new Tuple2<>(s, 1);
                }
              }).reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                  return i1 + i2;
                }
        });
       
        wordCounts.foreachRDD(this);
               
        // Start the computation
        jssc.start();
        jssc.awaitTermination();
    }

    @Override
    public void call(JavaPairRDD<String, Integer> dataStream) throws Exception {

        System.out.println("start foreachRDD");
       
        dataStream.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {

            // Assuming there are two slave threads, this foreachPartition code
            // corresponds to Steps 2 and 3 (threads #2 and #3) of the high-level
            // sequence in the textual description above the code.
            @Override
            public void call(Iterator<Tuple2<String, Integer>> tuples) throws Exception {
                if (tuples == null || !tuples.hasNext()) {
                    return;
                }
               
                while (tuples.hasNext()) {
                    // The step below is not related to the issue. It is used just to
                    // simulate some operation in the slave threads for completeness.
                    System.out.println(tuples.next()._1);
                }                   
            }               
        });           

        /* ===>
         * The steps below correspond to Step 5 of the high-level sequence and to
         * Spark's thread #1, as described in the textual description above the code.
         *
         * These steps are where ConcurrentModificationException occurs randomly,
         * as explained in Step 5 of the textual description of this ticket.
         * For the purpose of this test, they update the list by simply rotating
         * the entries.
         *
         * Based on the stack trace, Spark has another thread, i.e. thread #4, that
         * asynchronously serializes the application objects during the next three
         * operations. So it randomly encounters ConcurrentModificationException
         * because Spark's thread #4 tries to serialize appStringList while Spark's
         * thread #1 is modifying the same list.
         */
        String tmp = appStringList.get(0);
        appStringList.remove(0);
        appStringList.add(tmp);
        System.out.println("end foreachRDD");
    }   
}
{code}



> Design Issue of Spark Streaming that Causes Random Run-time Exception
> ---------------------------------------------------------------------
>
>                 Key: SPARK-22163
>                 URL: https://issues.apache.org/jira/browse/SPARK-22163
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: Spark Streaming
> Kafka
> Linux
>            Reporter: Michael N
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
