This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 3ee9a0d  [SPARK-35815][SQL] Allow delayThreshold for watermark to be represented as ANSI interval literals
3ee9a0d is described below

commit 3ee9a0db3a3eb8e88bbab28207c91bd4637b313a
Author: Kousuke Saruta <saru...@oss.nttdata.com>
AuthorDate: Thu Jul 22 17:36:22 2021 +0300

    [SPARK-35815][SQL] Allow delayThreshold for watermark to be represented as ANSI interval literals

    ### What changes were proposed in this pull request?

    This PR extends `withWatermark` so that its `delayThreshold` can also be represented as an ANSI interval literal.

    ### Why are the changes needed?

    A `delayThreshold` is semantically an interval value, so it should be representable as an ANSI interval literal as well as in the conventional `1 second` form.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    New tests.

    Closes #33456 from sarutak/delayThreshold-interval.

    Authored-by: Kousuke Saruta <saru...@oss.nttdata.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
    (cherry picked from commit 07fa38e2c1082c2b69b3bf9489cee4dfe4db2c26)
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../spark/sql/errors/QueryCompilationErrors.scala |  2 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala | 23 ++++++---
 .../sql/streaming/EventTimeWatermarkSuite.scala   | 54 ++++++++++++++++++++++
 3 files changed, 71 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 7a33d52a..f576036 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2265,7 +2265,7 @@ private[spark] object QueryCompilationErrors {
       s"""Cannot resolve column name "$colName" among (${fieldsStr})${extraMsg}""")
   }
 
-  def cannotParseTimeDelayError(delayThreshold: String, e: IllegalArgumentException): Throwable = {
+  def cannotParseTimeDelayError(delayThreshold: String, e: Throwable): Throwable = {
     new AnalysisException(s"Unable to parse time delay '$delayThreshold'",
       cause = Some(e))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0fd10c1..3abc060 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql
 
 import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream}
+import java.util.Locale
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashSet}
@@ -42,7 +43,7 @@ import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
-import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException, ParserUtils}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
@@ -63,7 +64,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.array.ByteArrayMethods
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 import org.apache.spark.util.Utils
 
 private[sql] object Dataset {
@@ -739,13 +740,21 @@ class Dataset[T] private[sql](
   // We only accept an existing column name, not a derived column here as a watermark that is
   // defined on a derived column cannot referenced elsewhere in the plan.
   def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan {
-    val parsedDelay =
-      try {
+    val parsedDelay = try {
+      if (delayThreshold.toLowerCase(Locale.ROOT).trim.startsWith("interval")) {
+        CatalystSqlParser.parseExpression(delayThreshold) match {
+          case Literal(months: Int, _: YearMonthIntervalType) =>
+            new CalendarInterval(months, 0, 0)
+          case Literal(micros: Long, _: DayTimeIntervalType) =>
+            new CalendarInterval(0, 0, micros)
+        }
+      } else {
         IntervalUtils.stringToInterval(UTF8String.fromString(delayThreshold))
-      } catch {
-        case e: IllegalArgumentException =>
-          throw QueryCompilationErrors.cannotParseTimeDelayError(delayThreshold, e)
       }
+    } catch {
+      case NonFatal(e) =>
+        throw QueryCompilationErrors.cannotParseTimeDelayError(delayThreshold, e)
+    }
     require(!IntervalUtils.isNegative(parsedDelay),
       s"delay threshold ($delayThreshold) should not be negative.")
     EliminateEventTimeWatermark(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 67ab72a..2724153 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -31,6 +31,7 @@ import org.scalatest.matchers.should.Matchers._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Dataset}
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.UTC
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.MemorySink
@@ -765,6 +766,59 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     }
   }
 
+  test("SPARK-35815: Support ANSI intervals for delay threshold") {
+    val DAYS_PER_MONTH = 31
+    Seq(
+      // Conventional form and some variants
+      (Seq("3 days", "Interval 3 day", "inTerval '3' day"), 3 * MILLIS_PER_DAY),
+      (Seq(" 5 hours", "INTERVAL 5 hour", "interval '5' hour"), 5 * MILLIS_PER_HOUR),
+      (Seq("\t8 minutes", "interval 8 minute", "interval '8' minute"), 8 * MILLIS_PER_MINUTE),
+      (Seq("10 seconds", "interval 10 second", "interval '10' second"), 10 * MILLIS_PER_SECOND),
+      (Seq("1 years", "interval 1 year", "interval '1' year"),
+        MONTHS_PER_YEAR * DAYS_PER_MONTH * MILLIS_PER_DAY),
+      (Seq("1 months", "interval 1 month", "interval '1' month"), DAYS_PER_MONTH * MILLIS_PER_DAY),
+      (Seq(
+        "1 day 2 hours 3 minutes 4 seconds",
+        " interval 1 day 2 hours 3 minutes 4 seconds",
+        "\tinterval '1' day '2' hours '3' minutes '4' seconds",
+        "interval '1 2:3:4' day to second"),
+        MILLIS_PER_DAY + 2 * MILLIS_PER_HOUR + 3 * MILLIS_PER_MINUTE + 4 * MILLIS_PER_SECOND),
+      (Seq(
+        " 1 year 2 months",
+        "interval 1 year 2 month",
+        "interval '1' year '2' month",
+        "\tinterval '1-2' year to month"),
+        (MONTHS_PER_YEAR * DAYS_PER_MONTH + 2 * DAYS_PER_MONTH) * MILLIS_PER_DAY)
+    ).foreach { case (delayThresholdVariants, expectedMs) =>
+      delayThresholdVariants.foreach { case delayThreshold =>
+        val df = MemoryStream[Int].toDF
+          .withColumn("eventTime", timestamp_seconds($"value"))
+          .withWatermark("eventTime", delayThreshold)
+        val eventTimeAttr = df.queryExecution.analyzed.output.find(a => a.name == "eventTime")
+        assert(eventTimeAttr.isDefined)
+        val metadata = eventTimeAttr.get.metadata
+        assert(metadata.contains(EventTimeWatermark.delayKey))
+        assert(metadata.getLong(EventTimeWatermark.delayKey) === expectedMs)
+      }
+    }
+
+    // Invalid interval patterns
+    Seq(
+      "1 foo",
+      "interva 2 day",
+      "intrval '3' day",
+      "interval 4 foo",
+      "interval '5' foo",
+      "interval '1 2:3:4' day to hour",
+      "interval '1 2' year to month").foreach { delayThreshold =>
+      intercept[AnalysisException] {
+        val df = MemoryStream[Int].toDF
+          .withColumn("eventTime", timestamp_seconds($"value"))
+          .withWatermark("eventTime", delayThreshold)
+      }
+    }
+  }
+
   private def dfWithMultipleWatermarks(
       input1: MemoryStream[Int],
       input2: MemoryStream[Int]): Dataset[_] = {
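For reference, a minimal sketch of what this change enables from user code, assuming a local SparkSession and the built-in `rate` streaming source (both are illustrative here, not part of the patch):

```scala
import org.apache.spark.sql.SparkSession

object WatermarkIntervalExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("WatermarkIntervalExample")
      .getOrCreate()

    // The rate source emits a TimestampType `timestamp` column to watermark on.
    val events = spark.readStream.format("rate").load()

    // Conventional form, accepted both before and after this patch:
    val conventional = events.withWatermark("timestamp", "10 seconds")

    // ANSI interval literals, newly accepted by this patch:
    val dayTime = events.withWatermark("timestamp", "INTERVAL '10' SECOND")
    val compound = events.withWatermark("timestamp", "INTERVAL '1 2:3:4' DAY TO SECOND")

    spark.stop()
  }
}
```

As the `Dataset.withWatermark` hunk above shows, a threshold that starts with `interval` (case-insensitively, after trimming) is handed to `CatalystSqlParser.parseExpression` and converted to a `CalendarInterval`, while any other string still goes through `IntervalUtils.stringToInterval`, so the conventional form keeps working unchanged.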