This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f3e079b [SPARK-36271][SQL] Unify V1 insert check field name before prepare writer f3e079b is described below commit f3e079b09b5877c97fc10864937e76d866935880 Author: Angerszhuuuu <angers....@gmail.com> AuthorDate: Mon Aug 9 17:18:06 2021 +0800 [SPARK-36271][SQL] Unify V1 insert check field name before prepare writer ### What changes were proposed in this pull request? Unify DataSource V1 insert schema check field name before prepare writer. And in this PR we add check for avro V1 insert too. ### Why are the changes needed? Unify code and add check for avro V1 insert too. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UT Closes #33566 from AngersZhuuuu/SPARK-36271. Authored-by: Angerszhuuuu <angers....@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../org/apache/spark/sql/avro/AvroSuite.scala | 22 ++++++++++++++++++++++ .../execution/datasources/DataSourceUtils.scala | 1 + .../datasources/parquet/ParquetWriteSupport.scala | 1 - .../sql/execution/datasources/v2/FileWrite.scala | 1 + .../datasources/v2/parquet/ParquetWrite.scala | 1 + .../spark/sql/FileBasedDataSourceSuite.scala | 22 ++++++++++++++++++++++ 6 files changed, 47 insertions(+), 1 deletion(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 43ba20f..510ddfc 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -2194,6 +2194,28 @@ class AvroV1Suite extends AvroSuite { super .sparkConf .set(SQLConf.USE_V1_SOURCE_LIST, "avro") + + test("SPARK-36271: V1 insert should check schema field name too") { + withView("v") { + spark.range(1).createTempView("v") + withTempDir { dir => + val e = intercept[AnalysisException] { + sql("SELECT ID, IF(ID=1,1,0) FROM 
v").write.mode(SaveMode.Overwrite) + .format("avro").save(dir.getCanonicalPath) + }.getMessage + assert(e.contains("Column name \"(IF((ID = 1), 1, 0))\" contains invalid character(s).")) + } + + withTempDir { dir => + val e = intercept[AnalysisException] { + sql("SELECT NAMED_STRUCT('(IF((ID = 1), 1, 0))', IF(ID=1,ID,0)) AS col1 FROM v") + .write.mode(SaveMode.Overwrite) + .format("avro").save(dir.getCanonicalPath) + }.getMessage + assert(e.contains("Column name \"(IF((ID = 1), 1, 0))\" contains invalid character(s).")) + } + } + } } class AvroV2Suite extends AvroSuite with ExplainSuiteHelper { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index b562d44..fcd95a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -84,6 +84,7 @@ object DataSourceUtils { throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError(format.toString, field) } } + checkFieldNames(format, schema) } // SPARK-24626: Metadata files and temporary files should not be diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index 20f69e8..d0cd02f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -482,7 +482,6 @@ object ParquetWriteSupport { val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes" def setSchema(schema: StructType, configuration: Configuration): Unit = { - ParquetSchemaConverter.checkFieldNames(schema) configuration.set(SPARK_ROW_SCHEMA, 
schema.json) configuration.setIfUnset( ParquetOutputFormat.WRITER_VERSION, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala index 9c4bc78..7f66a09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala @@ -93,6 +93,7 @@ trait FileWrite extends Write { s"when inserting into $pathName", caseSensitiveAnalysis) DataSource.validateSchema(schema) + // TODO: [SPARK-36340] Unify check schema filed of DataSource V2 Insert. schema.foreach { field => if (!supportsDataType(field.dataType)) { throw QueryCompilationErrors.dataTypeUnsupportedByDataSourceError(formatName, field) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala index 0316d91..b2b6d31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala @@ -72,6 +72,7 @@ case class ParquetWrite( ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]) + ParquetSchemaConverter.checkFieldNames(dataSchema) // This metadata is useful for keeping UDTs like Vector/Matrix. 
ParquetWriteSupport.setSchema(dataSchema, conf) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index c71f667..2436392 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -967,6 +967,28 @@ class FileBasedDataSourceSuite extends QueryTest checkAnswer(df, Row("v1", "v2")) } } + + test("SPARK-36271: V1 insert should check schema field name too") { + withView("v") { + spark.range(1).createTempView("v") + withTempDir { dir => + val e = intercept[AnalysisException] { + sql("SELECT ID, IF(ID=1,1,0) FROM v").write.mode(SaveMode.Overwrite) + .format("parquet").save(dir.getCanonicalPath) + }.getMessage + assert(e.contains("Column name \"(IF((ID = 1), 1, 0))\" contains invalid character(s).")) + } + + withTempDir { dir => + val e = intercept[AnalysisException] { + sql("SELECT NAMED_STRUCT('(IF((ID = 1), 1, 0))', IF(ID=1,ID,0)) AS col1 FROM v") + .write.mode(SaveMode.Overwrite) + .format("parquet").save(dir.getCanonicalPath) + }.getMessage + assert(e.contains("Column name \"(IF((ID = 1), 1, 0))\" contains invalid character(s).")) + } + } + } } object TestingUDT { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org