This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 076f7e47306 [SPARK-40495][SQL][TESTS] Add additional tests to 
StreamingSessionWindowSuite
076f7e47306 is described below

commit 076f7e47306fa2a02b65ce279fb37b561abe441e
Author: Wei Liu <[email protected]>
AuthorDate: Thu Sep 29 17:34:48 2022 +0900

    [SPARK-40495][SQL][TESTS] Add additional tests to 
StreamingSessionWindowSuite
    
    ## What changes were proposed in this pull request?
    Add complex tests to `StreamingSessionWindowSuite`. Concretely, I created 
two helper functions,
    - one is called `sessionWindowQueryNestedKey`, which would convert 
`sessionId` from the single word key used in `sessionWindowQuery` to a nested 
column key. For example: `"hello" -> (("hello", "hello"), "hello")`.
    - The other is called `sessionWindowQueryMultiColKey`. It would convert 
`sessionId` from the single word key used in `sessionWindowQuery` to two 
columns. For example: "hello" -> col1: ("hello", "hello"), col2: "hello"
    
    With the two new helper functions, I added more tests for 
`complete mode` and `cap gap duration` (`append` and `async state` were not 
included, because the first two are enough for testing the change I added). The 
logic of the tests is not changed at all, just the key.
    
    For the aggregation test (`session window - with more aggregation 
functions`), I added some more functions as well as a UDAF to test, and I tried 
the `first()` and `last()` functions on a nested triple, created using the same 
method as above.
    
    ## How was this patch tested?
    All of the changes in this patch are tests themselves.
    
    Closes #37936 from WweiL/master.
    
    Authored-by: Wei Liu <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../streaming/StreamingSessionWindowSuite.scala    | 356 ++++++++++++++++++++-
 1 file changed, 354 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSessionWindowSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSessionWindowSuite.scala
index cb1728a4c5a..25b7506178d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSessionWindowSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSessionWindowSuite.scala
@@ -23,10 +23,11 @@ import org.scalatest.BeforeAndAfter
 import org.scalatest.matchers.must.Matchers
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoder, 
Encoders}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import 
org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, 
RocksDBStateStoreProvider}
-import org.apache.spark.sql.functions.{count, session_window, sum}
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 
 class StreamingSessionWindowSuite extends StreamTest
@@ -142,6 +143,126 @@ class StreamingSessionWindowSuite extends StreamTest
     )
   }
 
