This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e9ca660  [SPARK-27340][SS][TESTS][FOLLOW-UP] Rephrase API comments and simplify tests
e9ca660 is described below

commit e9ca66096dc415e5046dba4f3f1738b7a96e39be
Author: Yuanjian Li <xyliyuanj...@gmail.com>
AuthorDate: Thu Apr 30 06:24:00 2020 +0000

    [SPARK-27340][SS][TESTS][FOLLOW-UP] Rephrase API comments and simplify tests

    ### What changes were proposed in this pull request?
    - Rephrase the API doc for `Column.as`
    - Simplify the UTs

    ### Why are the changes needed?
    Address comments in https://github.com/apache/spark/pull/28326

    ### Does this PR introduce any user-facing change?
    No

    ### How was this patch tested?
    New UT added.

    Closes #28390 from xuanyuanking/SPARK-27340-follow.

    Authored-by: Yuanjian Li <xyliyuanj...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 7195a18bf24d9506d2f8d9d4d93ff679b3d21b65)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 docs/sql-migration-guide.md                          |  2 ++
 .../main/scala/org/apache/spark/sql/Column.scala     | 12 ++++++++++++
 .../sql/streaming/EventTimeWatermarkSuite.scala      | 16 ++++++++++++++++
 .../spark/sql/streaming/StreamingJoinSuite.scala     | 26 --------------------------
 4 files changed, 30 insertions(+), 26 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index e68198a..774fb2c 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -30,6 +30,8 @@ license: |

   - In Spark 2.4 and below, `Dataset.groupByKey` results in a grouped dataset whose key attribute is wrongly named "value" if the key is of non-struct type, for example, int, string, array, etc. This is counterintuitive and makes the schema of aggregation queries unexpected. For example, the schema of `ds.groupByKey(...).count()` is `(value, count)`. Since Spark 3.0, we name the grouping attribute "key". The old behavior is preserved under a newly added configuration `spark.sql.legacy [...]

+  - In Spark 3.0, the column metadata is always propagated by the APIs `Column.name` and `Column.as`. In Spark version 2.4 and earlier, the metadata of the `NamedExpression` was set as the `explicitMetadata` of the new column at the time the API was called; it would not change even if the underlying `NamedExpression` later changed its metadata. To restore the behavior before Spark 3.0, you can use the API `as(alias: String, metadata: Metadata)` with explicit metadata.
+
 ### DDL Statements

 - In Spark 3.0, `CREATE TABLE` without a specific provider uses the value of `spark.sql.sources.default` as its provider. In Spark version 2.4 and below, it was Hive. To restore the behavior before Spark 3.0, you can set `spark.sql.legacy.createHiveTableByDefault.enabled` to `true`.
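For illustration only (not part of the patch), a minimal sketch of the propagation behavior the migration note above describes, assuming a local `SparkSession`; the object name and the "comment" metadata key are made up for the example:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.MetadataBuilder

    object MetadataPropagationSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
        import spark.implicits._

        // Attach some metadata to colA via the explicit-metadata overload.
        val meta = new MetadataBuilder().putString("comment", "made-up example key").build()
        val df = Seq(1, 2, 3).toDF("colA").select($"colA".as("colA", meta))

        // Spark 3.0: a plain alias propagates the underlying column's metadata.
        println(df.select($"colA".as("colB")).schema("colB").metadata) // keeps "comment"

        // Passing explicit (here: empty) metadata pins it, overriding propagation.
        val empty = new MetadataBuilder().build()
        println(df.select($"colA".as("colB", empty)).schema("colB").metadata) // {}

        spark.stop()
      }
    }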
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 6913d4e..2144472 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -964,6 +964,10 @@ class Column(val expr: Expression) extends Logging {
    *   df.select($"colA".as("colB"))
    * }}}
    *
+   * If the current column has metadata associated with it, this metadata will be propagated
+   * to the new column. If this is not desired, use the API `as(alias: String, metadata: Metadata)`
+   * with explicit metadata.
+   *
    * @group expr_ops
    * @since 1.3.0
    */
@@ -1000,6 +1004,10 @@ class Column(val expr: Expression) extends Logging {
    *   df.select($"colA".as('colB))
    * }}}
    *
+   * If the current column has metadata associated with it, this metadata will be propagated
+   * to the new column. If this is not desired, use the API `as(alias: String, metadata: Metadata)`
+   * with explicit metadata.
+   *
    * @group expr_ops
    * @since 1.3.0
    */
@@ -1026,6 +1034,10 @@ class Column(val expr: Expression) extends Logging {
    *   df.select($"colA".name("colB"))
    * }}}
    *
+   * If the current column has metadata associated with it, this metadata will be propagated
+   * to the new column. If this is not desired, use the API `as(alias: String, metadata: Metadata)`
+   * with explicit metadata.
+   *
    * @group expr_ops
    * @since 2.0.0
    */
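The `EventTimeWatermarkSuite` change below exercises exactly this propagation: aliasing a `window(...)` column must carry the event-time watermark metadata along, or the streaming aggregation cannot locate its watermark column. As a rough standalone sketch of that check (assuming Spark 3.0 behavior and using the internal `MemoryStream` test source, which is not a stable public API):

    import org.apache.spark.sql.{SparkSession, SQLContext}
    import org.apache.spark.sql.execution.streaming.MemoryStream
    import org.apache.spark.sql.functions.window

    object AliasKeepsWatermarkMetadata {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
        import spark.implicits._
        implicit val sqlContext: SQLContext = spark.sqlContext

        val inputData = MemoryStream[Int]
        val aliasWindow = inputData.toDF()
          .selectExpr("CAST(value AS timestamp) AS eventTime")
          .withWatermark("eventTime", "10 seconds")
          .select(window($"eventTime", "5 seconds").as("aliasWindow"))

        // "spark.watermarkDelayMs" is the key behind EventTimeWatermark.delayKey;
        // with the propagation fix it survives the alias, so this prints true.
        println(aliasWindow.schema("aliasWindow").metadata.contains("spark.watermarkDelayMs"))

        spark.stop()
      }
    }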
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 2313177..6486e1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -602,6 +602,22 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     // Check the eventTime metadata is kept in the top level alias.
     assert(aliasWindow.logicalPlan.output.exists(
       _.metadata.contains(EventTimeWatermark.delayKey)))
+
+    val windowedAggregation = aliasWindow
+      .groupBy('aliasWindow)
+      .agg(count("*") as 'count)
+      .select($"aliasWindow".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 10, 11, 12, 13, 14, 15),
+      CheckNewAnswer(),
+      AddData(inputData, 25),   // Advance watermark to 15 seconds
+      CheckNewAnswer((10, 5)),
+      assertNumStateRows(2),
+      AddData(inputData, 10),   // Should not emit anything as data less than watermark
+      CheckNewAnswer(),
+      assertNumStateRows(2)
+    )
   }

   test("test no-data flag") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 33f899b..3f218c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -991,30 +991,4 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       )
     }
   }
-
-  test("SPARK-27340 Windowed left out join with Alias on TimeWindow") {
-    val (leftInput, df1) = setupStream("left", 2)
-    val (rightInput, df2) = setupStream("right", 3)
-    val left = df1.select('key, window('leftTime, "10 second") as 'leftWindow, 'leftValue)
-    val right = df2.select('key, window('rightTime, "10 second") as 'rightWindow, 'rightValue)
-    val joined = left.join(
-        right,
-        left("key") === right("key") && left("leftWindow") === right("rightWindow"),
-        "left_outer")
-      .select(left("key"), $"leftWindow.end".cast("long"), 'leftValue, 'rightValue)
-
-    testStream(joined)(
-      // Test inner part of the join.
-      MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
-      CheckNewAnswer((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
-
-      MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls
-      CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null)),
-      assertNumStateRows(total = 2, updated = 12),
-
-      AddData(leftInput, 22),
-      CheckNewAnswer(Row(22, 30, 44, 66)),
-      assertNumStateRows(total = 3, updated = 1)
-    )
-  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org