[ https://issues.apache.org/jira/browse/SPARK-27340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun reassigned SPARK-27340:
-------------------------------------

    Assignee: Yuanjian Li

> Alias on TimeWindow expression may cause watermark metadata loss
> -----------------------------------------------------------------
>
>                 Key: SPARK-27340
>                 URL: https://issues.apache.org/jira/browse/SPARK-27340
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Kevin Zhang
>            Assignee: Yuanjian Li
>            Priority: Major
>
> When we use the Dataset API to write a Structured Streaming query, we usually 
> specify a watermark on the event time column. If we define a window on the 
> event time column, the delayKey metadata of the event time column is supposed 
> to be propagated to the new column generated by the time window expression. 
> But if we put an additional alias on the time window column, the delayKey 
> metadata is lost.
> So far I have only found that the bug affects stream-stream joins whose join 
> keys include equal window columns. Aggregation is unaffected, because the 
> grouping expression is trimmed (in the CleanupAliases rule), so the additional 
> alias is removed and the metadata is kept.
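> For comparison, here is a sketch of the aggregation case (untested; it assumes 
> the same sparkSession and imports as the reproduction below). This query 
> analyzes fine even though the window column is aliased:
> {code:scala}
> // Windowed aggregation: CleanupAliases trims the alias from the grouping
> // expression, so the watermark (delayKey) metadata survives analysis.
> val counts = sparkSession.readStream
>   .format("rate")
>   .option("rowsPerSecond", 10)
>   .load()
>   .withWatermark("timestamp", "2 seconds")
>   .groupBy(window($"timestamp", "2 seconds").as("eventWindow"))
>   .count()
> {code}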
> Here is an example that reproduces the join failure:
> {code:scala}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.{col, window}
> import org.apache.spark.sql.streaming.Trigger
>
> val sparkSession = SparkSession
>   .builder()
>   .master("local")
>   .getOrCreate()
> import sparkSession.implicits._
>
> val rateStream = sparkSession.readStream
>   .format("rate")
>   .option("rowsPerSecond", 10)
>   .load()
>
> // The extra alias on the window column is what drops the delayKey metadata.
> val fooStream = rateStream
>   .select(
>     col("value").as("fooId"),
>     col("timestamp").as("fooTime")
>   )
>   .withWatermark("fooTime", "2 seconds")
>   .select($"fooId", $"fooTime", window($"fooTime", "2 seconds").alias("fooWindow"))
>
> val barStream = rateStream
>   .where(col("value") % 2 === 0)
>   .select(
>     col("value").as("barId"),
>     col("timestamp").as("barTime")
>   )
>   .withWatermark("barTime", "2 seconds")
>   .select($"barId", $"barTime", window($"barTime", "2 seconds").alias("barWindow"))
>
> val joinedDf = fooStream
>   .join(
>     barStream,
>     $"fooId" === $"barId" &&
>       fooStream.col("fooWindow") === barStream.col("barWindow"),
>     joinType = "LeftOuter"
>   )
>
> val query = joinedDf
>   .writeStream
>   .format("console")
>   .option("truncate", 100)
>   .trigger(Trigger.ProcessingTime("5 seconds"))
>   .start()
>
> query.awaitTermination()
> {code}
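> Before the query is even started, the lost metadata can be observed directly 
> on the schema. A minimal check (the watermark delay should be stored in the 
> column metadata under the key spark.watermarkDelayMs, i.e. 
> EventTimeWatermark.delayKey):
> {code:scala}
> // With .alias("fooWindow") the metadata map comes back empty, e.g. {}
> // instead of something like {"spark.watermarkDelayMs":2000}.
> println(fooStream.schema("fooWindow").metadata)
> {code}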
> This program ends with the exception below, and from the analyzed plan we can 
> see that there is no delayKey metadata on 'fooWindow':
> {code:java}
> org.apache.spark.sql.AnalysisException: Stream-stream outer join between two 
> streaming DataFrame/Datasets is not supported without a watermark in the join 
> keys, or a watermark on the nullable side and an appropriate range condition;;
> Join LeftOuter, ((fooId#4L = barId#14L) && (fooWindow#9 = barWindow#19))
> :- Project [fooId#4L, fooTime#5-T2000ms, window#10-T2000ms AS fooWindow#9]
> :  +- Filter isnotnull(fooTime#5-T2000ms)
> :     +- Project [named_struct(start, precisetimestampconversion(((((CASE 
> WHEN (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, 
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as 
> double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(2000000 as double))) THEN 
> (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) 
> ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(2000000 as double))) END + cast(0 as 
> bigint)) - cast(1 as bigint)) * 2000000) + 0), LongType, TimestampType), end, 
> precisetimestampconversion((((((CASE WHEN 
> (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, 
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as 
> double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(2000000 as double))) THEN 
> (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) 
> ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(2000000 as double))) END + cast(0 as 
> bigint)) - cast(1 as bigint)) * 2000000) + 0) + 2000000), LongType, 
> TimestampType)) AS window#10-T2000ms, fooId#4L, fooTime#5-T2000ms]
> :        +- EventTimeWatermark fooTime#5: timestamp, interval 2 seconds
> :           +- Project [value#1L AS fooId#4L, timestamp#0 AS fooTime#5]
> :              +- StreamingRelationV2 
> org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2cae5fa7, 
> rate, Map(numPartitions -> 1, rowsPerSecond -> 1), [timestamp#0, value#1L]
> +- Project [barId#14L, barTime#15-T2000ms, window#20-T2000ms AS barWindow#19]
>    +- Filter isnotnull(barTime#15-T2000ms)
>       +- Project [named_struct(start, precisetimestampconversion(((((CASE 
> WHEN (cast(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, 
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as 
> double) = (cast((precisetimestampconversion(barTime#15-T2000ms, 
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN 
> (CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) 
> ELSE CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, 
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + 
> cast(0 as bigint)) - cast(1 as bigint)) * 2000000) + 0), LongType, 
> TimestampType), end, precisetimestampconversion((((((CASE WHEN 
> (cast(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, 
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as 
> double) = (cast((precisetimestampconversion(barTime#15-T2000ms, 
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN 
> (CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, 
> LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint)) 
> ELSE CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, 
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END + 
> cast(0 as bigint)) - cast(1 as bigint)) * 2000000) + 0) + 2000000), LongType, 
> TimestampType)) AS window#20-T2000ms, barId#14L, barTime#15-T2000ms]
>          +- EventTimeWatermark barTime#15: timestamp, interval 2 seconds
>             +- Project [value#1L AS barId#14L, timestamp#0 AS barTime#15]
>                +- Filter ((value#1L % cast(2 as bigint)) = cast(0 as bigint))
>                   +- StreamingRelationV2 
> org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2cae5fa7, 
> rate, Map(numPartitions -> 1, rowsPerSecond -> 1), [timestamp#0, value#1L]
> {code}
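> Until this is fixed, copying the event time metadata onto the aliased column 
> explicitly may serve as a workaround. A sketch (untested) for the foo side, 
> using Column.as(alias, metadata) with the metadata taken from the unaliased 
> window column:
> {code:scala}
> // Workaround sketch: re-attach the watermark (delayKey) metadata that a
> // plain alias drops, by copying it from the unaliased window column.
> val withWindow = rateStream
>   .select(col("value").as("fooId"), col("timestamp").as("fooTime"))
>   .withWatermark("fooTime", "2 seconds")
>   .select($"fooId", $"fooTime", window($"fooTime", "2 seconds"))
> val windowMetadata = withWindow.schema("window").metadata
> val fooStreamKeepingMetadata = withWindow
>   .select($"fooId", $"fooTime", $"window".as("fooWindow", windowMetadata))
> {code}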


