This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 076f7e47306 [SPARK-40495][SQL][TESTS] Add additional tests to
StreamingSessionWindowSuite
076f7e47306 is described below
commit 076f7e47306fa2a02b65ce279fb37b561abe441e
Author: Wei Liu <[email protected]>
AuthorDate: Thu Sep 29 17:34:48 2022 +0900
[SPARK-40495][SQL][TESTS] Add additional tests to
StreamingSessionWindowSuite
## What changes were proposed in this pull request?
Add complex tests to `StreamingSessionWindowSuite`. Concretely, I created
two helper functions,
- one is called `sessionWindowQueryNestedKey`, which would convert
`sessionId` from the single word key used in `sessionWindowQuery` to a nested
column key. For example: `"hello" -> (("hello", "hello"), "hello")`.
- The other is called `sessionWindowQueryMultiColKey`. It would convert
`sessionId` from the single word key used in `sessionWindowQuery` to two
columns. For example: "hello" -> col1: ("hello", "hello"), col2: "hello"
With the two new helper functions, I added more variants of the tests for
`complete mode` and `dynamic gap duration` (`append` and `async state` were not
included, because the first two are enough for testing the change I added). The
logic of the tests is not changed at all, just the key.
For the aggregation test (`session window - with more aggregation
functions`), I added some more functions as well as a UDAF to test, and I tried
the `first()` and `last()` functions on a nested triple, created using the same
method as the above.
## How was this patch tested?
All are tests.
Closes #37936 from WweiL/master.
Authored-by: Wei Liu <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../streaming/StreamingSessionWindowSuite.scala | 356 ++++++++++++++++++++-
1 file changed, 354 insertions(+), 2 deletions(-)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSessionWindowSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSessionWindowSuite.scala
index cb1728a4c5a..25b7506178d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSessionWindowSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSessionWindowSuite.scala
@@ -23,10 +23,11 @@ import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.must.Matchers
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoder,
Encoders}
import org.apache.spark.sql.execution.streaming.MemoryStream
import
org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider,
RocksDBStateStoreProvider}
-import org.apache.spark.sql.functions.{count, session_window, sum}
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
class StreamingSessionWindowSuite extends StreamTest
@@ -142,6 +143,126 @@ class StreamingSessionWindowSuite extends StreamTest
)
}
+ // Logic is the same as `complete mode - session window`
+ // just with a more complex key
+ testWithAllOptions("complete mode - session window - nested tuple key") {
+ val inputData = MemoryStream[(String, Long)]
+ val sessionUpdates = sessionWindowQueryNestedKey(inputData)
+
+ testStream(sessionUpdates, OutputMode.Complete())(
+ AddData(
+ inputData,
+ ("hello world spark streaming", 40L),
+ ("world hello structured streaming", 41L)),
+ CheckNewAnswer(
+ ((("hello", "hello"), "hello"), 40, 51, 11, 2),
+ ((("world", "world"), "world"), 40, 51, 11, 2),
+ ((("streaming", "streaming"), "streaming"), 40, 51, 11, 2),
+ ((("spark", "spark"), "spark"), 40, 50, 10, 1),
+ ((("structured", "structured"), "structured"), 41, 51, 10, 1)),
+ // placing new sessions "before" previous sessions
+ AddData(inputData, ("spark streaming", 25L)),
+ CheckNewAnswer(
+ ((("spark", "spark"), "spark"), 25, 35, 10, 1),
+ ((("streaming", "streaming"), "streaming"), 25, 35, 10, 1),
+ ((("hello", "hello"), "hello"), 40, 51, 11, 2),
+ ((("world", "world"), "world"), 40, 51, 11, 2),
+ ((("streaming", "streaming"), "streaming"), 40, 51, 11, 2),
+ ((("spark", "spark"), "spark"), 40, 50, 10, 1),
+ ((("structured", "structured"), "structured"), 41, 51, 10, 1)),
+ // concatenating multiple previous sessions into one
+ AddData(inputData, ("spark streaming", 30L)),
+ CheckNewAnswer(
+ ((("spark", "spark"), "spark"), 25, 50, 25, 3),
+ ((("streaming", "streaming"), "streaming"), 25, 51, 26, 4),
+ ((("hello", "hello"), "hello"), 40, 51, 11, 2),
+ ((("world", "world"), "world"), 40, 51, 11, 2),
+ ((("structured", "structured"), "structured"), 41, 51, 10, 1)),
+ // placing new sessions after previous sessions
+ AddData(inputData, ("hello apache spark", 60L)),
+ CheckNewAnswer(
+ ((("spark", "spark"), "spark"), 25, 50, 25, 3),
+ ((("streaming", "streaming"), "streaming"), 25, 51, 26, 4),
+ ((("hello", "hello"), "hello"), 40, 51, 11, 2),
+ ((("world", "world"), "world"), 40, 51, 11, 2),
+ ((("structured", "structured"), "structured"), 41, 51, 10, 1),
+ ((("hello", "hello"), "hello"), 60, 70, 10, 1),
+ ((("apache", "apache"), "apache"), 60, 70, 10, 1),
+ ((("spark", "spark"), "spark"), 60, 70, 10, 1)),
+ AddData(inputData, ("structured streaming", 90L)),
+ CheckNewAnswer(
+ ((("spark", "spark"), "spark"), 25, 50, 25, 3),
+ ((("streaming", "streaming"), "streaming"), 25, 51, 26, 4),
+ ((("hello", "hello"), "hello"), 40, 51, 11, 2),
+ ((("world", "world"), "world"), 40, 51, 11, 2),
+ ((("structured", "structured"), "structured"), 41, 51, 10, 1),
+ ((("hello", "hello"), "hello"), 60, 70, 10, 1),
+ ((("apache", "apache"), "apache"), 60, 70, 10, 1),
+ ((("spark", "spark"), "spark"), 60, 70, 10, 1),
+ ((("structured", "structured"), "structured"), 90, 100, 10, 1),
+ ((("streaming", "streaming"), "streaming"), 90, 100, 10, 1)))
+ }
+
+ // Logic is the same as `complete mode - session window`
+ // just with a more complex key
+ testWithAllOptions("complete mode - session window - multiple col key") {
+ val inputData = MemoryStream[(String, Long)]
+ val sessionUpdates = sessionWindowQueryMultiColKey(inputData)
+
+ testStream(sessionUpdates, OutputMode.Complete())(
+ AddData(
+ inputData,
+ ("hello world spark streaming", 40L),
+ ("world hello structured streaming", 41L)),
+ CheckNewAnswer(
+ (("hello", "hello"), "hello", 40, 51, 11, 2),
+ (("world", "world"), "world", 40, 51, 11, 2),
+ (("streaming", "streaming"), "streaming", 40, 51, 11, 2),
+ (("spark", "spark"), "spark", 40, 50, 10, 1),
+ (("structured", "structured"), "structured", 41, 51, 10, 1)),
+ // placing new sessions "before" previous sessions
+ AddData(inputData, ("spark streaming", 25L)),
+ CheckNewAnswer(
+ (("spark", "spark"), "spark", 25, 35, 10, 1),
+ (("streaming", "streaming"), "streaming", 25, 35, 10, 1),
+ (("hello", "hello"), "hello", 40, 51, 11, 2),
+ (("world", "world"), "world", 40, 51, 11, 2),
+ (("streaming", "streaming"), "streaming", 40, 51, 11, 2),
+ (("spark", "spark"), "spark", 40, 50, 10, 1),
+ (("structured", "structured"), "structured", 41, 51, 10, 1)),
+ // concatenating multiple previous sessions into one
+ AddData(inputData, ("spark streaming", 30L)),
+ CheckNewAnswer(
+ (("spark", "spark"), "spark", 25, 50, 25, 3),
+ (("streaming", "streaming"), "streaming", 25, 51, 26, 4),
+ (("hello", "hello"), "hello", 40, 51, 11, 2),
+ (("world", "world"), "world", 40, 51, 11, 2),
+ (("structured", "structured"), "structured", 41, 51, 10, 1)),
+ // placing new sessions after previous sessions
+ AddData(inputData, ("hello apache spark", 60L)),
+ CheckNewAnswer(
+ (("spark", "spark"), "spark", 25, 50, 25, 3),
+ (("streaming", "streaming"), "streaming", 25, 51, 26, 4),
+ (("hello", "hello"), "hello", 40, 51, 11, 2),
+ (("world", "world"), "world", 40, 51, 11, 2),
+ (("structured", "structured"), "structured", 41, 51, 10, 1),
+ (("hello", "hello"), "hello", 60, 70, 10, 1),
+ (("apache", "apache"), "apache", 60, 70, 10, 1),
+ (("spark", "spark"), "spark", 60, 70, 10, 1)),
+ AddData(inputData, ("structured streaming", 90L)),
+ CheckNewAnswer(
+ (("spark", "spark"), "spark", 25, 50, 25, 3),
+ (("streaming", "streaming"), "streaming", 25, 51, 26, 4),
+ (("hello", "hello"), "hello", 40, 51, 11, 2),
+ (("world", "world"), "world", 40, 51, 11, 2),
+ (("structured", "structured"), "structured", 41, 51, 10, 1),
+ (("hello", "hello"), "hello", 60, 70, 10, 1),
+ (("apache", "apache"), "apache", 60, 70, 10, 1),
+ (("spark", "spark"), "spark", 60, 70, 10, 1),
+ (("structured", "structured"), "structured", 90, 100, 10, 1),
+ (("streaming", "streaming"), "streaming", 90, 100, 10, 1)))
+ }
+
testWithAllOptions("complete mode - session window - no key") {
// complete mode doesn't honor watermark: even it is specified, watermark
will be
// always Unix timestamp 0
@@ -354,6 +475,107 @@ class StreamingSessionWindowSuite extends StreamTest
)
}
+ // Logic is the same as `SPARK-36465: dynamic gap duration`
+ // just with a more complex key
+ testWithAllOptions("dynamic gap duration - nested tuple key") {
+ val inputData = MemoryStream[(String, Long)]
+
+ val udf = spark.udf.register(
+ "gapDuration",
+ (s: ((String, String), String)) => {
+ if (s == (("hello", "hello"), "hello")) {
+ "1 second"
+ } else if (s == (("structured", "structured"), "structured")) {
+ // zero gap duration will be filtered out from aggregation
+ "0 second"
+ } else if (s == (("world", "world"), "world")) {
+ // negative gap duration will be filtered out from aggregation
+ "-10 seconds"
+ } else {
+ "10 seconds"
+ }
+ })
+
+ val sessionUpdates =
+ sessionWindowQueryNestedKey(inputData, session_window($"eventTime",
udf($"sessionId")))
+
+ testStream(sessionUpdates, OutputMode.Append())(
+ AddData(
+ inputData,
+ ("hello world spark streaming", 40L),
+ ("world hello structured streaming", 41L)),
+ CheckNewAnswer(),
+ // placing new sessions "before" previous sessions
+ AddData(inputData, ("spark streaming", 25L)),
+ CheckNewAnswer(),
+ // late event whose session end 10 would be earlier than watermark 11:
should be dropped
+ AddData(inputData, ("spark streaming", 0L)),
+ CheckNewAnswer(),
+ assertNumRowsDroppedByWatermark(2),
+ // concatenating multiple previous sessions into one
+ AddData(inputData, ("spark streaming", 30L)),
+ CheckNewAnswer(),
+ // placing new sessions after previous sessions
+ AddData(inputData, ("hello apache spark", 60L)),
+ CheckNewAnswer(),
+ AddData(inputData, ("structured streaming", 90L)),
+ CheckNewAnswer(
+ ((("spark", "spark"), "spark"), 25, 50, 25, 3),
+ ((("streaming", "streaming"), "streaming"), 25, 51, 26, 4),
+ ((("hello", "hello"), "hello"), 40, 42, 2, 2)))
+ }
+
+ // Logic is the same as `SPARK-36465: dynamic gap duration`
+ // just with a more complex key
+ testWithAllOptions("dynamic gap duration - multiple col key") {
+ val inputData = MemoryStream[(String, Long)]
+
+ val udf = spark.udf.register(
+ "gapDuration",
+ (s1: (String, String), s2: String) => {
+ if (s1 == ("hello", "hello") && s2 == "hello") {
+ "1 second"
+ } else if (s1 == ("structured", "structured") && s2 == "structured") {
+ // zero gap duration will be filtered out from aggregation
+ "0 second"
+ } else if (s1 == ("world", "world") && s2 == "world") {
+ // negative gap duration will be filtered out from aggregation
+ "-10 seconds"
+ } else {
+ "10 seconds"
+ }
+ })
+
+ val sessionUpdates = sessionWindowQueryMultiColKey(
+ inputData,
+ session_window($"eventTime", udf($"aggKeyDouble", $"aggKeySingle")))
+
+ testStream(sessionUpdates, OutputMode.Append())(
+ AddData(
+ inputData,
+ ("hello world spark streaming", 40L),
+ ("world hello structured streaming", 41L)),
+ CheckNewAnswer(),
+ // placing new sessions "before" previous sessions
+ AddData(inputData, ("spark streaming", 25L)),
+ CheckNewAnswer(),
+ // late event whose session end 10 would be earlier than watermark 11:
should be dropped
+ AddData(inputData, ("spark streaming", 0L)),
+ CheckNewAnswer(),
+ assertNumRowsDroppedByWatermark(2),
+ // concatenating multiple previous sessions into one
+ AddData(inputData, ("spark streaming", 30L)),
+ CheckNewAnswer(),
+ // placing new sessions after previous sessions
+ AddData(inputData, ("hello apache spark", 60L)),
+ CheckNewAnswer(),
+ AddData(inputData, ("structured streaming", 90L)),
+ CheckNewAnswer(
+ (("spark", "spark"), "spark", 25, 50, 25, 3),
+ (("streaming", "streaming"), "streaming", 25, 51, 26, 4),
+ (("hello", "hello"), "hello", 40, 42, 2, 2)))
+ }
+
testWithAllOptions("append mode - session window - no key") {
val inputData = MemoryStream[Int]
val windowedAggregation = sessionWindowQueryOnGlobalKey(inputData)
@@ -402,6 +624,76 @@ class StreamingSessionWindowSuite extends StreamTest
}
}
+ // Test that session window works with mean, median, std dev, variance, UDAFs
+ // and first, last on nested tuple keys
+ testWithAllOptions("session window - with more aggregation functions and
UDAFs") {
+ // create a trivial summation UDAF for test
+ // input type is a single Long
+ object MySum extends Aggregator[Long, Long, Long] {
+
+ def zero: Long = 0L
+
+ def reduce(buffer: Long, data: Long): Long = {
+ buffer + data
+ }
+
+ def merge(b1: Long, b2: Long): Long = {
+ b1 + b2
+ }
+
+ def finish(reduction: Long): Long = reduction
+
+ def bufferEncoder: Encoder[Long] = Encoders.scalaLong
+
+ def outputEncoder: Encoder[Long] = Encoders.scalaLong
+ }
+ val mySum = spark.udf.register("mySum", udaf(MySum))
+
+ val inputData = MemoryStream[(String, Long)]
+ val sessionWithAgg = {
+ // Split the lines into words, treat words as sessionId of events
+ val events = inputData.toDF()
+ .select($"_1".as("value"), $"_2".as("timestamp"))
+ .withColumn("eventTime", $"timestamp".cast("timestamp"))
+ .withColumn("single", $"timestamp")
+ .withColumn("double", struct($"single", $"single"))
+ .withColumn("triple", struct($"double", $"single"))
+ .withWatermark("eventTime", "30 seconds")
+ .selectExpr("explode(split(value, ' ')) AS sessionId",
+ "eventTime", "single", "triple")
+
+ val sessionWindow = session_window($"eventTime", "10 seconds")
+ events
+ .groupBy(sessionWindow.as("session"), $"sessionId")
+ .agg(
+ mean($"single").as("meanTime"),
+ median($"single").as("medianTime"),
+ stddev($"single").as("stdDevTime"),
+ variance($"single").as("varTime"),
+ first($"triple").as("firstTimeTriple"),
+ last($"triple").as("lastTimeTriple"),
+ mySum($"single").as("mySumSingle")
+ )
+ .selectExpr(
+ "sessionId", "CAST(session.start AS LONG)",
+ "CAST(session.end AS LONG)",
+ // "firstTimeTriple", "lastTimeTriple", // Non deterministic
+ "CAST(meanTime AS LONG)", "CAST(medianTime AS LONG)",
+ "CAST(stdDevTime AS LONG)", "CAST(varTime AS LONG)",
+ "mySumSingle")
+ }
+
+ testStream(sessionWithAgg, OutputMode.Append())(
+ AddData(inputData, ("a", 41L)),
+ AddData(inputData, ("a", 42L)),
+ AddData(inputData, ("a", 40L)),
+ CheckAnswer(),
+ AddData(inputData, ("b", 100L)), // Move the watermark past the end of
the session.
+ AddData(inputData, ("b", 101L)), // Trigger the session production.
+ CheckAnswer(("a", 40, 52, 41, 41, 1, 1, 123))
+ )
+ }
+
private def assertNumRowsDroppedByWatermark(
numRowsDroppedByWatermark: Long): AssertOnQuery = AssertOnQuery { q =>
q.processAllAvailable()
@@ -445,4 +737,64 @@ class StreamingSessionWindowSuite extends StreamTest
.select($"session".getField("start").cast("long").as[Long],
$"session".getField("end").cast("long").as[Long], $"count".as[Long],
$"sum".as[Long])
}
+
+ // Manipulate the input to create a nested tuple out of words and do
aggregation on the tuples.
+ // Split the lines into words, convert each word into a nested tuple,
+ // e.g. hello -> ((hello, hello), hello), treat each tuple as the sessionId
of an event,
+ // which gets aggregated in the second clause.
+ private def sessionWindowQueryNestedKey(
+ input: MemoryStream[(String, Long)],
+ sessionWindow: Column = session_window($"eventTime", "10 seconds")):
DataFrame = {
+ val events = input
+ .toDF()
+ .select($"_1".as("value"), $"_2".as("timestamp"))
+ .withColumn("eventTime", $"timestamp".cast("timestamp"))
+ .withColumn("sessionIdSingle", split($"value", " "))
+ .withColumn("sessionIdDouble", arrays_zip($"sessionIdSingle",
$"sessionIdSingle"))
+ .withColumn("sessionIdTriple", arrays_zip($"sessionIdDouble",
$"sessionIdSingle"))
+ .withWatermark("eventTime", "30 seconds")
+ .selectExpr("explode(sessionIdTriple) AS sessionId", "eventTime")
+
+ events
+ .groupBy(sessionWindow.as("session"), $"sessionId")
+ .agg(count("*").as("numEvents"))
+ .selectExpr(
+ "sessionId",
+ "CAST(session.start AS LONG)",
+ "CAST(session.end AS LONG)",
+ "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS
durationMs",
+ "numEvents")
+ }
+
+ // Manipulate the input, for each word, create two columns and do
aggregation on both cols.
+ // Split the lines into words, convert each word into two columns,
+ // e.g. hello -> col1: (hello, hello), col2: hello. Aggregate on both
columns.
+ private def sessionWindowQueryMultiColKey(
+ input: MemoryStream[(String, Long)],
+ sessionWindow: Column = session_window($"eventTime", "10 seconds")):
DataFrame = {
+ val events = input
+ .toDF()
+ .select($"_1".as("value"), $"_2".as("timestamp"))
+ .withColumn("eventTime", $"timestamp".cast("timestamp"))
+ .withColumn("sessionIdSingle", split($"value", " "))
+ .withColumn("sessionIdDouble", arrays_zip($"sessionIdSingle",
$"sessionIdSingle"))
+ .withColumn("sessionIdTriple", arrays_zip($"sessionIdDouble",
$"sessionIdSingle"))
+ .withColumn("sessionIdTriple", explode($"sessionIdTriple"))
+ .withWatermark("eventTime", "30 seconds")
+ .selectExpr(
+ "sessionIdTriple.sessionIdDouble AS aggKeyDouble",
+ "sessionIdTriple.sessionIdSingle AS aggKeySingle",
+ "eventTime")
+
+ events
+ .groupBy(sessionWindow.as("session"), $"aggKeyDouble", $"aggKeySingle")
+ .agg(count("*").as("numEvents"))
+ .selectExpr(
+ "aggKeyDouble",
+ "aggKeySingle",
+ "CAST(session.start AS LONG)",
+ "CAST(session.end AS LONG)",
+ "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS
durationMs",
+ "numEvents")
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]