Repository: spark
Updated Branches:
refs/heads/master 3e033035a -> 3d6b68b03
[SPARK-25313][SQL] Fix regression in FileFormatWriter output names
## What changes were proposed in this pull request?
Let's see the following example:
```
val location = "/tmp/t"
val df = spark.range(10).toDF("id")
df.write.format("parquet").saveAsTable("tbl")
spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
spark.sql(s"CREATE TABLE tbl2(ID long) USING parquet location $location")
spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
println(spark.read.parquet(location).schema)
spark.table("tbl2").show()
```
The output column name in the written schema will be `id` instead of `ID`, so
the last query shows nothing from `tbl2`.
By enabling the debug messages we can see that the output name is changed from
`ID` to `id`, and then the `outputColumns` in
`InsertIntoHadoopFsRelationCommand` are changed by `RemoveRedundantAliases`.


**To guarantee correctness**, we should change the output columns from
`Seq[Attribute]` to `Seq[String]` to avoid their names being replaced by the
optimizer.
I will fix the rules related to project elimination in
https://github.com/apache/spark/pull/22311 after this one.
## How was this patch tested?
Unit test.
Closes #22320 from gengliangwang/fixOutputSchema.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d6b68b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d6b68b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d6b68b0
Branch: refs/heads/master
Commit: 3d6b68b030ee85a0f639dd8e9b68aedf5f27b46f
Parents: 3e03303
Author: Gengliang Wang <[email protected]>
Authored: Thu Sep 6 10:37:52 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Thu Sep 6 10:37:52 2018 +0800
----------------------------------------------------------------------
.../execution/command/DataWritingCommand.scala | 43 +++++++++++-
.../command/createDataSourceTables.scala | 4 +-
.../sql/execution/datasources/DataSource.scala | 16 +++--
.../datasources/DataSourceStrategy.scala | 4 +-
.../InsertIntoHadoopFsRelationCommand.scala | 6 +-
.../sql/test/DataFrameReaderWriterSuite.scala | 74 ++++++++++++++++++++
.../apache/spark/sql/hive/HiveStrategies.scala | 6 +-
.../CreateHiveTableAsSelectCommand.scala | 9 +--
.../execution/InsertIntoHiveDirCommand.scala | 2 +-
.../hive/execution/InsertIntoHiveTable.scala | 2 +-
.../spark/sql/hive/execution/HiveDDLSuite.scala | 48 +++++++++++++
11 files changed, 189 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3d6b68b0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index e11dbd2..0a185b8 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{Command,
LogicalPlan}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.sql.execution.datasources.FileFormatWriter
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
/**
@@ -41,8 +42,12 @@ trait DataWritingCommand extends Command {
override final def children: Seq[LogicalPlan] = query :: Nil
- // Output columns of the analyzed input query plan
- def outputColumns: Seq[Attribute]
+ // Output column names of the analyzed input query plan.
+ def outputColumnNames: Seq[String]
+
+ // Output columns of the analyzed input query plan.
+ def outputColumns: Seq[Attribute] =
+ DataWritingCommand.logicalPlanOutputWithNames(query, outputColumnNames)
lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
@@ -53,3 +58,35 @@ trait DataWritingCommand extends Command {
def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
}
+
+object DataWritingCommand {
+ /**
+ * Returns output attributes with provided names.
+ * The length of provided names should be the same of the length of
[[LogicalPlan.output]].
+ */
+ def logicalPlanOutputWithNames(
+ query: LogicalPlan,
+ names: Seq[String]): Seq[Attribute] = {
+ // Save the output attributes to a variable to avoid duplicated function
calls.
+ val outputAttributes = query.output
+ assert(outputAttributes.length == names.length,
+ "The length of provided names doesn't match the length of output
attributes.")
+ outputAttributes.zip(names).map { case (attr, outputName) =>
+ attr.withName(outputName)
+ }
+ }
+
+ /**
+ * Returns schema of logical plan with provided names.
+ * The length of provided names should be the same of the length of
[[LogicalPlan.schema]].
+ */
+ def logicalPlanSchemaWithNames(
+ query: LogicalPlan,
+ names: Seq[String]): StructType = {
+ assert(query.schema.length == names.length,
+ "The length of provided names doesn't match the length of query schema.")
+ StructType(query.schema.zip(names).map { case (structField, outputName) =>
+ structField.copy(name = outputName)
+ })
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/3d6b68b0/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index f6ef433..b2e1f53 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -139,7 +139,7 @@ case class CreateDataSourceTableAsSelectCommand(
table: CatalogTable,
mode: SaveMode,
query: LogicalPlan,
- outputColumns: Seq[Attribute])
+ outputColumnNames: Seq[String])
extends DataWritingCommand {
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
@@ -214,7 +214,7 @@ case class CreateDataSourceTableAsSelectCommand(
catalogTable = if (tableExists) Some(table) else None)
try {
- dataSource.writeAndRead(mode, query, outputColumns, physicalPlan)
+ dataSource.writeAndRead(mode, query, outputColumnNames, physicalPlan)
} catch {
case ex: AnalysisException =>
logError(s"Failed to write to table
${table.identifier.unquotedString}", ex)
http://git-wip-us.apache.org/repos/asf/spark/blob/3d6b68b0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 1dcf9f3..ce3bc3d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -450,7 +451,7 @@ case class DataSource(
mode = mode,
catalogTable = catalogTable,
fileIndex = fileIndex,
- outputColumns = data.output)
+ outputColumnNames = data.output.map(_.name))
}
/**
@@ -460,9 +461,9 @@ case class DataSource(
* @param mode The save mode for this writing.
* @param data The input query plan that produces the data to be written.
Note that this plan
* is analyzed and optimized.
- * @param outputColumns The original output columns of the input query plan.
The optimizer may not
- * preserve the output column's names' case, so we need
this parameter
- * instead of `data.output`.
+ * @param outputColumnNames The original output column names of the input
query plan. The
+ * optimizer may not preserve the output column's
names' case, so we need
+ * this parameter instead of `data.output`.
* @param physicalPlan The physical plan of the input query plan. We should
run the writing
* command with this physical plan instead of creating a
new physical plan,
* so that the metrics can be correctly linked to the
given physical plan and
@@ -471,8 +472,9 @@ case class DataSource(
def writeAndRead(
mode: SaveMode,
data: LogicalPlan,
- outputColumns: Seq[Attribute],
+ outputColumnNames: Seq[String],
physicalPlan: SparkPlan): BaseRelation = {
+ val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data,
outputColumnNames)
if
(outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into
external storage.")
}
@@ -495,7 +497,9 @@ case class DataSource(
s"Unable to resolve $name given
[${data.output.map(_.name).mkString(", ")}]")
}
}
- val resolved = cmd.copy(partitionColumns = resolvedPartCols,
outputColumns = outputColumns)
+ val resolved = cmd.copy(
+ partitionColumns = resolvedPartCols,
+ outputColumnNames = outputColumnNames)
resolved.run(sparkSession, physicalPlan)
// Replace the schema with that of the DataFrame we just wrote out to
avoid re-inferring
copy(userSpecifiedSchema =
Some(outputColumns.toStructType.asNullable)).resolveRelation()
http://git-wip-us.apache.org/repos/asf/spark/blob/3d6b68b0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 6b61e74..c600044 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -139,7 +139,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends
Rule[LogicalPlan] with Cast
case CreateTable(tableDesc, mode, Some(query))
if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
- CreateDataSourceTableAsSelectCommand(tableDesc, mode, query,
query.output)
+ CreateDataSourceTableAsSelectCommand(tableDesc, mode, query,
query.output.map(_.name))
case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
parts, query, overwrite, false) if parts.isEmpty =>
@@ -209,7 +209,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends
Rule[LogicalPlan] with Cast
mode,
table,
Some(t.location),
- actualQuery.output)
+ actualQuery.output.map(_.name))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3d6b68b0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 2ae21b7..484942d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -56,14 +56,14 @@ case class InsertIntoHadoopFsRelationCommand(
mode: SaveMode,
catalogTable: Option[CatalogTable],
fileIndex: Option[FileIndex],
- outputColumns: Seq[Attribute])
+ outputColumnNames: Seq[String])
extends DataWritingCommand {
import
org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
// Most formats don't do well with duplicate columns, so lets not allow
that
- SchemaUtils.checkSchemaColumnNameDuplication(
- query.schema,
+ SchemaUtils.checkColumnNameDuplication(
+ outputColumnNames,
s"when inserting into $outputPath",
sparkSession.sessionState.conf.caseSensitiveAnalysis)
http://git-wip-us.apache.org/repos/asf/spark/blob/3d6b68b0/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index b65058f..2378725 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -805,6 +805,80 @@ class DataFrameReaderWriterSuite extends QueryTest with
SharedSQLContext with Be
}
}
+ test("Insert overwrite table command should output correct schema: basic") {
+ withTable("tbl", "tbl2") {
+ withView("view1") {
+ val df = spark.range(10).toDF("id")
+ df.write.format("parquet").saveAsTable("tbl")
+ spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
+ spark.sql("CREATE TABLE tbl2(ID long) USING parquet")
+ spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
+ val identifier = TableIdentifier("tbl2")
+ val location =
spark.sessionState.catalog.getTableMetadata(identifier).location.toString
+ val expectedSchema = StructType(Seq(StructField("ID", LongType, true)))
+ assert(spark.read.parquet(location).schema == expectedSchema)
+ checkAnswer(spark.table("tbl2"), df)
+ }
+ }
+ }
+
+ test("Insert overwrite table command should output correct schema: complex")
{
+ withTable("tbl", "tbl2") {
+ withView("view1") {
+ val df = spark.range(10).map(x => (x, x.toInt, x.toInt)).toDF("col1",
"col2", "col3")
+ df.write.format("parquet").saveAsTable("tbl")
+ spark.sql("CREATE VIEW view1 AS SELECT * FROM tbl")
+ spark.sql("CREATE TABLE tbl2(COL1 long, COL2 int, COL3 int) USING
parquet PARTITIONED " +
+ "BY (COL2) CLUSTERED BY (COL3) INTO 3 BUCKETS")
+ spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT COL1, COL2, COL3 FROM
view1")
+ val identifier = TableIdentifier("tbl2")
+ val location =
spark.sessionState.catalog.getTableMetadata(identifier).location.toString
+ val expectedSchema = StructType(Seq(
+ StructField("COL1", LongType, true),
+ StructField("COL3", IntegerType, true),
+ StructField("COL2", IntegerType, true)))
+ assert(spark.read.parquet(location).schema == expectedSchema)
+ checkAnswer(spark.table("tbl2"), df)
+ }
+ }
+ }
+
+ test("Create table as select command should output correct schema: basic") {
+ withTable("tbl", "tbl2") {
+ withView("view1") {
+ val df = spark.range(10).toDF("id")
+ df.write.format("parquet").saveAsTable("tbl")
+ spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
+ spark.sql("CREATE TABLE tbl2 USING parquet AS SELECT ID FROM view1")
+ val identifier = TableIdentifier("tbl2")
+ val location =
spark.sessionState.catalog.getTableMetadata(identifier).location.toString
+ val expectedSchema = StructType(Seq(StructField("ID", LongType, true)))
+ assert(spark.read.parquet(location).schema == expectedSchema)
+ checkAnswer(spark.table("tbl2"), df)
+ }
+ }
+ }
+
+ test("Create table as select command should output correct schema: complex")
{
+ withTable("tbl", "tbl2") {
+ withView("view1") {
+ val df = spark.range(10).map(x => (x, x.toInt, x.toInt)).toDF("col1",
"col2", "col3")
+ df.write.format("parquet").saveAsTable("tbl")
+ spark.sql("CREATE VIEW view1 AS SELECT * FROM tbl")
+ spark.sql("CREATE TABLE tbl2 USING parquet PARTITIONED BY (COL2) " +
+ "CLUSTERED BY (COL3) INTO 3 BUCKETS AS SELECT COL1, COL2, COL3 FROM
view1")
+ val identifier = TableIdentifier("tbl2")
+ val location =
spark.sessionState.catalog.getTableMetadata(identifier).location.toString
+ val expectedSchema = StructType(Seq(
+ StructField("COL1", LongType, true),
+ StructField("COL3", IntegerType, true),
+ StructField("COL2", IntegerType, true)))
+ assert(spark.read.parquet(location).schema == expectedSchema)
+ checkAnswer(spark.table("tbl2"), df)
+ }
+ }
+ }
+
test("use Spark jobs to list files") {
withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "1") {
withTempDir { dir =>
http://git-wip-us.apache.org/repos/asf/spark/blob/3d6b68b0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 9fe83bb..07ee105 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -149,7 +149,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {
case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite,
ifPartitionNotExists)
if DDLUtils.isHiveTable(r.tableMeta) =>
InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite,
- ifPartitionNotExists, query.output)
+ ifPartitionNotExists, query.output.map(_.name))
case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc)
=>
DDLUtils.checkDataColNames(tableDesc)
@@ -157,14 +157,14 @@ object HiveAnalysis extends Rule[LogicalPlan] {
case CreateTable(tableDesc, mode, Some(query)) if
DDLUtils.isHiveTable(tableDesc) =>
DDLUtils.checkDataColNames(tableDesc)
- CreateHiveTableAsSelectCommand(tableDesc, query, query.output, mode)
+ CreateHiveTableAsSelectCommand(tableDesc, query,
query.output.map(_.name), mode)
case InsertIntoDir(isLocal, storage, provider, child, overwrite)
if DDLUtils.isHiveTable(provider) =>
val outputPath = new Path(storage.locationUri.get)
if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)
- InsertIntoHiveDirCommand(isLocal, storage, child, overwrite,
child.output)
+ InsertIntoHiveDirCommand(isLocal, storage, child, overwrite,
child.output.map(_.name))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3d6b68b0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 27d807c..0eb2f0d 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -37,7 +37,7 @@ import
org.apache.spark.sql.execution.command.DataWritingCommand
case class CreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
- outputColumns: Seq[Attribute],
+ outputColumnNames: Seq[String],
mode: SaveMode)
extends DataWritingCommand {
@@ -63,13 +63,14 @@ case class CreateHiveTableAsSelectCommand(
query,
overwrite = false,
ifPartitionNotExists = false,
- outputColumns = outputColumns).run(sparkSession, child)
+ outputColumnNames = outputColumnNames).run(sparkSession, child)
} else {
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while
data
// processing.
assert(tableDesc.schema.isEmpty)
- catalog.createTable(tableDesc.copy(schema = query.schema),
ignoreIfExists = false)
+ val schema = DataWritingCommand.logicalPlanSchemaWithNames(query,
outputColumnNames)
+ catalog.createTable(tableDesc.copy(schema = schema), ignoreIfExists =
false)
try {
// Read back the metadata of the table which was created just now.
@@ -82,7 +83,7 @@ case class CreateHiveTableAsSelectCommand(
query,
overwrite = true,
ifPartitionNotExists = false,
- outputColumns = outputColumns).run(sparkSession, child)
+ outputColumnNames = outputColumnNames).run(sparkSession, child)
} catch {
case NonFatal(e) =>
// drop the created table.
http://git-wip-us.apache.org/repos/asf/spark/blob/3d6b68b0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index cebeca0..0a73aaa 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -57,7 +57,7 @@ case class InsertIntoHiveDirCommand(
storage: CatalogStorageFormat,
query: LogicalPlan,
overwrite: Boolean,
- outputColumns: Seq[Attribute]) extends SaveAsHiveFile {
+ outputColumnNames: Seq[String]) extends SaveAsHiveFile {
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
assert(storage.locationUri.nonEmpty)
http://git-wip-us.apache.org/repos/asf/spark/blob/3d6b68b0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 02a60f1..75a0563 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -69,7 +69,7 @@ case class InsertIntoHiveTable(
query: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean,
- outputColumns: Seq[Attribute]) extends SaveAsHiveFile {
+ outputColumnNames: Seq[String]) extends SaveAsHiveFile {
/**
* Inserts all the rows in the table into Hive. Row objects are properly
serialized with the
http://git-wip-us.apache.org/repos/asf/spark/blob/3d6b68b0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 6708a50..9acd5e1 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -755,6 +755,54 @@ class HiveDDLSuite
}
}
+ test("Insert overwrite Hive table should output correct schema") {
+ withSQLConf(CONVERT_METASTORE_PARQUET.key -> "false") {
+ withTable("tbl", "tbl2") {
+ withView("view1") {
+ spark.sql("CREATE TABLE tbl(id long)")
+ spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4")
+ spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
+ withTempPath { path =>
+ sql(
+ s"""
+ |CREATE TABLE tbl2(ID long) USING hive
+ |OPTIONS(fileFormat 'parquet')
+ |LOCATION '${path.toURI}'
+ """.stripMargin)
+ spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
+ val expectedSchema = StructType(Seq(StructField("ID", LongType,
true)))
+ assert(spark.read.parquet(path.toString).schema == expectedSchema)
+ checkAnswer(spark.table("tbl2"), Seq(Row(4)))
+ }
+ }
+ }
+ }
+ }
+
+ test("Create Hive table as select should output correct schema") {
+ withSQLConf(CONVERT_METASTORE_PARQUET.key -> "false") {
+ withTable("tbl", "tbl2") {
+ withView("view1") {
+ spark.sql("CREATE TABLE tbl(id long)")
+ spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4")
+ spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
+ withTempPath { path =>
+ sql(
+ s"""
+ |CREATE TABLE tbl2 USING hive
+ |OPTIONS(fileFormat 'parquet')
+ |LOCATION '${path.toURI}'
+ |AS SELECT ID FROM view1
+ """.stripMargin)
+ val expectedSchema = StructType(Seq(StructField("ID", LongType,
true)))
+ assert(spark.read.parquet(path.toString).schema == expectedSchema)
+ checkAnswer(spark.table("tbl2"), Seq(Row(4)))
+ }
+ }
+ }
+ }
+ }
+
test("alter table partition - storage information") {
sql("CREATE TABLE boxes (height INT, length INT) PARTITIONED BY (width
INT)")
sql("INSERT OVERWRITE TABLE boxes PARTITION (width=4) SELECT 4, 4")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]