This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9ba6db9 [SPARK-33844][SQL][3.0] InsertIntoHiveDir command should check col name too
9ba6db9 is described below
commit 9ba6db94a2fef23053f208b0b740b8b0a1cc7d88
Author: angerszhu <[email protected]>
AuthorDate: Tue Jan 5 19:45:09 2021 +0000
[SPARK-33844][SQL][3.0] InsertIntoHiveDir command should check col name too
### What changes were proposed in this pull request?
In Hive 1.2.1, the Hive SerDe simply splits `serdeConstants.LIST_COLUMNS` and
`serdeConstants.LIST_COLUMN_TYPES` on commas.
With Spark 2.4, the following unit test:
```
test("insert overwrite directory with comma col name") {
  withTempDir { dir =>
    val path = dir.toURI.getPath
    val v1 =
      s"""
         | INSERT OVERWRITE DIRECTORY '${path}'
         | STORED AS TEXTFILE
         | SELECT 1 as a, 'c' as b, if(1 = 1, "true", "false")
      """.stripMargin
    sql(v1).explain(true)
    sql(v1).show()
  }
}
```
fails with the error below: because a column name contains `,`, the number of
column names no longer matches the number of column types.
```
19:56:05.618 ERROR org.apache.spark.sql.execution.datasources.FileFormatWriter: [ angerszhu ] Aborting job dd774f18-93fa-431f-9468-3534c7d8acda.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.hadoop.hive.serde2.SerDeException: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe: columns has 5 elements while columns.types has 3 elements!
	at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.extractColumnInfo(LazySerDeParameters.java:145)
	at org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters.<init>(LazySerDeParameters.java:85)
	at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:125)
	at org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:119)
	at org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:103)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:287)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:219)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:218)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$12.apply(Executor.scala:461)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:467)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```
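To see where the "5" and "3" in this message come from, here is a minimal Scala sketch (not Hive's actual code) that joins the three output column names with commas and splits them back on every comma, as the Hive 1.2.1 SerDe does. The third name is our approximation of Spark's auto-generated alias for the `if(...)` expression; any name containing two commas produces the same mismatch. The object name `CommaSplitMismatch` is hypothetical.

```scala
// Sketch: why a comma inside a column name breaks a comma-splitting SerDe.
// Assumption: names are joined with "," into the "columns" property and split
// back on ",", while the type list still has exactly one entry per column.
object CommaSplitMismatch {
  // Approximate output names of: SELECT 1 as a, 'c' as b, if(1 = 1, "true", "false")
  val columnNames: Seq[String] = Seq("a", "b", "(IF((1 = 1), true, false))")
  val columnTypes: Seq[String] = Seq("int", "string", "string")

  // What the writer side serializes into the "columns" property.
  val listColumns: String = columnNames.mkString(",")

  // What a naive reader recovers by splitting on every comma:
  // "a", "b", "(IF((1 = 1)", " true", " false))"
  def parsedNames: Seq[String] = listColumns.split(",").toSeq

  def main(args: Array[String]): Unit = {
    println(s"columns has ${parsedNames.size} elements " +
      s"while columns.types has ${columnTypes.size} elements")
  }
}
```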
Since Hive 2.3, Hive sets `COLUMN_NAME_DELIMITER` to a special character when a
column name contains `,`:
https://github.com/apache/hive/blob/6f4c35c9e904d226451c465effdc5bfd31d395a0/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L1180-L1188
https://github.com/apache/hive/blob/6f4c35c9e904d226451c465effdc5bfd31d395a0/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L1044-L1075
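A minimal sketch of the Hive 2.3 idea, under our own assumptions: the object `DelimiterChoice` is hypothetical, and we assume the fallback delimiter is the NUL character, following our reading of the linked `MetaStoreUtils` code. The key point is that the delimiter is recorded alongside the name list, so the reader no longer splits on a hard-coded comma.

```scala
import java.util.regex.Pattern

// Sketch of choosing a column-name delimiter that cannot clash with the names.
// Assumption: fall back to NUL ('\u0000') when any name contains a comma,
// otherwise keep ",". DelimiterChoice is a hypothetical helper, not Hive's API.
object DelimiterChoice {
  def columnNameDelimiter(names: Seq[String]): String =
    if (names.exists(_.contains(","))) "\u0000" else ","

  // Writer side: join the names and return the delimiter that must be stored
  // next to them (the role COLUMN_NAME_DELIMITER plays in Hive).
  def encode(names: Seq[String]): (String, String) = {
    val delim = columnNameDelimiter(names)
    (names.mkString(delim), delim)
  }

  // Reader side: split on the recorded delimiter, quoted so it is not
  // interpreted as a regex.
  def decode(encoded: String, delim: String): Seq[String] =
    encoded.split(Pattern.quote(delim)).toSeq
}
```

With the names from the failing test, `encode` picks NUL as the delimiter and `decode` returns exactly three names again.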
In script transformation, Spark also parses column names to avoid this problem:
https://github.com/apache/spark/blob/554600c2af0dbc8979955807658fafef5dc66c08/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala#L257-L261
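Another way around the problem is to never hand the SerDe the user-visible names at all. The sketch below gives every column a synthetic positional name; the `_col<i>` pattern and the `PositionalNames` object are hypothetical and are not meant to reproduce the exact naming scheme used in `HiveScriptTransformationExec`.

```scala
// Sketch: give the SerDe synthetic, comma-free column names.
// The "_col<i>" pattern is a hypothetical choice for illustration only.
object PositionalNames {
  def safeNames(original: Seq[String]): Seq[String] =
    original.indices.map(i => s"_col$i")

  def main(args: Array[String]): Unit = {
    val names = safeNames(Seq("a", "b", "(IF((1 = 1), true, false))"))
    // Splitting the joined list on "," now recovers one entry per column.
    println(names.mkString(",").split(",").length)
  }
}
```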
So I think `InsertIntoHiveDirCommand` should do the same. I have verified that
this approach makes Spark 2.4 work well.
### Why are the changes needed?
To use the Hive SerDe more safely.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit test to `HiveDDLSuite`.
Closes #31038 from AngersZhuuuu/SPARK-33844-3.0.
Authored-by: angerszhu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/hive/execution/InsertIntoHiveDirCommand.scala | 9 +++++++--
.../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 14 ++++++++++++++
2 files changed, 21 insertions(+), 2 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index b66c302..7ef637e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.util.SchemaUtils
@@ -63,12 +64,16 @@ case class InsertIntoHiveDirCommand(
s"when inserting into ${storage.locationUri.get}",
sparkSession.sessionState.conf.caseSensitiveAnalysis)
- val hiveTable = HiveClientImpl.toHiveTable(CatalogTable(
+ val table = CatalogTable(
identifier = TableIdentifier(storage.locationUri.get.toString, Some("default")),
+ provider = Some(DDLUtils.HIVE_PROVIDER),
tableType = org.apache.spark.sql.catalyst.catalog.CatalogTableType.VIEW,
storage = storage,
schema = outputColumns.toStructType
- ))
+ )
+ DDLUtils.checkDataColNames(table)
+
+ val hiveTable = HiveClientImpl.toHiveTable(table)
hiveTable.getMetadata.put(serdeConstants.SERIALIZATION_LIB,
storage.serde.getOrElse(classOf[LazySimpleSerDe].getName))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 765119d..c8726c7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -2766,4 +2766,18 @@ class HiveDDLSuite
checkAnswer(sql("SHOW PARTITIONS t"), Seq.empty)
}
}
+
+ test("SPARK-33844: Insert overwrite directory should check schema too") {
+ withView("v") {
+ spark.range(1).createTempView("v")
+ withTempPath { path =>
+ val e = intercept[AnalysisException] {
+ spark.sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}' " +
+ s"STORED AS PARQUET SELECT ID, if(1=1, 1, 0), abs(id), '^-' FROM v")
+ }.getMessage
+ assert(e.contains("Attribute name \"(IF((1 = 1), 1, 0))\" contains" +
+ " invalid character(s) among \" ,;{}()\\n\\t=\". Please use alias to rename it."))
+ }
+ }
+ }
}
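The patch does not rename anything: it rejects names the SerDe cannot handle by calling `DDLUtils.checkDataColNames`, whose rules, as far as we can tell, depend on the table's storage format. As a rough illustration of the check the test exercises for `STORED AS PARQUET`, here is a sketch that rejects any name containing one of the characters listed in the expected error message. `ColumnNameCheck` is a hypothetical name, not Spark's API.

```scala
// Sketch of a column-name check that produces the error text the test expects.
// ColumnNameCheck is hypothetical; in the patch the real check is reached via
// DDLUtils.checkDataColNames.
object ColumnNameCheck {
  // Characters listed as invalid in the expected message: space , ; { } ( ) \n \t =
  private val InvalidChars: Set[Char] = " ,;{}()\n\t=".toSet

  // Returns Left(errorMessage) for the first offending name, Right(()) otherwise.
  def check(names: Seq[String]): Either[String, Unit] =
    names.find(_.exists(InvalidChars.contains)) match {
      case Some(bad) =>
        Left(s"""Attribute name "$bad" contains invalid character(s) among " ,;{}()\\n\\t=". Please use alias to rename it.""")
      case None =>
        Right(())
    }
}
```

A name such as `(IF((1 = 1), 1, 0))` is rejected, which is why the test's error message suggests adding an alias.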