+  // Logic is the same as `complete mode - session window`
+  // just with a more complex key
+  testWithAllOptions("complete mode - session window - nested tuple key") {
+    val inputData = MemoryStream[(String, Long)]
+    val sessionUpdates = sessionWindowQueryNestedKey(inputData)
+
+    testStream(sessionUpdates, OutputMode.Complete())(
+      AddData(
+        inputData,
+        ("hello world spark streaming", 40L),
+        ("world hello structured streaming", 41L)),
+      CheckNewAnswer(
+        ((("hello", "hello"), "hello"), 40, 51, 11, 2),
+        ((("world", "world"), "world"), 40, 51, 11, 2),
+        ((("streaming", "streaming"), "streaming"), 40, 51, 11, 2),
+        ((("spark", "spark"), "spark"), 40, 50, 10, 1),
+        ((("structured", "structured"), "structured"), 41, 51, 10, 1)),
+      // placing new sessions "before" previous sessions
+      AddData(inputData, ("spark streaming", 25L)),
+      CheckNewAnswer(
+        ((("spark", "spark"), "spark"), 25, 35, 10, 1),
+        ((("streaming", "streaming"), "streaming"), 25, 35, 10, 1),
+        ((("hello", "hello"), "hello"), 40, 51, 11, 2),
+        ((("world", "world"), "world"), 40, 51, 11, 2),
+        ((("streaming", "streaming"), "streaming"), 40, 51, 11, 2),
+        ((("spark", "spark"), "spark"), 40, 50, 10, 1),
+        ((("structured", "structured"), "structured"), 41, 51, 10, 1)),
+      // concatenating multiple previous sessions into one
+      AddData(inputData, ("spark streaming", 30L)),
+      CheckNewAnswer(
+        ((("spark", "spark"), "spark"), 25, 50, 25, 3),
+        ((("streaming", "streaming"), "streaming"), 25, 51, 26, 4),
+        ((("hello", "hello"), "hello"), 40, 51, 11, 2),
+        ((("world", "world"), "world"), 40, 51, 11, 2),
+        ((("structured", "structured"), "structured"), 41, 51, 10, 1)),
+      // placing new sessions after previous sessions
+      AddData(inputData, ("hello apache spark", 60L)),
+      CheckNewAnswer(
+        ((("spark", "spark"), "spark"), 25, 50, 25, 3),
+        ((("streaming", "streaming"), "streaming"), 25, 51, 26, 4),
+        ((("hello", "hello"), "hello"), 40, 51, 11, 2),
+        ((("world", "world"), "world"), 40, 51, 11, 2),
+        ((("structured", "structured"), "structured"), 41, 51, 10, 1),
+        ((("hello", "hello"), "hello"), 60, 70, 10, 1),
+        ((("apache", "apache"), "apache"), 60, 70, 10, 1),
+        ((("spark", "spark"), "spark"), 60, 70, 10, 1)),
+      AddData(inputData, ("structured streaming", 90L)),
+      CheckNewAnswer(
+        ((("spark", "spark"), "spark"), 25, 50, 25, 3),
+        ((("streaming", "streaming"), "streaming"), 25, 51, 26, 4),
+        ((("hello", "hello"), "hello"), 40, 51, 11, 2),
+        ((("world", "world"), "world"), 40, 51, 11, 2),
+        ((("structured", "structured"), "structured"), 41, 51, 10, 1),
+        ((("hello", "hello"), "hello"), 60, 70, 10, 1),
+        ((("apache", "apache"), "apache"), 60, 70, 10, 1),
+        ((("spark", "spark"), "spark"), 60, 70, 10, 1),
+        ((("structured", "structured"), "structured"), 90, 100, 10, 1),
+        ((("streaming", "streaming"), "streaming"), 90, 100, 10, 1)))
+  }
+
+  // Logic is the same as `complete mode - session window`
+  // just with a more complex key
+  testWithAllOptions("complete mode - session window - multiple col key") {
+    val inputData = MemoryStream[(String, Long)]
+    val sessionUpdates = sessionWindowQueryMultiColKey(inputData)
+
+    testStream(sessionUpdates, OutputMode.Complete())(
+      AddData(
+        inputData,
+        ("hello world spark streaming", 40L),
+        ("world hello structured streaming", 41L)),
+      CheckNewAnswer(
+        (("hello", "hello"), "hello", 40, 51, 11, 2),
+        (("world", "world"), "world", 40, 51, 11, 2),
+        (("streaming", "streaming"), "streaming", 40, 51, 11, 2),
+        (("spark", "spark"), "spark", 40, 50, 10, 1),
+        (("structured", "structured"), "structured", 41, 51, 10, 1)),
+      // placing new sessions "before" previous sessions
+      AddData(inputData, ("spark streaming", 25L)),
+      CheckNewAnswer(
+        (("spark", "spark"), "spark", 25, 35, 10, 1),
+        (("streaming", "streaming"), "streaming", 25, 35, 10, 1),
+        (("hello", "hello"), "hello", 40, 51, 11, 2),
+        (("world", "world"), "world", 40, 51, 11, 2),
+        (("streaming", "streaming"), "streaming", 40, 51, 11, 2),
+        (("spark", "spark"), "spark", 40, 50, 10, 1),
+        (("structured", "structured"), "structured", 41, 51, 10, 1)),
+      // concatenating multiple previous sessions into one
+      AddData(inputData, ("spark streaming", 30L)),
+      CheckNewAnswer(
+        (("spark", "spark"), "spark", 25, 50, 25, 3),
+        (("streaming", "streaming"), "streaming", 25, 51, 26, 4),
+        (("hello", "hello"), "hello", 40, 51, 11, 2),
+        (("world", "world"), "world", 40, 51, 11, 2),
+        (("structured", "structured"), "structured", 41, 51, 10, 1)),
+      // placing new sessions after previous sessions
+      AddData(inputData, ("hello apache spark", 60L)),
+      CheckNewAnswer(
+        (("spark", "spark"), "spark", 25, 50, 25, 3),
+        (("streaming", "streaming"), "streaming", 25, 51, 26, 4),
+        (("hello", "hello"), "hello", 40, 51, 11, 2),
+        (("world", "world"), "world", 40, 51, 11, 2),
+        (("structured", "structured"), "structured", 41, 51, 10, 1),
+        (("hello", "hello"), "hello", 60, 70, 10, 1),
+        (("apache", "apache"), "apache", 60, 70, 10, 1),
+        (("spark", "spark"), "spark", 60, 70, 10, 1)),
+      AddData(inputData, ("structured streaming", 90L)),
+      CheckNewAnswer(
+        (("spark", "spark"), "spark", 25, 50, 25, 3),
+        (("streaming", "streaming"), "streaming", 25, 51, 26, 4),
+        (("hello", "hello"), "hello", 40, 51, 11, 2),
+        (("world", "world"), "world", 40, 51, 11, 2),
+        (("structured", "structured"), "structured", 41, 51, 10, 1),
+        (("hello", "hello"), "hello", 60, 70, 10, 1),
+        (("apache", "apache"), "apache", 60, 70, 10, 1),
+        (("spark", "spark"), "spark", 60, 70, 10, 1),
+        (("structured", "structured"), "structured", 90, 100, 10, 1),
+        (("streaming", "streaming"), "streaming", 90, 100, 10, 1)))
+  }
+
   testWithAllOptions("complete mode - session window - no key") {
     // complete mode doesn't honor watermark: even it is specified, watermark 
will be
     // always Unix timestamp 0
@@ -354,6 +475,107 @@ class StreamingSessionWindowSuite extends StreamTest
     )
   }
 
