Burak Yavuz created SPARK-18339:
-----------------------------------

             Summary: Don't push down current_timestamp for filters in StructuredStreaming
                 Key: SPARK-18339
                 URL: https://issues.apache.org/jira/browse/SPARK-18339
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.0.1
            Reporter: Burak Yavuz


For the following workflow:
1. I have a column called time, at minute-level precision, in a streaming DataFrame
2. I perform a groupBy on time followed by a count
3. Then I want my MemorySink to contain only the last 30 minutes of counts, which I attempt with
{code}
.where('time >= current_timestamp().cast("long") - 30 * 60)
{code}

What happens is that the `filter` gets pushed down below the aggregation, so it is applied to the source data feeding the aggregation instead of to the result of the aggregation (where I actually want to filter).
The main issue here is that `current_timestamp` is non-deterministic in the streaming context, so the filter shouldn't be pushed down.
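For concreteness, here is a minimal sketch of the workflow above; `spark` and `streamingDf` are assumed to exist (a SparkSession and a streaming DataFrame with a long-typed `time` column at minute precision) and are illustrative names, not from the report:

{code}
// Sketch only: spark (SparkSession) and streamingDf (streaming
// DataFrame with a long-typed 'time column) are assumed to exist.
import org.apache.spark.sql.functions._
import spark.implicits._

val counts = streamingDf
  .groupBy('time)
  .count()
  // Intended: keep only the last 30 minutes of aggregated counts.
  // Observed: the optimizer pushes this filter below the aggregation,
  // so it runs against the source rows instead of the aggregate result.
  .where('time >= current_timestamp().cast("long") - 30 * 60)

counts.writeStream
  .format("memory")
  .queryName("counts")
  .outputMode("complete")
  .start()
{code}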

Whether this requires us to store the `current_timestamp` for each trigger of the streaming job is something to discuss.



