Tathagata Das created SPARK-20374:
-------------------------------------

             Summary: Encoder generated using Java beans causes corruption in 
MapGroupsWithState
                 Key: SPARK-20374
                 URL: https://issues.apache.org/jira/browse/SPARK-20374
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.2.0
            Reporter: Tathagata Das


Running the example 
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
 gives incorrect output. Specifically, in update mode it outputs every input 
word twice. 

This happens because in MapGroupsWithStateExec, when the timeout timestamp is 
written to the state row, it gets corrupted. This leads to the following:
1. The state is updated, hence the word is output once. 
2. Later, when the timed-out states are being processed, the same word is found 
again because of the corrupted timeout timestamp. Therefore the word is output 
a second time.

Ideally, the state row whose timeout timestamp was updated with T should never 
get caught in the search for timed out keys (i.e. timeout timestamp < T). But 
the corruption is returning a different timeout timestamp in the search.
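
The expected behaviour described above can be sketched with a toy model. This 
is not Spark's MapGroupsWithStateExec code; the class TimeoutInvariantSketch 
and its methods update and timedOutKeys are hypothetical names used only to 
illustrate the invariant that a key whose timeout timestamp was set to T must 
not match a search for timeout timestamp < T.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for a state store; an assumption for illustration, not Spark's implementation. */
class TimeoutInvariantSketch {
    // key -> timeout timestamp (ms) stored next to the state for that key
    private final Map<String, Long> timeoutByKey = new LinkedHashMap<>();

    /** Record that the state for `key` was written with timeout timestamp `timeoutMs`. */
    void update(String key, long timeoutMs) {
        timeoutByKey.put(key, timeoutMs);
    }

    /** Return the keys whose stored timeout timestamp is strictly less than `thresholdMs`. */
    List<String> timedOutKeys(long thresholdMs) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : timeoutByKey.entrySet()) {
            if (e.getValue() < thresholdMs) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        TimeoutInvariantSketch store = new TimeoutInvariantSketch();
        long t = 10_000L;
        store.update("stale", 5_000L); // older state, genuinely timed out at T
        store.update("spark", t);      // state just updated with timeout timestamp T
        // Only "stale" satisfies timeout timestamp < T, so this prints [stale].
        System.out.println(store.timedOutKeys(t));
    }
}
```

If the timestamp read back for "spark" differed from the value written (as the 
corruption described in this issue suggests), the key could wrongly satisfy 
the < T check and be emitted a second time.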

Finally, this must be a Java bean encoder issue, because the exact same query in 
the Scala example works fine - 
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