+  // Logic is the same as `SPARK-36465: dynamic gap duration`
+  // just with a more complex key
+  testWithAllOptions("dynamic gap duration - nested tuple key") {
+    val inputData = MemoryStream[(String, Long)]
+
+    val udf = spark.udf.register(
+      "gapDuration",
+      (s: ((String, String), String)) => {
+        if (s == (("hello", "hello"), "hello")) {
+          "1 second"
+        } else if (s == (("structured", "structured"), "structured")) {
+          // zero gap duration will be filtered out from aggregation
+          "0 second"
+        } else if (s == (("world", "world"), "world")) {
+          // negative gap duration will be filtered out from aggregation
+          "-10 seconds"
+        } else {
+          "10 seconds"
+        }
+      })
+
+    val sessionUpdates =
+      sessionWindowQueryNestedKey(inputData, session_window($"eventTime", 
udf($"sessionId")))
+
+    testStream(sessionUpdates, OutputMode.Append())(
+      AddData(
+        inputData,
+        ("hello world spark streaming", 40L),
+        ("world hello structured streaming", 41L)),
+      CheckNewAnswer(),
+      // placing new sessions "before" previous sessions
+      AddData(inputData, ("spark streaming", 25L)),
+      CheckNewAnswer(),
+      // late event which session's end 10 would be later than watermark 11: 
should be dropped
+      AddData(inputData, ("spark streaming", 0L)),
+      CheckNewAnswer(),
+      assertNumRowsDroppedByWatermark(2),
+      // concatenating multiple previous sessions into one
+      AddData(inputData, ("spark streaming", 30L)),
+      CheckNewAnswer(),
+      // placing new sessions after previous sessions
+      AddData(inputData, ("hello apache spark", 60L)),
+      CheckNewAnswer(),
+      AddData(inputData, ("structured streaming", 90L)),
+      CheckNewAnswer(
+        ((("spark", "spark"), "spark"), 25, 50, 25, 3),
+        ((("streaming", "streaming"), "streaming"), 25, 51, 26, 4),
+        ((("hello", "hello"), "hello"), 40, 42, 2, 2)))
+  }
+
+  // Logic is the same as `SPARK-36465: dynamic gap duration`
+  // just with a more complex key
+  testWithAllOptions("dynamic gap duration - multiple col key") {
+    val inputData = MemoryStream[(String, Long)]
+
+    val udf = spark.udf.register(
+      "gapDuration",
+      (s1: (String, String), s2: String) => {
+        if (s1 == ("hello", "hello") && s2 == "hello") {
+          "1 second"
+        } else if (s1 == ("structured", "structured") && s2 == "structured") {
+          // zero gap duration will be filtered out from aggregation
+          "0 second"
+        } else if (s1 == ("world", "world") && s2 == "world") {
+          // negative gap duration will be filtered out from aggregation
+          "-10 seconds"
+        } else {
+          "10 seconds"
+        }
+      })
+
+    val sessionUpdates = sessionWindowQueryMultiColKey(
+      inputData,
+      session_window($"eventTime", udf($"aggKeyDouble", $"aggKeySingle")))
+
+    testStream(sessionUpdates, OutputMode.Append())(
+      AddData(
+        inputData,
+        ("hello world spark streaming", 40L),
+        ("world hello structured streaming", 41L)),
+      CheckNewAnswer(),
+      // placing new sessions "before" previous sessions
+      AddData(inputData, ("spark streaming", 25L)),
+      CheckNewAnswer(),
+      // late event which session's end 10 would be later than watermark 11: 
should be dropped
+      AddData(inputData, ("spark streaming", 0L)),
+      CheckNewAnswer(),
+      assertNumRowsDroppedByWatermark(2),
+      // concatenating multiple previous sessions into one
+      AddData(inputData, ("spark streaming", 30L)),
+      CheckNewAnswer(),
+      // placing new sessions after previous sessions
+      AddData(inputData, ("hello apache spark", 60L)),
+      CheckNewAnswer(),
+      AddData(inputData, ("structured streaming", 90L)),
+      CheckNewAnswer(
+        (("spark", "spark"), "spark", 25, 50, 25, 3),
+        (("streaming", "streaming"), "streaming", 25, 51, 26, 4),
+        (("hello", "hello"), "hello", 40, 42, 2, 2)))
+  }
+
   testWithAllOptions("append mode - session window - no key") {
     val inputData = MemoryStream[Int]
     val windowedAggregation = sessionWindowQueryOnGlobalKey(inputData)
@@ -402,6 +624,76 @@ class StreamingSessionWindowSuite extends StreamTest
     }
   }
 
