[ https://issues.apache.org/jira/browse/SPARK-27340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun resolved SPARK-27340.
-----------------------------------
Fix Version/s: 3.0.0
Resolution: Fixed
Issue resolved by pull request 28326
[https://github.com/apache/spark/pull/28326]
> Alias on TimeWindow expression may cause watermark metadata lost
> -----------------------------------------------------------------
>
> Key: SPARK-27340
> URL: https://issues.apache.org/jira/browse/SPARK-27340
> Project: Spark
> Issue Type: Bug
> Components: SQL, Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Kevin Zhang
> Assignee: Yuanjian Li
> Priority: Major
> Fix For: 3.0.0
>
>
> When we use the DataFrame/Dataset API to write a Structured Streaming query,
> we usually specify a watermark on the event-time column. If we define a window
> on the event-time column, the delayKey metadata of that column is supposed to
> be propagated to the new column generated by the time window expression. But
> if we add an additional alias on the time window column, the delayKey
> metadata is lost.
> So far I have only found the bug to affect stream-stream joins with equal
> window join keys. For aggregations, the grouping expression can be trimmed
> (in the CleanupAliases rule), so the additional aliases are removed and the
> metadata is kept.
> Here is an example:
> {code:scala}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.{col, window}
> import org.apache.spark.sql.streaming.Trigger
>
> val sparkSession = SparkSession
>   .builder()
>   .master("local")
>   .getOrCreate()
> import sparkSession.implicits._ // enables the $"..." column syntax
>
> val rateStream = sparkSession.readStream
>   .format("rate")
>   .option("rowsPerSecond", 10)
>   .load()
> // Watermark on fooTime, then an aliased window over the same column.
> val fooStream = rateStream
>   .select(
>     col("value").as("fooId"),
>     col("timestamp").as("fooTime")
>   )
>   .withWatermark("fooTime", "2 seconds")
>   .select($"fooId", $"fooTime", window($"fooTime", "2 seconds").alias("fooWindow"))
> val barStream = rateStream
>   .where(col("value") % 2 === 0)
>   .select(
>     col("value").as("barId"),
>     col("timestamp").as("barTime")
>   )
>   .withWatermark("barTime", "2 seconds")
>   .select($"barId", $"barTime", window($"barTime", "2 seconds").alias("barWindow"))
> // Outer join whose keys include the aliased window columns.
> val joinedDf = fooStream
>   .join(
>     barStream,
>     $"fooId" === $"barId" &&
>       fooStream.col("fooWindow") === barStream.col("barWindow"),
>     joinType = "LeftOuter"
>   )
> val query = joinedDf
>   .writeStream
>   .format("console")
>   .option("truncate", 100)
>   .trigger(Trigger.ProcessingTime("5 seconds"))
>   .start()
> query.awaitTermination()
> {code}
> This program ends with an exception, and the analyzed plan shows that there
> is no delayKey metadata on 'fooWindow':
> {code:java}
> org.apache.spark.sql.AnalysisException: Stream-stream outer join between two
> streaming DataFrame/Datasets is not supported without a watermark in the join
> keys, or a watermark on the nullable side and an appropriate range condition;;
> Join LeftOuter, ((fooId#4L = barId#14L) && (fooWindow#9 = barWindow#19))
> :- Project [fooId#4L, fooTime#5-T2000ms, window#10-T2000ms AS fooWindow#9]
> : +- Filter isnotnull(fooTime#5-T2000ms)
> : +- Project [named_struct(start, precisetimestampconversion(((((CASE
> WHEN (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms,
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as
> double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType,
> LongType) - 0) as double) / cast(2000000 as double))) THEN
> (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType,
> LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint))
> ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType,
> LongType) - 0) as double) / cast(2000000 as double))) END + cast(0 as
> bigint)) - cast(1 as bigint)) * 2000000) + 0), LongType, TimestampType), end,
> precisetimestampconversion((((((CASE WHEN
> (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms,
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as
> double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType,
> LongType) - 0) as double) / cast(2000000 as double))) THEN
> (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType,
> LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint))
> ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType,
> LongType) - 0) as double) / cast(2000000 as double))) END + cast(0 as
> bigint)) - cast(1 as bigint)) * 2000000) + 0) + 2000000), LongType,
> TimestampType)) AS window#10-T2000ms, fooId#4L, fooTime#5-T2000ms]
> : +- EventTimeWatermark fooTime#5: timestamp, interval 2 seconds
> : +- Project [value#1L AS fooId#4L, timestamp#0 AS fooTime#5]
> : +- StreamingRelationV2
> org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2cae5fa7,
> rate, Map(numPartitions -> 1, rowsPerSecond -> 1), [timestamp#0, value#1L]
> +- Project [barId#14L, barTime#15-T2000ms, window#20-T2000ms AS barWindow#19]
> +- Filter isnotnull(barTime#15-T2000ms)
> +- Project [named_struct(start, precisetimestampconversion(((((CASE
> WHEN (cast(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms,
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as
> double) = (cast((precisetimestampconversion(barTime#15-T2000ms,
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN
> (CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType,
> LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint))
> ELSE CEIL((cast((precisetimestampconversion(barTime#15-T2000ms,
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END +
> cast(0 as bigint)) - cast(1 as bigint)) * 2000000) + 0), LongType,
> TimestampType), end, precisetimestampconversion((((((CASE WHEN
> (cast(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms,
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) as
> double) = (cast((precisetimestampconversion(barTime#15-T2000ms,
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) THEN
> (CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType,
> LongType) - 0) as double) / cast(2000000 as double))) + cast(1 as bigint))
> ELSE CEIL((cast((precisetimestampconversion(barTime#15-T2000ms,
> TimestampType, LongType) - 0) as double) / cast(2000000 as double))) END +
> cast(0 as bigint)) - cast(1 as bigint)) * 2000000) + 0) + 2000000), LongType,
> TimestampType)) AS window#20-T2000ms, barId#14L, barTime#15-T2000ms]
> +- EventTimeWatermark barTime#15: timestamp, interval 2 seconds
> +- Project [value#1L AS barId#14L, timestamp#0 AS barTime#15]
> +- Filter ((value#1L % cast(2 as bigint)) = cast(0 as bigint))
> +- StreamingRelationV2
> org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2cae5fa7,
> rate, Map(numPartitions -> 1, rowsPerSecond -> 1), [timestamp#0, value#1L]
> {code}
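
For anyone checking whether a given build is affected: the lost metadata can
also be inspected on the schema instead of reading the analyzed plan. The
following is only a minimal sketch, assuming a local Spark installation. The
object name WatermarkMetadataCheck and the DelayKey constant are made up for
illustration, and the key string "spark.watermarkDelayMs" is assumed to match
EventTimeWatermark.delayKey in Spark's catalyst module.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

object WatermarkMetadataCheck {
  // Assumed to equal EventTimeWatermark.delayKey (hypothetical local copy).
  val DelayKey = "spark.watermarkDelayMs"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()

    // Same source and watermark as in the report, reduced to one stream.
    val base = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 10)
      .load()
      .select(col("value").as("fooId"), col("timestamp").as("fooTime"))
      .withWatermark("fooTime", "2 seconds")

    // Window column under its default name ("window") versus an extra alias.
    val plain = base.select(window(col("fooTime"), "2 seconds"))
    val aliased = base.select(window(col("fooTime"), "2 seconds").alias("fooWindow"))

    // Report whether each window column still carries the watermark delay key.
    val plainHasKey = plain.schema("window").metadata.contains(DelayKey)
    val aliasedHasKey = aliased.schema("fooWindow").metadata.contains(DelayKey)
    println(s"window has delayKey: $plainHasKey")
    println(s"fooWindow has delayKey: $aliasedHasKey")

    spark.stop()
  }
}
```

Going by the report above, the aliased column would be expected to lack the
key on an affected version (2.4.0), while both columns should carry it once
the fix (3.0.0) is in place.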