This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5fe963f8560e [SPARK-46380][SQL] Replace current time/date prior to evaluating inline table expressions
5fe963f8560e is described below
commit 5fe963f8560ef05925d127e82ab7ef28d6a1d7bc
Author: Aleksandar Tomic <[email protected]>
AuthorDate: Thu Dec 21 15:58:15 2023 +0800
[SPARK-46380][SQL] Replace current time/date prior to evaluating inline table expressions
### What changes were proposed in this pull request?
With this PR, inline table resolution is done in two phases:
1) If there are no expressions that depend on the current context (e.g.
expressions that depend on CURRENT_DATABASE, CURRENT_USER, CURRENT_TIME, etc.),
they are evaluated as part of the ResolveInlineTables rule.
2) Expressions that do depend on CURRENT_* are kept as expressions, and
their evaluation is delayed to the post-analysis phase.
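A sketch of how the two phases apply (behavior as described above and exercised by the tests in this diff):
```sql
-- No CURRENT_* reference: evaluated eagerly by ResolveInlineTables
-- and inlined as a LocalRelation during analysis.
SELECT * FROM VALUES (1), (2) AS t(a);

-- References CURRENT_*: kept as a ResolvedInlineTable and evaluated
-- post-analysis by the new EvalInlineTables rule.
SELECT * FROM VALUES (CURRENT_TIMESTAMP()) AS t(ct);
```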
### Why are the changes needed?
This PR aims to solve two problems with inline tables.
Example1:
```sql
SELECT COUNT(DISTINCT ct) FROM VALUES
(CURRENT_TIMESTAMP()),
(CURRENT_TIMESTAMP()),
(CURRENT_TIMESTAMP()) as data(ct)
```
Prior to this change, this example would return 3 (i.e. all
CURRENT_TIMESTAMP expressions would return different values, since they were
evaluated individually as part of inline table evaluation). After this change,
the result is 1.
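The same now holds for `now()` (covered by the new golden-file tests in this diff):
```sql
-- Returns 1 after this change: all rows observe the same query-start time.
SELECT COUNT(DISTINCT ct) FROM VALUES now(), now(), now() AS data(ct);
```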
Example 2:
```sql
CREATE VIEW V AS (SELECT * FROM VALUES(CURRENT_TIMESTAMP()))
```
In this example, the VIEW would be saved with the literal evaluated during VIEW
creation. After this change, CURRENT_TIMESTAMP() is evaluated during VIEW execution.
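Concretely, mirroring the new SQLViewSuite test at the end of this diff:
```sql
CREATE VIEW v1 (t1, t2) AS SELECT * FROM VALUES (now(), now());
SELECT t1, t2 FROM v1;  -- t1 = t2, stamped when this query runs
SELECT t1, t2 FROM v1;  -- still equal, but later than the first query's values
```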
### Does this PR introduce _any_ user-facing change?
See section above.
### How was this patch tested?
New tests that validate this behaviour are introduced.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44316 from dbatomic/inline_tables_curr_time_fix.
Lead-authored-by: Aleksandar Tomic <[email protected]>
Co-authored-by: Aleksandar Tomic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../catalyst/analysis/ResolveInlineTables.scala | 68 ++++++++++++----------
.../spark/sql/catalyst/analysis/unresolved.scala | 15 +++++
.../spark/sql/catalyst/optimizer/Optimizer.scala | 4 +-
.../sql/catalyst/optimizer/finishAnalysis.scala | 34 ++++++++++-
.../sql/catalyst/rules/RuleIdCollection.scala | 1 +
.../spark/sql/catalyst/trees/TreePatterns.scala | 1 +
.../analysis/ResolveInlineTablesSuite.scala | 31 ++++++++--
.../analyzer-results/inline-table.sql.out | 16 ++++-
.../postgreSQL/create_view.sql.out | 2 +-
.../resources/sql-tests/inputs/inline-table.sql | 6 ++
.../sql-tests/results/inline-table.sql.out | 16 +++++
.../apache/spark/sql/execution/SQLViewSuite.scala | 14 +++++
12 files changed, 165 insertions(+), 43 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
index 760ea466b857..73600f5c7064 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
@@ -17,28 +17,29 @@
package org.apache.spark.sql.catalyst.analysis
-import scala.util.control.NonFatal
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper, Expression}
+import org.apache.spark.sql.catalyst.optimizer.EvalInlineTables
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.AlwaysProcess
+import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.TypeUtils.{toSQLExpr, toSQLId}
import org.apache.spark.sql.types.{StructField, StructType}
/**
- * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]].
+ * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[ResolvedInlineTable]].
*/
object ResolveInlineTables extends Rule[LogicalPlan]
with CastSupport with AliasHelper with EvalHelper {
- override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
- AlwaysProcess.fn, ruleId) {
- case table: UnresolvedInlineTable if table.expressionsResolved =>
- validateInputDimension(table)
- validateInputEvaluable(table)
- convert(table)
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.resolveOperatorsWithPruning(AlwaysProcess.fn, ruleId) {
+ case table: UnresolvedInlineTable if table.expressionsResolved =>
+ validateInputDimension(table)
+ validateInputEvaluable(table)
+ val resolvedTable = findCommonTypesAndCast(table)
+ earlyEvalIfPossible(resolvedTable)
+ }
}
/**
@@ -74,7 +75,10 @@ object ResolveInlineTables extends Rule[LogicalPlan]
table.rows.foreach { row =>
row.foreach { e =>
// Note that nondeterministic expressions are not supported since they are not foldable.
- if (!e.resolved || !trimAliases(prepareForEval(e)).foldable) {
+ // The only exception are CURRENT_LIKE expressions, which are replaced by a literal
+ // in later stages.
+ if ((!e.resolved && !e.containsPattern(CURRENT_LIKE))
+ || !trimAliases(prepareForEval(e)).foldable) {
e.failAnalysis(
errorClass = "INVALID_INLINE_TABLE.CANNOT_EVALUATE_EXPRESSION_IN_INLINE_TABLE",
messageParameters = Map("expr" -> toSQLExpr(e)))
@@ -84,14 +88,12 @@ object ResolveInlineTables extends Rule[LogicalPlan]
}
/**
- * Convert a valid (with right shape and foldable inputs) [[UnresolvedInlineTable]]
- * into a [[LocalRelation]].
- *
* This function attempts to coerce inputs into consistent types.
*
* This is package visible for unit testing.
*/
- private[analysis] def convert(table: UnresolvedInlineTable): LocalRelation = {
+ private[analysis] def findCommonTypesAndCast(table: UnresolvedInlineTable):
+ ResolvedInlineTable = {
// For each column, traverse all the values and find a common data type and nullability.
val fields = table.rows.transpose.zip(table.names).map { case (column, name) =>
val inputTypes = column.map(_.dataType)
@@ -105,26 +107,30 @@ object ResolveInlineTables extends Rule[LogicalPlan]
val attributes = DataTypeUtils.toAttributes(StructType(fields))
assert(fields.size == table.names.size)
- val newRows: Seq[InternalRow] = table.rows.map { row =>
- InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) =>
- val targetType = fields(ci).dataType
- try {
+ val castedRows: Seq[Seq[Expression]] = table.rows.map { row =>
+ row.zipWithIndex.map {
+ case (e, ci) =>
+ val targetType = fields(ci).dataType
val castedExpr = if (DataTypeUtils.sameType(e.dataType, targetType)) {
e
} else {
cast(e, targetType)
}
- prepareForEval(castedExpr).eval()
- } catch {
- case NonFatal(ex) =>
- table.failAnalysis(
- errorClass = "INVALID_INLINE_TABLE.FAILED_SQL_EXPRESSION_EVALUATION",
- messageParameters = Map("sqlExpr" -> toSQLExpr(e)),
- cause = ex)
- }
- })
+ castedExpr
+ }
}
- LocalRelation(attributes, newRows)
+ ResolvedInlineTable(castedRows, attributes)
+ }
+
+ /**
+ * This function attempts to eagerly evaluate the rows of an inline table.
+ * If the rows don't rely on context-dependent expressions (e.g. CURRENT_LIKE),
+ * they will be evaluated and inlined as a [[LocalRelation]].
+ * This is package visible for unit testing.
+ */
+ private[analysis] def earlyEvalIfPossible(table: ResolvedInlineTable): LogicalPlan = {
+ val earlyEvalPossible = table.rows.flatten.forall(!_.containsPattern(CURRENT_LIKE))
+ if (earlyEvalPossible) EvalInlineTables(table) else table
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index e1dec5955a7f..b32ff671b2b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -131,6 +131,21 @@ case class UnresolvedInlineTable(
lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved))
}
+/**
+ * A resolved inline table that holds all the expressions that were checked for
+ * the right shape and common data types.
+ * This is a preparation step for [[org.apache.spark.sql.catalyst.optimizer.EvalInlineTables]] which
+ * will produce a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]
+ * for this inline table.
+ *
+ * @param output list of column attributes
+ * @param rows expressions for the data rows
+ */
+case class ResolvedInlineTable(rows: Seq[Seq[Expression]], output: Seq[Attribute])
+ extends LeafNode {
+ final override val nodePatterns: Seq[TreePattern] = Seq(INLINE_TABLE_EVAL)
+}
+
/**
* A table-valued function, e.g.
* {{{
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index a4b25cbd1d2e..5a19c5e3c241 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -293,7 +293,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
ComputeCurrentTime,
ReplaceCurrentLike(catalogManager),
SpecialDatetimeValues,
- RewriteAsOfJoin)
+ RewriteAsOfJoin,
+ EvalInlineTables
+ )
override def apply(plan: LogicalPlan): LogicalPlan = {
rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
index 18c85999312d..92ac7599a8ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala
@@ -19,7 +19,11 @@ package org.apache.spark.sql.catalyst.optimizer
import java.time.{Instant, LocalDateTime, ZoneId}
-import org.apache.spark.sql.catalyst.CurrentUserContext
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.catalyst.{CurrentUserContext, InternalRow}
+import org.apache.spark.sql.catalyst.analysis.{CastSupport, ResolvedInlineTable}
+import org.apache.spark.sql.catalyst.analysis.ResolveInlineTables.prepareForEval
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -27,6 +31,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.trees.TreePatternBits
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, instantToMicros, localDateTimeToMicros}
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLExpr
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.types._
@@ -70,6 +75,33 @@ object RewriteNonCorrelatedExists extends Rule[LogicalPlan] {
}
}
+/**
+ * Computes expressions in inline tables. This rule is supposed to be called at the very end
+ * of the analysis phase, given that all the expressions need to be fully resolved/replaced
+ * at this point.
+ */
+object EvalInlineTables extends Rule[LogicalPlan] with CastSupport {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.transformDownWithSubqueriesAndPruning(_.containsPattern(INLINE_TABLE_EVAL)) {
+ case table: ResolvedInlineTable =>
+ val newRows: Seq[InternalRow] =
+ table.rows.map { row => InternalRow.fromSeq(row.map { e =>
+ try {
+ prepareForEval(e).eval()
+ } catch {
+ case NonFatal(ex) =>
+ table.failAnalysis(
+ errorClass = "INVALID_INLINE_TABLE.FAILED_SQL_EXPRESSION_EVALUATION",
+ messageParameters = Map("sqlExpr" -> toSQLExpr(e)),
+ cause = ex)
+ }})
+ }
+
+ LocalRelation(table.output, newRows)
+ }
+ }
+}
+
/**
* Computes the current date and time to make sure we return the same result in a single query.
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index a6d03692646c..8eeea74b5376 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -168,6 +168,7 @@ object RuleIdCollection {
"org.apache.spark.sql.catalyst.optimizer.SimplifyConditionals" ::
"org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps" ::
"org.apache.spark.sql.catalyst.optimizer.TransposeWindow" ::
+ "org.apache.spark.sql.catalyst.optimizer.EvalInlineTables" ::
"org.apache.spark.sql.catalyst.optimizer.UnwrapCastInBinaryComparison"
:: Nil
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index fc869bce2772..daa4ea0c8616 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -54,6 +54,7 @@ object TreePattern extends Enumeration {
val IF: Value = Value
val IN: Value = Value
val IN_SUBQUERY: Value = Value
+ val INLINE_TABLE_EVAL: Value = Value
val INSET: Value = Value
val INTERSECT: Value = Value
val INVOKE: Value = Value
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
index 2e6c6e4eaf4c..758b6b73e4eb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.catalyst.analysis
import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Literal, Rand}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, CurrentTimestamp, Literal, Rand}
 import org.apache.spark.sql.catalyst.expressions.aggregate.Count
+import org.apache.spark.sql.catalyst.optimizer.{ComputeCurrentTime, EvalInlineTables}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types.{LongType, NullType, TimestampType}
@@ -83,9 +84,10 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {
assert(ResolveInlineTables(table) == table)
}
- test("convert") {
+ test("cast and execute") {
val table = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L))))
- val converted = ResolveInlineTables.convert(table)
+ val resolved = ResolveInlineTables.findCommonTypesAndCast(table)
+ val converted = ResolveInlineTables.earlyEvalIfPossible(resolved).asInstanceOf[LocalRelation]
assert(converted.output.map(_.dataType) == Seq(LongType))
assert(converted.data.size == 2)
@@ -93,11 +95,28 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {
assert(converted.data(1).getLong(0) == 2L)
}
+ test("cast and execute CURRENT_LIKE expressions") {
+ val table = UnresolvedInlineTable(Seq("c1"), Seq(
+ Seq(CurrentTimestamp()), Seq(CurrentTimestamp())))
+ val casted = ResolveInlineTables.findCommonTypesAndCast(table)
+ val earlyEval = ResolveInlineTables.earlyEvalIfPossible(casted)
+ // Early eval should keep it in expression form.
+ assert(earlyEval.isInstanceOf[ResolvedInlineTable])
+
+ EvalInlineTables(ComputeCurrentTime(earlyEval)) match {
+ case LocalRelation(output, data, _) =>
+ assert(output.map(_.dataType) == Seq(TimestampType))
+ assert(data.size == 2)
+ // Make sure that both CURRENT_TIMESTAMP expressions are evaluated to the same value.
+ assert(data(0).getLong(0) == data(1).getLong(0))
+ }
+ }
+
test("convert TimeZoneAwareExpression") {
val table = UnresolvedInlineTable(Seq("c1"),
Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType))))
val withTimeZone = ResolveTimeZone.apply(table)
- val LocalRelation(output, data, _) = ResolveInlineTables.apply(withTimeZone)
+ val LocalRelation(output, data, _) = EvalInlineTables(ResolveInlineTables.apply(withTimeZone))
val correct = Cast(lit("1991-12-06 00:00:00.0"), TimestampType)
.withTimeZone(conf.sessionLocalTimeZone).eval().asInstanceOf[Long]
assert(output.map(_.dataType) == Seq(TimestampType))
@@ -107,11 +126,11 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {
test("nullability inference in convert") {
val table1 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L))))
- val converted1 = ResolveInlineTables.convert(table1)
+ val converted1 = ResolveInlineTables.findCommonTypesAndCast(table1)
assert(!converted1.schema.fields(0).nullable)
val table2 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(Literal(null, NullType))))
- val converted2 = ResolveInlineTables.convert(table2)
+ val converted2 = ResolveInlineTables.findCommonTypesAndCast(table2)
assert(converted2.schema.fields(0).nullable)
}
}
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out
index 0d79168651fd..988df7de1a3c 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/inline-table.sql.out
@@ -73,9 +73,7 @@ Project [a#x, b#x]
-- !query
select a from values ("one", current_timestamp) as data(a, b)
-- !query analysis
-Project [a#x]
-+- SubqueryAlias data
- +- LocalRelation [a#x, b#x]
+[Analyzer test output redacted due to nondeterminism]
-- !query
@@ -246,3 +244,15 @@ select * from values (10 + try_divide(5, 0))
-- !query analysis
Project [col1#x]
+- LocalRelation [col1#x]
+
+
+-- !query
+select count(distinct ct) from values now(), now(), now() as data(ct)
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+select count(distinct ct) from values current_timestamp(), current_timestamp() as data(ct)
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/create_view.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/create_view.sql.out
index 1adc3ae0fa65..0a74ec87eb83 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/create_view.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/create_view.sql.out
@@ -1674,7 +1674,7 @@ select * from tt7a left join tt8a using (x), tt8a tt8ax, false, false, Persisted
:- Project [a#x, b#x, c#x, d#x, e#x]
: +- SubqueryAlias v
: +- Project [col1#x AS a#x, col2#x AS b#x, col3#x AS c#x, col4#x AS d#x, col5#x AS e#x]
- : +- LocalRelation [col1#x, col2#x, col3#x, col4#x, col5#x]
+ : +- ResolvedInlineTable [[now(), 2, 3, now(), 5]], [col1#x, col2#x, col3#x, col4#x, col5#x]
+- Project [cast(x#x as timestamp) AS x#x, y#x, z#x, x#x, z#x]
+- Project [x#x, y#x, z#x, x#x, z#x]
+- Join Inner
diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
index 6867248f5765..8f65dc77c960 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql
@@ -60,3 +60,9 @@ select * from values (timestamp('1991-12-06 00:00:00.0'), array(timestamp('1991-
select * from values (try_add(5, 0));
select * from values (try_divide(5, 0));
select * from values (10 + try_divide(5, 0));
+
+-- now() should be kept as an unevaluated inline expression.
+select count(distinct ct) from values now(), now(), now() as data(ct);
+
+-- current_timestamp() should be kept as an unevaluated inline expression.
+select count(distinct ct) from values current_timestamp(), current_timestamp() as data(ct);
diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
index f34b2fbd8724..4dcdf8ac3e98 100644
--- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
@@ -271,3 +271,19 @@ select * from values (10 + try_divide(5, 0))
struct<col1:double>
-- !query output
NULL
+
+
+-- !query
+select count(distinct ct) from values now(), now(), now() as data(ct)
+-- !query schema
+struct<count(DISTINCT ct):bigint>
+-- !query output
+1
+
+
+-- !query
+select count(distinct ct) from values current_timestamp(), current_timestamp() as data(ct)
+-- !query schema
+struct<count(DISTINCT ct):bigint>
+-- !query output
+1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index a7cab381c7f6..bca16579acff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -1306,4 +1306,18 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
}
}
}
+
+ test("Inline table with current time expression") {
+ withView("v1") {
+ sql("CREATE VIEW v1 (t1, t2) AS SELECT * FROM VALUES (now(), now())")
+ val r1 = sql("select t1, t2 from v1").collect()(0)
+ val ts1 = (r1.getTimestamp(0), r1.getTimestamp(1))
+ assert(ts1._1 == ts1._2)
+ Thread.sleep(1)
+ val r2 = sql("select t1, t2 from v1").collect()(0)
+ val ts2 = (r2.getTimestamp(0), r2.getTimestamp(1))
+ assert(ts2._1 == ts2._2)
+ assert(ts1._1.getTime < ts2._1.getTime)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]