+  // Test that session window works with mean, median, std dev, variance, UDAFs
+  // and first, last on nested tuple keys
+  testWithAllOptions("session window - with more aggregation functions and 
UDAFs") {
+    // create a trivial summation UDAF for test
+    // input type is a single Long
+    object MySum extends Aggregator[Long, Long, Long] {
+
+      def zero: Long = 0L
+
+      def reduce(buffer: Long, data: Long): Long = {
+        buffer + data
+      }
+
+      def merge(b1: Long, b2: Long): Long = {
+        b1 + b2
+      }
+
+      def finish(reduction: Long): Long = reduction
+
+      def bufferEncoder: Encoder[Long] = Encoders.scalaLong
+
+      def outputEncoder: Encoder[Long] = Encoders.scalaLong
+    }
+    val mySum = spark.udf.register("mySum", udaf(MySum))
+
+    val inputData = MemoryStream[(String, Long)]
+    val sessionWithAgg = {
+      // Split the lines into words, treat words as sessionId of events
+      val events = inputData.toDF()
+        .select($"_1".as("value"), $"_2".as("timestamp"))
+        .withColumn("eventTime", $"timestamp".cast("timestamp"))
+        .withColumn("single", $"timestamp")
+        .withColumn("double", struct($"single", $"single"))
+        .withColumn("triple", struct($"double", $"single"))
+        .withWatermark("eventTime", "30 seconds")
+        .selectExpr("explode(split(value, ' ')) AS sessionId",
+          "eventTime", "single", "triple")
+
+      val sessionWindow = session_window($"eventTime", "10 seconds")
+      events
+        .groupBy(sessionWindow.as("session"), $"sessionId")
+        .agg(
+          mean($"single").as("meanTime"),
+          median($"single").as("medianTime"),
+          stddev($"single").as("stdDevTime"),
+          variance($"single").as("varTime"),
+          first($"triple").as("firstTimeTriple"),
+          last($"triple").as("lastTimeTriple"),
+          mySum($"single").as("mySumSingle")
+        )
+        .selectExpr(
+          "sessionId", "CAST(session.start AS LONG)",
+          "CAST(session.end AS LONG)",
+          // "firstTimeTriple", "lastTimeTriple",  // Non deterministic
+          "CAST(meanTime AS LONG)", "CAST(medianTime AS LONG)",
+          "CAST(stdDevTime AS LONG)", "CAST(varTime AS LONG)",
+          "mySumSingle")
+    }
+
+    testStream(sessionWithAgg, OutputMode.Append())(
+      AddData(inputData, ("a", 41L)),
+      AddData(inputData, ("a", 42L)),
+      AddData(inputData, ("a", 40L)),
+      CheckAnswer(),
+      AddData(inputData, ("b", 100L)), // Move the watermark past the end of 
the session.
+      AddData(inputData, ("b", 101L)), // Trigger the session production.
+      CheckAnswer(("a", 40, 52, 41, 41, 1, 1, 123))
+    )
+  }
+
   private def assertNumRowsDroppedByWatermark(
       numRowsDroppedByWatermark: Long): AssertOnQuery = AssertOnQuery { q =>
     q.processAllAvailable()
@@ -445,4 +737,64 @@ class StreamingSessionWindowSuite extends StreamTest
       .select($"session".getField("start").cast("long").as[Long],
         $"session".getField("end").cast("long").as[Long], $"count".as[Long], 
$"sum".as[Long])
   }
