This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f30f50a  [SPARK-30844][SQL] Static partition should also follow StoreAssignmentPolicy when insert into table
f30f50a is described below

commit f30f50a76f4b9fb5e652620563fb9055c5f30521
Author: yi.wu <yi...@databricks.com>
AuthorDate: Sun Feb 23 17:46:19 2020 +0900

    [SPARK-30844][SQL] Static partition should also follow StoreAssignmentPolicy when insert into table

    ### What changes were proposed in this pull request?

    Make static partition values also follow `StoreAssignmentPolicy` when inserting into a table:
    if `StoreAssignmentPolicy=LEGACY`, use `Cast`; if `StoreAssignmentPolicy=ANSI | STRICT`, use `AnsiCast`.

    E.g., for the table `t` created by:

    ```
    create table t(a int, b string) using parquet partitioned by (a)
    ```

    and inserting values with `StoreAssignmentPolicy=ANSI` using:

    ```
    insert into t partition(a='ansi') values('ansi')
    ```

    Before this PR:

    ```
    +----+----+
    |   b|   a|
    +----+----+
    |ansi|null|
    +----+----+
    ```

    After this PR, the insert fails with:

    ```
    java.lang.NumberFormatException: invalid input syntax for type numeric: ansi
    ```

    (It would be better if we could use `TableOutputResolver.checkField` to fully follow
    `StoreAssignmentPolicy`, but since the data type of a static partition value has already
    been lost in the first place, it's hard to use `TableOutputResolver.checkField`.)

    ### Why are the changes needed?

    I think we should follow `StoreAssignmentPolicy` for all columns when inserting into
    a table, including static partition columns.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Added a new test.

    Closes #27597 from Ngone51/fix-static-partition.

    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Takeshi Yamamuro <yamam...@apache.org>
    (cherry picked from commit 9c2eadc7268844d49ec41da818002c99bb56addf)
    Signed-off-by: Takeshi Yamamuro <yamam...@apache.org>
---
 .../execution/datasources/DataSourceStrategy.scala  | 13 ++++++++++++-
 .../spark/sql/sources/DataSourceAnalysisSuite.scala | 10 ++++++++--
 .../org/apache/spark/sql/sources/InsertSuite.scala  | 21 +++++++++++++++++++++
 3 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e3a0a0a..2d902b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -104,7 +105,17 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         None
       } else if (potentialSpecs.size == 1) {
         val partValue = potentialSpecs.head._2
-        Some(Alias(cast(Literal(partValue), field.dataType), field.name)())
+        conf.storeAssignmentPolicy match {
+          // SPARK-30844: try our best to follow StoreAssignmentPolicy for static partition
+          // values but not completely follow because we can't do static type checking due to
+          // the reason that the parser has erased the type info of static partition values
+          // and converted them to string.
+          case StoreAssignmentPolicy.ANSI | StoreAssignmentPolicy.STRICT =>
+            Some(Alias(AnsiCast(Literal(partValue), field.dataType,
+              Option(conf.sessionLocalTimeZone)), field.name)())
+          case _ =>
+            Some(Alias(cast(Literal(partValue), field.dataType), field.name)())
+        }
       } else {
         throw new AnalysisException(
           s"Partition column ${field.name} have multiple values specified, " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
index e1022e3..a6c5090 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
@@ -22,9 +22,10 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AnsiCast, Attribute, Cast, Expression, Literal}
 import org.apache.spark.sql.execution.datasources.DataSourceAnalysis
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
 
 class DataSourceAnalysisSuite extends SparkFunSuite with BeforeAndAfterAll {
@@ -52,7 +53,12 @@ class DataSourceAnalysisSuite extends SparkFunSuite with BeforeAndAfterAll {
   Seq(true, false).foreach { caseSensitive =>
     val conf = new SQLConf().copy(SQLConf.CASE_SENSITIVE -> caseSensitive)
     def cast(e: Expression, dt: DataType): Expression = {
-      Cast(e, dt, Option(conf.sessionLocalTimeZone))
+      conf.storeAssignmentPolicy match {
+        case StoreAssignmentPolicy.ANSI | StoreAssignmentPolicy.STRICT =>
+          AnsiCast(e, dt, Option(conf.sessionLocalTimeZone))
+        case _ =>
+          Cast(e, dt, Option(conf.sessionLocalTimeZone))
+      }
     }
     val rule = DataSourceAnalysis(conf)
     test(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index bcff30a..2d66637 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -753,6 +753,27 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
+  test("SPARK-30844: static partition should also follow StoreAssignmentPolicy") {
+    SQLConf.StoreAssignmentPolicy.values.foreach { policy =>
+      withSQLConf(
+        SQLConf.STORE_ASSIGNMENT_POLICY.key -> policy.toString) {
+        withTable("t") {
+          sql("create table t(a int, b string) using parquet partitioned by (a)")
+          policy match {
+            case SQLConf.StoreAssignmentPolicy.ANSI | SQLConf.StoreAssignmentPolicy.STRICT =>
+              val errorMsg = intercept[NumberFormatException] {
+                sql("insert into t partition(a='ansi') values('ansi')")
+              }.getMessage
+              assert(errorMsg.contains("invalid input syntax for type numeric: ansi"))
+            case SQLConf.StoreAssignmentPolicy.LEGACY =>
+              sql("insert into t partition(a='ansi') values('ansi')")
+              checkAnswer(sql("select * from t"), Row("ansi", null) :: Nil)
+          }
+        }
+      }
+    }
+  }
+
   test("SPARK-24860: dynamic partition overwrite specified per source without catalog table") {
     withTempPath { path =>
       Seq((1, 1), (2, 2)).toDF("i", "part")
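For reference, a minimal spark-shell sketch of the before/after behavior described above. This is illustrative only and not part of the commit; it assumes a Spark 3.0 build that includes this patch, and it reuses the table `t` and the `spark.sql.storeAssignmentPolicy` config key exercised by the new test.

```scala
// LEGACY policy: the static partition value goes through Cast, so the
// non-numeric string 'ansi' silently becomes a null partition value.
spark.conf.set("spark.sql.storeAssignmentPolicy", "LEGACY")
spark.sql("create table t(a int, b string) using parquet partitioned by (a)")
spark.sql("insert into t partition(a='ansi') values('ansi')")
spark.sql("select * from t").show()
// +----+----+
// |   b|   a|
// +----+----+
// |ansi|null|
// +----+----+

// ANSI (the Spark 3.0 default) or STRICT policy: the value goes through
// AnsiCast instead, so with this patch the same insert fails with
//   java.lang.NumberFormatException: invalid input syntax for type numeric: ansi
spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
spark.sql("insert into t partition(a='ansi') values('ansi')")
```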