[ 
https://issues.apache.org/jira/browse/SPARK-22163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16193907#comment-16193907
 ] 

Michael N edited comment on SPARK-22163 at 10/6/17 12:18 AM:
-------------------------------------------------------------

Vadim Semenov and Steve Loughran, per your inquiries in ticket 
https://issues.apache.org/jira/browse/SPARK-21999, I am posting the reply here 
because this issue involves Spark's design and not necessarily its code 
implementation.

---

My application does not spin up its own thread. All the threads are controlled 
by Spark.

Batch interval = 5 seconds

Batch #3
1. Driver - Spark Thread #1 - starts batch #3 and blocks until all slave 
threads are done with this batch
2. Slave A - Spark Thread #2 takes 10 seconds to complete
3. Slave B - Spark Thread #3 takes 1 minute to complete

4. Neither Thread #1 on the driver nor Thread #2 on Slave A jumps ahead to 
process batch #4. Instead, they wait until Thread #3 is done. 
=> So there is synchronization among the threads within the same batch, and 
batches also run synchronously, one after another.

5. After Spark Thread #3 is done, the driver does other processing to finish 
the current batch.  In my case, it updates a list of objects.

The above steps repeat for the next batch #4 and subsequent batches.
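The blocking behavior in steps 1-4 can be sketched outside of Spark with plain 
Java threads. This is only an illustration of the coordination pattern, not 
Spark's actual scheduler code; the class and method names are made up for the 
example:

```java
import java.util.concurrent.CountDownLatch;

public class BatchSync {
    // Hypothetical stand-in for per-batch coordination: the driver thread
    // blocks on a latch until every slave thread reports completion, so no
    // thread runs ahead into the next batch.
    public static long runBatch(int numSlaves, long[] taskMillis)
            throws InterruptedException {
        CountDownLatch done = new CountDownLatch(numSlaves);
        long start = System.nanoTime();
        for (int i = 0; i < numSlaves; i++) {
            final long workMillis = taskMillis[i];
            new Thread(() -> {
                try {
                    Thread.sleep(workMillis); // simulated task time
                } catch (InterruptedException ignored) {
                } finally {
                    done.countDown(); // this slave is done with the batch
                }
            }).start();
        }
        done.await(); // driver blocks here until the slowest slave finishes
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        // Slave A finishes quickly, Slave B is slow; the batch takes at
        // least as long as the slowest slave, as in steps 2-4 above.
        long elapsed = runBatch(2, new long[]{10, 60});
        System.out.println("batch took ~" + elapsed + " ms");
    }
}
```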

Based on the exception stack trace, it looks like in step 5 Spark has another 
thread, Thread #4, that serializes the application's objects asynchronously. 
This causes random occurrences of ConcurrentModificationException, because the 
list of objects is concurrently being modified by Spark Thread #1 on the 
driver.

So the issue is not that my application "is modifying a collection 
asynchronously w.r.t. Spark", as Sean kept claiming. The issue is Spark's 
asynchronous operations among its own threads within the same batch.

I understand Spark needs to serialize objects for checkpointing purposes. 
However, since Spark controls all the threads and their synchronization, the 
lack of synchronization between Threads #1 and #4 is a design issue in Spark, 
and it is what triggers the ConcurrentModificationException.

Further, even if the application does not modify its list of objects, in step 
5 the driver could be modifying multiple primitive values, say two integers. 
In Thread #1, the driver could have updated integer X when Spark's Thread #4 
asynchronously serializes the application objects, before the driver can 
update integer Y. The persisted serialized data then does not match the actual 
data. This results in a "false positive" variant of this issue, where 
serialization succeeds without any exception but the checkpoint contains only 
partially correct data.
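This "torn snapshot" can be demonstrated deterministically by serializing 
between the two field updates. The class below is a hypothetical illustration, 
not Spark code; the State class and its intended invariant (x and y move 
together) are made up for the example:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TornSnapshot {
    // Hypothetical application state: the application always updates
    // x and y together, so a valid snapshot must have x == y.
    static class State implements Serializable {
        int x;
        int y;
    }

    static byte[] serialize(State s) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
            out.writeObject(s);
        }
        return buf.toByteArray();
    }

    static State deserialize(byte[] bytes) throws Exception {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (State) in.readObject();
        }
    }

    static State tornCheckpoint() throws Exception {
        State s = new State();
        s.x = 1;                          // first half of the update
        // If the checkpoint thread serializes exactly here -- after x but
        // before y -- the snapshot violates the invariant, with no exception.
        byte[] checkpoint = serialize(s);
        s.y = 1;                          // second half of the update
        return deserialize(checkpoint);
    }

    public static void main(String[] args) throws Exception {
        State restored = tornCheckpoint();
        System.out.println("restored: x=" + restored.x + " y=" + restored.y);
    }
}
```

The restored state has x updated but not y: the checkpoint is silently 
corrupt, which is exactly the false-positive case described above.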

One solution for both issues is to modify Spark's design so that the 
serialization of application objects by Spark's Thread #4 is configurable per 
application as either asynchronous or synchronous with Spark's Thread #1. 
That way, each application can decide based on the nature of its business 
requirements and required throughput.
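A sketch of what such a configurable mode could look like. This is a 
hypothetical API, not Spark's actual design: the class, the `synchronous` 
flag, and the method names all stand in for the proposed per-application 
setting:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

public class SyncCheckpoint {
    // One lock guards both the driver's mutations and the checkpoint
    // serialization, so a synchronous snapshot can never observe a
    // half-finished update.
    private final Object lock = new Object();
    private final List<Integer> state = new ArrayList<>();
    private final boolean synchronous; // the proposed per-application switch

    SyncCheckpoint(boolean synchronous) { this.synchronous = synchronous; }

    void update(int value) {
        synchronized (lock) {
            state.add(value); // driver-side mutation (step 5 above)
        }
    }

    byte[] checkpoint() throws IOException {
        if (synchronous) {
            synchronized (lock) {  // blocks mutations for the duration
                return serialize(state);
            }
        }
        return serialize(state);   // current behavior: races with update()
    }

    private static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
            out.writeObject(o);
        }
        return buf.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        SyncCheckpoint cp = new SyncCheckpoint(true);
        cp.update(7);
        System.out.println("checkpoint bytes: " + cp.checkpoint().length);
    }
}
```

The trade-off is exactly the one described above: synchronous mode buys 
consistency at the cost of pausing the driver while the checkpoint is written, 
which is why it should be the application's choice.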



> Design Issue of Spark Streaming that Causes Random Run-time Exception
> ---------------------------------------------------------------------
>
>                 Key: SPARK-22163
>                 URL: https://issues.apache.org/jira/browse/SPARK-22163
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: Spark Streaming
> Kafka
> Linux
>            Reporter: Michael N
>
> The application's objects can contain Lists and can be modified dynamically 
> as well. However, the Spark Streaming framework asynchronously serializes 
> the application's objects while the application runs. This causes a random 
> run-time exception on a List when the framework happens to serialize the 
> application's objects while the application is modifying a List in one of 
> its own objects.
> In fact, there are multiple reported bugs containing
> Caused by: java.util.ConcurrentModificationException
> at java.util.ArrayList.writeObject
> that are permutations of the same root cause. The design issue in the Spark 
> Streaming framework is that it does this serialization asynchronously. 
> Instead, it should either
> 1. do this serialization synchronously. This is preferred because it 
> eliminates the issue completely. Or
> 2. allow it to be configured per application whether to do this 
> serialization synchronously or asynchronously, depending on the nature of 
> each application.
> Also, the Spark documentation should describe the conditions that trigger 
> Spark to do this type of serialization asynchronously, so that applications 
> can work around them until a fix is provided.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