+
+  // Manipulate the input to create a nested tuple out of words and do 
aggregation on the tuples.
+  // Split the lines into words, convert each word into a nested tuple,
+  // e.g. hello -> ((hello, hello), hello), treat each tuple as the sessionId 
of a event,
+  // which get aggregated in the second clause.
+  private def sessionWindowQueryNestedKey(
+      input: MemoryStream[(String, Long)],
+      sessionWindow: Column = session_window($"eventTime", "10 seconds")): 
DataFrame = {
+    val events = input
+      .toDF()
+      .select($"_1".as("value"), $"_2".as("timestamp"))
+      .withColumn("eventTime", $"timestamp".cast("timestamp"))
+      .withColumn("sessionIdSingle", split($"value", " "))
+      .withColumn("sessionIdDouble", arrays_zip($"sessionIdSingle", 
$"sessionIdSingle"))
+      .withColumn("sessionIdTriple", arrays_zip($"sessionIdDouble", 
$"sessionIdSingle"))
+      .withWatermark("eventTime", "30 seconds")
+      .selectExpr("explode(sessionIdTriple) AS sessionId", "eventTime")
+
+    events
+      .groupBy(sessionWindow.as("session"), $"sessionId")
+      .agg(count("*").as("numEvents"))
+      .selectExpr(
+        "sessionId",
+        "CAST(session.start AS LONG)",
+        "CAST(session.end AS LONG)",
+        "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS 
durationMs",
+        "numEvents")
+  }
+
+  // Manipulate the input, for each word, create two columns and do 
aggregation on both cols.
+  // Split the lines into words, convert each word into two columns,
+  // e.g. hello -> col1: (hello, hello), col2: hello. Aggregate on both 
columns.
+  private def sessionWindowQueryMultiColKey(
+      input: MemoryStream[(String, Long)],
+      sessionWindow: Column = session_window($"eventTime", "10 seconds")): 
DataFrame = {
+    val events = input
+      .toDF()
+      .select($"_1".as("value"), $"_2".as("timestamp"))
+      .withColumn("eventTime", $"timestamp".cast("timestamp"))
+      .withColumn("sessionIdSingle", split($"value", " "))
+      .withColumn("sessionIdDouble", arrays_zip($"sessionIdSingle", 
$"sessionIdSingle"))
+      .withColumn("sessionIdTriple", arrays_zip($"sessionIdDouble", 
$"sessionIdSingle"))
+      .withColumn("sessionIdTriple", explode($"sessionIdTriple"))
+      .withWatermark("eventTime", "30 seconds")
+      .selectExpr(
+        "sessionIdTriple.sessionIdDouble AS aggKeyDouble",
+        "sessionIdTriple.sessionIdSingle AS aggKeySingle",
+        "eventTime")
+
+    events
+      .groupBy(sessionWindow.as("session"), $"aggKeyDouble", $"aggKeySingle")
+      .agg(count("*").as("numEvents"))
+      .selectExpr(
+        "aggKeyDouble",
+        "aggKeySingle",
+        "CAST(session.start AS LONG)",
+        "CAST(session.end AS LONG)",
+        "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS 
durationMs",
+        "numEvents")
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to