[ 
https://issues.apache.org/jira/browse/SPARK-20374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-20374:
----------------------------------
    Description: 
Running the example 
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
 gives incorrect output: in update mode, every input word is output twice.

This happens because in FlatMapGroupsWithStateExec, when the timeout timestamp 
is written to the state row, it gets corrupted. This leads to the following: 
1. The state is updated, hence the word is output once. 
2. Later, when the timed-out states are being processed, the same word is found 
again because of the corrupted timeout timestamp. Therefore the word is output 
a second time.

Ideally, the state row whose timeout timestamp was updated with T should never 
get caught in the search for timed-out keys (i.e. timeout timestamp < T). But 
because of the corruption, the search reads back a different timeout timestamp 
than the one that was written.
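
The sequence above can be illustrated with a small simulation. This is only a sketch in plain Python, not Spark code: `StateRow`, `timed_out_keys`, `run_batch` and the `corrupt` flag are hypothetical names, and modelling the corruption as "the stored value ends up below T" is an assumption, since the report does not say what value is actually read back.

```python
from dataclasses import dataclass


@dataclass
class StateRow:
    key: str
    timeout_ts: int  # timeout timestamp stored alongside the state


def timed_out_keys(store, threshold):
    """Return the keys whose stored timeout timestamp is strictly below threshold."""
    return sorted(k for k, row in store.items() if row.timeout_ts < threshold)


def run_batch(store, words, t, corrupt=False):
    """Process one batch in update mode and return every word that is emitted."""
    emitted = []
    for word in words:
        # Assumption: a corrupted write leaves some value below T in the row.
        stored_ts = t - 1 if corrupt else t
        store[word] = StateRow(word, stored_ts)
        emitted.append(word)  # step 1: the state was updated, emit once
    # Step 2: search for timed-out states (timeout timestamp < T).
    emitted.extend(timed_out_keys(store, t))
    return emitted


print(run_batch({}, ["apache"], 1000))                # ['apache']
print(run_batch({}, ["apache"], 1000, corrupt=True))  # ['apache', 'apache']
```

With an intact timestamp the row written with T is not matched by the `< T` search, so the word appears once; with the assumed corruption the same row is matched and the word appears twice, which is the symptom described.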

Finally, this must be a Java bean encoder issue, because the exact same query 
in the Scala example works fine: 
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

  was:
Running the example 
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
 gives incorrect output. Specifically, for every input word, it outputs the 
word twice in update mode, which is incorrect. 

This happens because in MapGroupsWithStateExec, when the timeout timestamp is 
written to the state row, it gets corrupted. This leads to 
1. The state is updated, hence the word is output once. 
2. Later, when the timed out states are being processed, the same word is found 
again because of the corrupted timeout timestamp. Therefore the word is output 
a second time.

Ideally, the state row whose timeout timestamp was updated with T should never 
get caught in the search for timed out keys (i.e. timeout timestamp < T). But 
the corruption is returning a different timeout timestamp in the search.

Finally this must be a java bean encoder issue because the exact same query in 
the Scala example works fine - 
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala


> Encoder generated using Java beans causes corruption in MapGroupsWithState
> --------------------------------------------------------------------------
>
>                 Key: SPARK-20374
>                 URL: https://issues.apache.org/jira/browse/SPARK-20374
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Tathagata Das
>
> Running the example 
> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>  gives incorrect output. Specifically, for every input word, it outputs the 
> word twice in update mode, which is incorrect. 
> This happens because in FlatMapGroupsWithStateExec, when the timeout 
> timestamp is written to the state row, it gets corrupted. This leads to 
> 1. The state is updated, hence the word is output once. 
> 2. Later, when the timed out states are being processed, the same word is 
> found again because of the corrupted timeout timestamp. Therefore the word 
> is output a second time. 
> Ideally, the state row whose timeout timestamp was updated with T should 
> never get caught in the search for timed out keys (i.e. timeout timestamp < 
> T). But the corruption is returning a different timeout timestamp in the 
> search.
> Finally this must be a java bean encoder issue because the exact same query 
> in the Scala example works fine - 
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala



