[SPARK-4131] Support "Writing data into the filesystem from queries"
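For quick reference, a hedged usage sketch of the two statement forms this commit adds (the output paths are illustrative examples; assumes a Hive-enabled SparkSession named `spark`, mirroring the new tests below):

{{{
// Hive form: LOCAL is allowed, with optional ROW FORMAT / STORED AS clauses.
spark.sql("""
  INSERT OVERWRITE LOCAL DIRECTORY '/tmp/hive_out'
  STORED AS parquet
  SELECT 1 AS a, 'c' AS b
""")

// Data source form: USING <provider>; the path may instead be supplied via
// OPTIONS ('path' ...), but not both a literal path and the option at once.
spark.sql("""
  INSERT OVERWRITE DIRECTORY '/tmp/json_out'
  USING json
  SELECT 1 AS a, 'c' AS b
""")
}}}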
## What changes were proposed in this pull request?

This PR implements the SQL feature:

INSERT OVERWRITE [LOCAL] DIRECTORY directory1
[ROW FORMAT row_format] [STORED AS file_format]
SELECT ... FROM ...

## How was this patch tested?

Added new unit tests and also pulled the code into fb-spark so that we could test writing to an HDFS directory.

Author: Jane Wang <[email protected]>

Closes #18975 from janewangfb/port_local_directory.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7679055
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7679055
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7679055

Branch: refs/heads/master
Commit: f76790557b063edc3080d5c792167e2f8b7060d1
Parents: e4d8f9a
Author: Jane Wang <[email protected]>
Authored: Sat Sep 9 11:48:34 2017 -0700
Committer: gatorsmile <[email protected]>
Committed: Sat Sep 9 11:48:34 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |   8 +-
 .../analysis/UnsupportedOperationChecker.scala  |   3 +
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  79 +-
 .../plans/logical/basicLogicalOperators.scala   |  26 +-
 .../spark/sql/execution/SparkSqlParser.scala    |  79 +-
 .../InsertIntoDataSourceDirCommand.scala        |  82 +++
 .../spark/sql/execution/command/ddl.scala       |  17 +-
 .../datasources/DataSourceStrategy.scala        |  21 +-
 .../sql/execution/command/DDLParserSuite.scala  |  52 +-
 .../apache/spark/sql/sources/InsertSuite.scala  |  60 ++
 .../apache/spark/sql/hive/HiveStrategies.scala  |  11 +-
 .../spark/sql/hive/execution/HiveTmpPath.scala  | 203 +++++
 .../execution/InsertIntoHiveDirCommand.scala    | 131 ++++
 .../hive/execution/InsertIntoHiveTable.scala    | 213 +----
 .../sql/hive/execution/SaveAsHiveFile.scala     |  73 ++
 .../sql/hive/InsertIntoHiveTableSuite.scala     | 551 --------
 .../org/apache/spark/sql/hive/InsertSuite.scala | 731 +++++++++++++++++++
 17 files changed, 1565 insertions(+), 775 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index f741dcf..239e73e 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -243,8 +243,10 @@ query
     ;
 
 insertInto
-    : INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)?
-    | INSERT INTO TABLE? tableIdentifier partitionSpec?
+    : INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)?      #insertOverwriteTable
+    | INSERT INTO TABLE? tableIdentifier partitionSpec?                             #insertIntoTable
+    | INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat?    #insertOverwriteHiveDir
+    | INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=tablePropertyList)?
#insertOverwriteDir ; partitionSpecLocation @@ -745,6 +747,7 @@ nonReserved | AND | CASE | CAST | DISTINCT | DIV | ELSE | END | FUNCTION | INTERVAL | MACRO | OR | STRATIFY | THEN | UNBOUNDED | WHEN | DATABASE | SELECT | FROM | WHERE | HAVING | TO | TABLE | WITH | NOT | CURRENT_DATE | CURRENT_TIMESTAMP + | DIRECTORY ; SELECT: 'SELECT'; @@ -815,6 +818,7 @@ WITH: 'WITH'; VALUES: 'VALUES'; CREATE: 'CREATE'; TABLE: 'TABLE'; +DIRECTORY: 'DIRECTORY'; VIEW: 'VIEW'; REPLACE: 'REPLACE'; INSERT: 'INSERT'; http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 6ab4153..33ba086 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -146,6 +146,9 @@ object UnsupportedOperationChecker { throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " + "streaming DataFrames/Datasets") + case _: InsertIntoDir => + throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets") + // mapGroupsWithState and flatMapGroupsWithState case m: FlatMapGroupsWithState if m.isStreaming => http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 8a45c52..891f616 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last} import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ @@ -178,11 +179,64 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Add an INSERT INTO [TABLE]/INSERT OVERWRITE TABLE operation to the logical plan. + * Parameters used for writing query to a table: + * (tableIdentifier, partitionKeys, exists). + */ + type InsertTableParams = (TableIdentifier, Map[String, Option[String]], Boolean) + + /** + * Parameters used for writing query to a directory: (isLocal, CatalogStorageFormat, provider). + */ + type InsertDirParams = (Boolean, CatalogStorageFormat, Option[String]) + + /** + * Add an + * {{{ + * INSERT OVERWRITE TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? 
+ * INSERT INTO [TABLE] tableIdentifier [partitionSpec] + * INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat] + * INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS tablePropertyList] + * }}} + * operation to logical plan */ private def withInsertInto( ctx: InsertIntoContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { + ctx match { + case table: InsertIntoTableContext => + val (tableIdent, partitionKeys, exists) = visitInsertIntoTable(table) + InsertIntoTable(UnresolvedRelation(tableIdent), partitionKeys, query, false, exists) + case table: InsertOverwriteTableContext => + val (tableIdent, partitionKeys, exists) = visitInsertOverwriteTable(table) + InsertIntoTable(UnresolvedRelation(tableIdent), partitionKeys, query, true, exists) + case dir: InsertOverwriteDirContext => + val (isLocal, storage, provider) = visitInsertOverwriteDir(dir) + InsertIntoDir(isLocal, storage, provider, query, overwrite = true) + case hiveDir: InsertOverwriteHiveDirContext => + val (isLocal, storage, provider) = visitInsertOverwriteHiveDir(hiveDir) + InsertIntoDir(isLocal, storage, provider, query, overwrite = true) + case _ => + throw new ParseException("Invalid InsertIntoContext", ctx) + } + } + + /** + * Add an INSERT INTO TABLE operation to the logical plan. + */ + override def visitInsertIntoTable( + ctx: InsertIntoTableContext): InsertTableParams = withOrigin(ctx) { + val tableIdent = visitTableIdentifier(ctx.tableIdentifier) + val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty) + + (tableIdent, partitionKeys, false) + } + + /** + * Add an INSERT OVERWRITE TABLE operation to the logical plan. + */ + override def visitInsertOverwriteTable( + ctx: InsertOverwriteTableContext): InsertTableParams = withOrigin(ctx) { + assert(ctx.OVERWRITE() != null) val tableIdent = visitTableIdentifier(ctx.tableIdentifier) val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty) @@ -192,12 +246,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx) } - InsertIntoTable( - UnresolvedRelation(tableIdent), - partitionKeys, - query, - ctx.OVERWRITE != null, - ctx.EXISTS != null) + (tableIdent, partitionKeys, ctx.EXISTS() != null) + } + + /** + * Write to a directory, returning a [[InsertIntoDir]] logical plan. + */ + override def visitInsertOverwriteDir( + ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) { + throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx) + } + + /** + * Write to a directory, returning a [[InsertIntoDir]] logical plan. 
+   */
+  override def visitInsertOverwriteHiveDir(
+      ctx: InsertOverwriteHiveDirContext): InsertDirParams = withOrigin(ctx) {
+    throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx)
+  }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 4b3054d..f443cd5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
@@ -360,6 +360,30 @@
 }
 
 /**
+ * Insert query result into a directory.
+ *
+ * @param isLocal Indicates whether the specified directory is a local directory
+ * @param storage Information about the output file format, row format, and serialization
+ * @param provider Specifies what data source to use; only used for data source files.
+ * @param child The query to be executed
+ * @param overwrite If true, the existing directory will be overwritten
+ *
+ * Note that this plan is unresolved and has to be replaced by the concrete implementations
+ * during analysis.
+ */
+case class InsertIntoDir(
+    isLocal: Boolean,
+    storage: CatalogStorageFormat,
+    provider: Option[String],
+    child: LogicalPlan,
+    overwrite: Boolean = true)
+  extends UnaryNode {
+
+  override def output: Seq[Attribute] = Seq.empty
+  override lazy val resolved: Boolean = false
+}
+
+/**
  * A container for holding the view description(CatalogTable), and the output of the view. The
 * child should be a logical plan parsed from the `CatalogTable.viewText`, should throw an error
 * if the `viewText` is not defined.
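To make the new unresolved node concrete, here is a hedged sketch that parses one of the new statements and pattern-matches the resulting InsertIntoDir (mirroring the DDLParserSuite checks further below; assumes an active SparkSession named `spark`):

{{{
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoDir

// Parse only; analysis later replaces this node with a concrete command.
val plan = spark.sessionState.sqlParser.parsePlan(
  "INSERT OVERWRITE DIRECTORY '/tmp/out' USING parquet SELECT 1 AS a")

plan match {
  case InsertIntoDir(isLocal, storage, provider, _, overwrite) =>
    // e.g. local=false provider=Some(parquet) overwrite=true path=Some(/tmp/out)
    println(s"local=$isLocal provider=$provider overwrite=$overwrite " +
      s"path=${storage.locationUri}")
  case other =>
    println(s"unexpected plan: $other")
}
}}}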
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index d3f6ab5..d38919b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.parser._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources.{CreateTable, _} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution} import org.apache.spark.sql.types.StructType @@ -1512,4 +1512,81 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { query: LogicalPlan): LogicalPlan = { RepartitionByExpression(expressions, query, conf.numShufflePartitions) } + + /** + * Return the parameters for [[InsertIntoDir]] logical plan. + * + * Expected format: + * {{{ + * INSERT OVERWRITE DIRECTORY + * [path] + * [OPTIONS table_property_list] + * select_statement; + * }}} + */ + override def visitInsertOverwriteDir( + ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) { + if (ctx.LOCAL != null) { + throw new ParseException( + "LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source", ctx) + } + + val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty) + var storage = DataSource.buildStorageFormatFromOptions(options) + + val path = Option(ctx.path).map(string).getOrElse("") + + if (!(path.isEmpty ^ storage.locationUri.isEmpty)) { + throw new ParseException( + "Directory path and 'path' in OPTIONS should be specified one, but not both", ctx) + } + + if (!path.isEmpty) { + val customLocation = Some(CatalogUtils.stringToURI(path)) + storage = storage.copy(locationUri = customLocation) + } + + val provider = ctx.tableProvider.qualifiedName.getText + + (false, storage, Some(provider)) + } + + /** + * Return the parameters for [[InsertIntoDir]] logical plan. 
+ * + * Expected format: + * {{{ + * INSERT OVERWRITE [LOCAL] DIRECTORY + * path + * [ROW FORMAT row_format] + * [STORED AS file_format] + * select_statement; + * }}} + */ + override def visitInsertOverwriteHiveDir( + ctx: InsertOverwriteHiveDirContext): InsertDirParams = withOrigin(ctx) { + validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx) + val rowStorage = Option(ctx.rowFormat).map(visitRowFormat) + .getOrElse(CatalogStorageFormat.empty) + val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat) + .getOrElse(CatalogStorageFormat.empty) + + val path = string(ctx.path) + // The path field is required + if (path.isEmpty) { + operationNotAllowed("INSERT OVERWRITE DIRECTORY must be accompanied by path", ctx) + } + + val defaultStorage = HiveSerDe.getDefaultStorage(conf) + + val storage = CatalogStorageFormat( + locationUri = Some(CatalogUtils.stringToURI(path)), + inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat), + outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat), + serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde), + compressed = false, + properties = rowStorage.properties ++ fileStorage.properties) + + (ctx.LOCAL != null, storage, Some(DDLUtils.HIVE_PROVIDER)) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala new file mode 100644 index 0000000..633de4c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.SparkException +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources._ + +/** + * A command used to write the result of a query to a directory. + * + * The syntax of using this command in SQL is: + * {{{ + * INSERT OVERWRITE DIRECTORY (path=STRING)? + * USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...]) + * SELECT ... + * }}} + * + * @param storage storage format used to describe how the query result is stored. 
+ * @param provider the data source type to be used
+ * @param query the logical plan representing data to write to
+ * @param overwrite whether the existing directory is overwritten
+ */
+case class InsertIntoDataSourceDirCommand(
+    storage: CatalogStorageFormat,
+    provider: String,
+    query: LogicalPlan,
+    overwrite: Boolean) extends RunnableCommand {
+
+  override def children: Seq[LogicalPlan] = Seq(query)
+
+  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
+    assert(children.length == 1)
+    assert(storage.locationUri.nonEmpty, "Directory path is required")
+    assert(provider.nonEmpty, "Data source is required")
+
+    // Create the relation based on the input logical plan: `query`.
+    val pathOption = storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+
+    val dataSource = DataSource(
+      sparkSession,
+      className = provider,
+      options = storage.properties ++ pathOption,
+      catalogTable = None)
+
+    val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
+    if (!isFileFormat) {
+      throw new SparkException(
+        "Only Data Sources providing FileFormat are supported: " + dataSource.providingClass)
+    }
+
+    val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
+    try {
+      sparkSession.sessionState.executePlan(dataSource.planForWriting(saveMode, query))
+      dataSource.writeAndRead(saveMode, query)
+    } catch {
+      case ex: AnalysisException =>
+        logError("Failed to write to directory " + storage.locationUri.toString, ex)
+        throw ex
+    }
+
+    Seq.empty[Row]
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 7611e1c..b06f4cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -33,7 +33,8 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
 import org.apache.spark.sql.internal.HiveSerDe
@@ -869,4 +870,18 @@
       }
     }
   }
+
+  /**
+   * Throws an exception if outputPath tries to overwrite any of the query's input paths.
+ */ + def verifyNotReadPath(query: LogicalPlan, outputPath: Path) : Unit = { + val inputPaths = query.collect { + case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths + }.flatten + + if (inputPaths.contains(outputPath)) { + throw new AnalysisException( + "Cannot overwrite a path that is also being read from.") + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 5d6223d..018f24e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -17,8 +17,11 @@ package org.apache.spark.sql.execution.datasources +import java.util.Locale import java.util.concurrent.Callable +import org.apache.hadoop.fs.Path + import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ @@ -29,7 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} @@ -142,6 +145,14 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast parts, query, overwrite, false) if parts.isEmpty => InsertIntoDataSourceCommand(l, query, overwrite) + case InsertIntoDir(_, storage, provider, query, overwrite) + if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) != DDLUtils.HIVE_PROVIDER => + + val outputPath = new Path(storage.locationUri.get) + if (overwrite) DDLUtils.verifyNotReadPath(query, outputPath) + + InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite) + case i @ InsertIntoTable( l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, query, overwrite, _) => // If the InsertIntoTable command is for a partitioned HadoopFsRelation and @@ -178,15 +189,9 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast } val outputPath = t.location.rootPaths.head - val inputPaths = actualQuery.collect { - case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths - }.flatten + if (overwrite) DDLUtils.verifyNotReadPath(actualQuery, outputPath) val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append - if (overwrite && inputPaths.contains(outputPath)) { - throw new AnalysisException( - "Cannot overwrite a path that is also being read from.") - } val partitionSchema = actualQuery.resolve( t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver) http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 4ee3821..fa5172c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -32,7 +32,8 @@ import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, Project, ScriptTransformation} +import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{Project, ScriptTransformation} import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -524,6 +525,55 @@ class DDLParserSuite extends PlanTest with SharedSQLContext { assert(e.message.contains("you can only specify one of them.")) } + test("insert overwrite directory") { + val v1 = "INSERT OVERWRITE DIRECTORY '/tmp/file' USING parquet SELECT 1 as a" + parser.parsePlan(v1) match { + case InsertIntoDir(_, storage, provider, query, overwrite) => + assert(storage.locationUri.isDefined && storage.locationUri.get.toString == "/tmp/file") + case other => + fail(s"Expected to parse ${classOf[InsertIntoDataSourceDirCommand].getClass.getName}" + + " from query," + s" got ${other.getClass.getName}: $v1") + } + + val v2 = "INSERT OVERWRITE DIRECTORY USING parquet SELECT 1 as a" + val e2 = intercept[ParseException] { + parser.parsePlan(v2) + } + assert(e2.message.contains( + "Directory path and 'path' in OPTIONS should be specified one, but not both")) + + val v3 = + """ + | INSERT OVERWRITE DIRECTORY USING json + | OPTIONS ('path' '/tmp/file', a 1, b 0.1, c TRUE) + | SELECT 1 as a + """.stripMargin + parser.parsePlan(v3) match { + case InsertIntoDir(_, storage, provider, query, overwrite) => + assert(storage.locationUri.isDefined && provider == Some("json")) + assert(storage.properties.get("a") == Some("1")) + assert(storage.properties.get("b") == Some("0.1")) + assert(storage.properties.get("c") == Some("true")) + assert(!storage.properties.contains("abc")) + assert(!storage.properties.contains("path")) + case other => + fail(s"Expected to parse ${classOf[InsertIntoDataSourceDirCommand].getClass.getName}" + + " from query," + s"got ${other.getClass.getName}: $v1") + } + + val v4 = + """ + | INSERT OVERWRITE DIRECTORY '/tmp/file' USING json + | OPTIONS ('path' '/tmp/file', a 1, b 0.1, c TRUE) + | SELECT 1 as a + """.stripMargin + val e4 = intercept[ParseException] { + parser.parsePlan(v4) + } + assert(e4.message.contains( + "Directory path and 'path' in OPTIONS should be specified one, but not both")) + } + // ALTER TABLE table_name RENAME TO new_table_name; // ALTER VIEW view_name RENAME TO new_view_name; test("alter table/view: rename table/view") { http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 
41abff2..875b745 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.sources import java.io.File +import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils @@ -366,4 +367,63 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { Row(Array(1, 2), Array("a", "b"))) } } + + test("insert overwrite directory") { + withTempDir { dir => + val path = dir.toURI.getPath + + val v1 = + s""" + | INSERT OVERWRITE DIRECTORY '${path}' + | USING json + | OPTIONS (a 1, b 0.1, c TRUE) + | SELECT 1 as a, 'c' as b + """.stripMargin + + spark.sql(v1) + + checkAnswer( + spark.read.json(dir.getCanonicalPath), + sql("SELECT 1 as a, 'c' as b")) + } + } + + test("insert overwrite directory with path in options") { + withTempDir { dir => + val path = dir.toURI.getPath + + val v1 = + s""" + | INSERT OVERWRITE DIRECTORY + | USING json + | OPTIONS ('path' '${path}') + | SELECT 1 as a, 'c' as b + """.stripMargin + + spark.sql(v1) + + checkAnswer( + spark.read.json(dir.getCanonicalPath), + sql("SELECT 1 as a, 'c' as b")) + } + } + + test("insert overwrite directory to data source not providing FileFormat") { + withTempDir { dir => + val path = dir.toURI.getPath + + val v1 = + s""" + | INSERT OVERWRITE DIRECTORY '${path}' + | USING JDBC + | OPTIONS (a 1, b 0.1, c TRUE) + | SELECT 1 as a, 'c' as b + """.stripMargin + val e = intercept[SparkException] { + spark.sql(v1) + }.getMessage + + assert(e.contains("Only Data Sources providing FileFormat are supported")) + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 47203a8..caf554d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -26,7 +26,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, + ScriptTransformation} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} @@ -157,6 +158,14 @@ object HiveAnalysis extends Rule[LogicalPlan] { case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => DDLUtils.checkDataSchemaFieldNames(tableDesc) CreateHiveTableAsSelectCommand(tableDesc, query, mode) + + case InsertIntoDir(isLocal, storage, provider, child, overwrite) + if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER => + + val outputPath = new Path(storage.locationUri.get) + if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath) + + InsertIntoHiveDirCommand(isLocal, storage, child, overwrite) } } 
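Condensing the two analyzer rules above, the routing of InsertIntoDir by provider looks roughly like this (a hedged sketch, not the literal rule bodies):

{{{
import java.util.Locale

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{DDLUtils, InsertIntoDataSourceDirCommand}
import org.apache.spark.sql.hive.execution.InsertIntoHiveDirCommand

// Sketch of the dispatch performed by DataSourceAnalysis / HiveAnalysis.
def routeInsertIntoDir(
    isLocal: Boolean,
    storage: CatalogStorageFormat,
    provider: String,
    query: LogicalPlan,
    overwrite: Boolean): LogicalPlan = {
  // Both rules refuse to overwrite a directory that the query reads from.
  val outputPath = new Path(storage.locationUri.get)
  if (overwrite) DDLUtils.verifyNotReadPath(query, outputPath)

  if (provider.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    InsertIntoHiveDirCommand(isLocal, storage, query, overwrite)
  } else {
    InsertIntoDataSourceDirCommand(storage, provider, query, overwrite)
  }
}
}}}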
http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala new file mode 100644 index 0000000..15ca1df --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTmpPath.scala @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import java.io.{File, IOException} +import java.net.URI +import java.text.SimpleDateFormat +import java.util.{Date, Locale, Random} + +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.common.FileUtils +import org.apache.hadoop.hive.ql.exec.TaskRunner + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.hive.HiveExternalCatalog +import org.apache.spark.sql.hive.client.HiveVersion + +// Base trait for getting a temporary location for writing data +private[hive] trait HiveTmpPath extends Logging { + + var createdTempDir: Option[Path] = None + + def getExternalTmpPath( + sparkSession: SparkSession, + hadoopConf: Configuration, + path: Path): Path = { + import org.apache.spark.sql.hive.client.hive._ + + // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under + // a common scratch directory. After the writing is finished, Hive will simply empty the table + // directory and move the staging directory to it. + // After Hive 1.1, Hive will create the staging directory under the table directory, and when + // moving staging directory to table directory, Hive will still empty the table directory, but + // will exclude the staging directory there. + // We have to follow the Hive behavior here, to avoid troubles. For example, if we create + // staging directory under the table director for Hive prior to 1.1, the staging directory will + // be removed by Hive when Hive is trying to empty the table directory. + val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0) + val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1) + + // Ensure all the supported versions are considered here. 
+ assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath == + allSupportedHiveVersions) + + val externalCatalog = sparkSession.sharedState.externalCatalog + val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version + val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") + val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") + + if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) { + oldVersionExternalTempPath(path, hadoopConf, scratchDir) + } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) { + newVersionExternalTempPath(path, hadoopConf, stagingDir) + } else { + throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion) + } + } + + def deleteExternalTmpPath(hadoopConf: Configuration) : Unit = { + // Attempt to delete the staging directory and the inclusive files. If failed, the files are + // expected to be dropped at the normal termination of VM since deleteOnExit is used. + try { + createdTempDir.foreach { path => + val fs = path.getFileSystem(hadoopConf) + if (fs.delete(path, true)) { + // If we successfully delete the staging directory, remove it from FileSystem's cache. + fs.cancelDeleteOnExit(path) + } + } + } catch { + case NonFatal(e) => + val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") + logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e) + } + } + + // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13 + private def oldVersionExternalTempPath( + path: Path, + hadoopConf: Configuration, + scratchDir: String): Path = { + val extURI: URI = path.toUri + val scratchPath = new Path(scratchDir, executionId) + var dirPath = new Path( + extURI.getScheme, + extURI.getAuthority, + scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID()) + + try { + val fs: FileSystem = dirPath.getFileSystem(hadoopConf) + dirPath = new Path(fs.makeQualified(dirPath).toString()) + + if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) { + throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString) + } + createdTempDir = Some(dirPath) + fs.deleteOnExit(dirPath) + } catch { + case e: IOException => + throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e) + } + dirPath + } + + // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2 + private def newVersionExternalTempPath( + path: Path, + hadoopConf: Configuration, + stagingDir: String): Path = { + val extURI: URI = path.toUri + if (extURI.getScheme == "viewfs") { + getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir) + } else { + new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000") + } + } + + private def getExtTmpPathRelTo( + path: Path, + hadoopConf: Configuration, + stagingDir: String): Path = { + new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000 + } + + private def getExternalScratchDir( + extURI: URI, + hadoopConf: Configuration, + stagingDir: String): Path = { + getStagingDir( + new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), + hadoopConf, + stagingDir) + } + + private def getStagingDir( + inputPath: Path, + hadoopConf: Configuration, + stagingDir: String): Path = { + val inputPathUri: URI = inputPath.toUri + val inputPathName: String = inputPathUri.getPath + val fs: FileSystem = inputPath.getFileSystem(hadoopConf) + var stagingPathName: String = + if (inputPathName.indexOf(stagingDir) == -1) { 
+        new Path(inputPathName, stagingDir).toString
+      } else {
+        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+      }
+
+    // SPARK-20594: This is a workaround for a Hive bug. Hive requires that the
+    // staging directory not be deleted when users set hive.exec.stagingdir
+    // under the table directory.
+    if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) &&
+      !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) {
+      logDebug(s"The staging dir '$stagingPathName' should be a child directory that starts " +
+        "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
+        "directory.")
+      stagingPathName = new Path(inputPathName, ".hive-staging").toString
+    }
+
+    val dir: Path =
+      fs.makeQualified(
+        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+    try {
+      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
+      }
+      createdTempDir = Some(dir)
+      fs.deleteOnExit(dir)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException(
+          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
+    }
+    dir
+  }
+
+  private def executionId: String = {
+    val rand: Random = new Random
+    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
+    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
new file mode 100644
index 0000000..2110038
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.sql.hive.execution + +import scala.language.existentials + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.common.FileUtils +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe +import org.apache.hadoop.mapred._ + +import org.apache.spark.SparkException +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.hive.client.HiveClientImpl + +/** + * Command for writing the results of `query` to file system. + * + * The syntax of using this command in SQL is: + * {{{ + * INSERT OVERWRITE [LOCAL] DIRECTORY + * path + * [ROW FORMAT row_format] + * [STORED AS file_format] + * SELECT ... + * }}} + * + * @param isLocal whether the path specified in `storage` is a local directory + * @param storage storage format used to describe how the query result is stored. + * @param query the logical plan representing data to write to + * @param overwrite whether overwrites existing directory + */ +case class InsertIntoHiveDirCommand( + isLocal: Boolean, + storage: CatalogStorageFormat, + query: LogicalPlan, + overwrite: Boolean) extends SaveAsHiveFile with HiveTmpPath { + + override def children: Seq[LogicalPlan] = query :: Nil + + override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = { + assert(children.length == 1) + assert(storage.locationUri.nonEmpty) + + val hiveTable = HiveClientImpl.toHiveTable(CatalogTable( + identifier = TableIdentifier(storage.locationUri.get.toString, Some("default")), + tableType = org.apache.spark.sql.catalyst.catalog.CatalogTableType.VIEW, + storage = storage, + schema = query.schema + )) + hiveTable.getMetadata.put(serdeConstants.SERIALIZATION_LIB, + storage.serde.getOrElse(classOf[LazySimpleSerDe].getName)) + + val tableDesc = new TableDesc( + hiveTable.getInputFormatClass, + hiveTable.getOutputFormatClass, + hiveTable.getMetadata + ) + + val hadoopConf = sparkSession.sessionState.newHadoopConf() + val jobConf = new JobConf(hadoopConf) + + val targetPath = new Path(storage.locationUri.get) + val writeToPath = + if (isLocal) { + val localFileSystem = FileSystem.getLocal(jobConf) + localFileSystem.makeQualified(targetPath) + } else { + val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf) + val dfs = qualifiedPath.getFileSystem(jobConf) + if (!dfs.exists(qualifiedPath)) { + dfs.mkdirs(qualifiedPath.getParent) + } + qualifiedPath + } + + val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, writeToPath) + val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc( + tmpPath.toString, tableDesc, false) + + try { + saveAsHiveFile( + sparkSession = sparkSession, + plan = children.head, + hadoopConf = hadoopConf, + fileSinkConf = fileSinkConf, + outputLocation = tmpPath.toString) + + val fs = writeToPath.getFileSystem(hadoopConf) + if (overwrite && fs.exists(writeToPath)) { + fs.listStatus(writeToPath).foreach { existFile => + if (Option(existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true) + } + } + + fs.listStatus(tmpPath).foreach { + tmpFile => fs.rename(tmpFile.getPath, writeToPath) + } + } catch { + case e: Throwable => + throw new SparkException( + "Failed 
inserting overwrite directory " + storage.locationUri.get, e) + } finally { + deleteExternalTmpPath(hadoopConf) + } + + Seq.empty[Row] + } +} + http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 46610f8..5bdc97a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -17,32 +17,22 @@ package org.apache.spark.sql.hive.execution -import java.io.{File, IOException} -import java.net.URI -import java.text.SimpleDateFormat -import java.util.{Date, Locale, Random} - import scala.util.control.NonFatal -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.hive.common.FileUtils +import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.ql.ErrorMsg -import org.apache.hadoop.hive.ql.exec.TaskRunner import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.SparkException -import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.command.{CommandUtils, DataWritingCommand} -import org.apache.spark.sql.execution.datasources.FileFormatWriter +import org.apache.spark.sql.execution.command.CommandUtils import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} -import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion} +import org.apache.spark.sql.hive.client.HiveClientImpl /** @@ -80,152 +70,10 @@ case class InsertIntoHiveTable( partition: Map[String, Option[String]], query: LogicalPlan, overwrite: Boolean, - ifPartitionNotExists: Boolean) extends DataWritingCommand { + ifPartitionNotExists: Boolean) extends SaveAsHiveFile with HiveTmpPath { override def children: Seq[LogicalPlan] = query :: Nil - var createdTempDir: Option[Path] = None - - private def executionId: String = { - val rand: Random = new Random - val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US) - "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong) - } - - private def getStagingDir( - inputPath: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - val inputPathUri: URI = inputPath.toUri - val inputPathName: String = inputPathUri.getPath - val fs: FileSystem = inputPath.getFileSystem(hadoopConf) - var stagingPathName: String = - if (inputPathName.indexOf(stagingDir) == -1) { - new Path(inputPathName, stagingDir).toString - } else { - inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length) - } - - // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the - // staging directory needs to avoid being deleted when users set hive.exec.stagingdir - // under the table directory. 
- if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) && - !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) { - logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + - "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " + - "directory.") - stagingPathName = new Path(inputPathName, ".hive-staging").toString - } - - val dir: Path = - fs.makeQualified( - new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID)) - logDebug("Created staging dir = " + dir + " for path = " + inputPath) - try { - if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { - throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") - } - createdTempDir = Some(dir) - fs.deleteOnExit(dir) - } catch { - case e: IOException => - throw new RuntimeException( - "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e) - } - dir - } - - private def getExternalScratchDir( - extURI: URI, - hadoopConf: Configuration, - stagingDir: String): Path = { - getStagingDir( - new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), - hadoopConf, - stagingDir) - } - - def getExternalTmpPath( - path: Path, - hiveVersion: HiveVersion, - hadoopConf: Configuration, - stagingDir: String, - scratchDir: String): Path = { - import org.apache.spark.sql.hive.client.hive._ - - // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under - // a common scratch directory. After the writing is finished, Hive will simply empty the table - // directory and move the staging directory to it. - // After Hive 1.1, Hive will create the staging directory under the table directory, and when - // moving staging directory to table directory, Hive will still empty the table directory, but - // will exclude the staging directory there. - // We have to follow the Hive behavior here, to avoid troubles. For example, if we create - // staging directory under the table director for Hive prior to 1.1, the staging directory will - // be removed by Hive when Hive is trying to empty the table directory. - val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0) - val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1) - - // Ensure all the supported versions are considered here. 
- assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath == - allSupportedHiveVersions) - - if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) { - oldVersionExternalTempPath(path, hadoopConf, scratchDir) - } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) { - newVersionExternalTempPath(path, hadoopConf, stagingDir) - } else { - throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion) - } - } - - // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13 - def oldVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - scratchDir: String): Path = { - val extURI: URI = path.toUri - val scratchPath = new Path(scratchDir, executionId) - var dirPath = new Path( - extURI.getScheme, - extURI.getAuthority, - scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID()) - - try { - val fs: FileSystem = dirPath.getFileSystem(hadoopConf) - dirPath = new Path(fs.makeQualified(dirPath).toString()) - - if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) { - throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString) - } - createdTempDir = Some(dirPath) - fs.deleteOnExit(dirPath) - } catch { - case e: IOException => - throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e) - } - dirPath - } - - // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2 - def newVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - val extURI: URI = path.toUri - if (extURI.getScheme == "viewfs") { - getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir) - } else { - new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000") - } - } - - def getExtTmpPathRelTo( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { - new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000 - } - /** * Inserts all the rows in the table into Hive. 
Row objects are properly serialized with the * `org.apache.hadoop.hive.serde2.SerDe` and the @@ -234,12 +82,8 @@ case class InsertIntoHiveTable( override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = { assert(children.length == 1) - val sessionState = sparkSession.sessionState val externalCatalog = sparkSession.sharedState.externalCatalog - val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version - val hadoopConf = sessionState.newHadoopConf() - val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") - val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") + val hadoopConf = sparkSession.sessionState.newHadoopConf() val hiveQlTable = HiveClientImpl.toHiveTable(table) // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer @@ -254,23 +98,8 @@ case class InsertIntoHiveTable( hiveQlTable.getMetadata ) val tableLocation = hiveQlTable.getDataLocation - val tmpLocation = - getExternalTmpPath(tableLocation, hiveVersion, hadoopConf, stagingDir, scratchDir) + val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) - val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean - - if (isCompressed) { - // Please note that isCompressed, "mapreduce.output.fileoutputformat.compress", - // "mapreduce.output.fileoutputformat.compress.codec", and - // "mapreduce.output.fileoutputformat.compress.type" - // have no impact on ORC because it uses table properties to store compression information. - hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true") - fileSinkConf.setCompressed(true) - fileSinkConf.setCompressCodec(hadoopConf - .get("mapreduce.output.fileoutputformat.compress.codec")) - fileSinkConf.setCompressType(hadoopConf - .get("mapreduce.output.fileoutputformat.compress.type")) - } val numDynamicPartitions = partition.values.count(_.isEmpty) val numStaticPartitions = partition.values.count(_.nonEmpty) @@ -332,11 +161,6 @@ case class InsertIntoHiveTable( case _ => // do nothing since table has no bucketing } - val committer = FileCommitProtocol.instantiate( - sparkSession.sessionState.conf.fileCommitProtocolClass, - jobId = java.util.UUID.randomUUID().toString, - outputPath = tmpLocation.toString) - val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name => query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse { throw new AnalysisException( @@ -344,17 +168,13 @@ case class InsertIntoHiveTable( }.asInstanceOf[Attribute] } - FileFormatWriter.write( + saveAsHiveFile( sparkSession = sparkSession, plan = children.head, - fileFormat = new HiveFileFormat(fileSinkConf), - committer = committer, - outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty), hadoopConf = hadoopConf, - partitionColumns = partitionAttributes, - bucketSpec = None, - statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)), - options = Map.empty) + fileSinkConf = fileSinkConf, + outputLocation = tmpLocation.toString, + partitionAttributes = partitionAttributes) if (partition.nonEmpty) { if (numDynamicPartitions > 0) { @@ -422,18 +242,7 @@ case class InsertIntoHiveTable( // Attempt to delete the staging directory and the inclusive files. If failed, the files are // expected to be dropped at the normal termination of VM since deleteOnExit is used. 
- try { - createdTempDir.foreach { path => - val fs = path.getFileSystem(hadoopConf) - if (fs.delete(path, true)) { - // If we successfully delete the staging directory, remove it from FileSystem's cache. - fs.cancelDeleteOnExit(path) - } - } - } catch { - case NonFatal(e) => - logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e) - } + deleteExternalTmpPath(hadoopConf) // un-cache this table. sparkSession.catalog.uncacheTable(table.identifier.quotedString) http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala new file mode 100644 index 0000000..7de9b42 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.internal.io.FileCommitProtocol +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.command.DataWritingCommand +import org.apache.spark.sql.execution.datasources.FileFormatWriter +import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} + +// Base trait from which all hive insert statement physical execution extends. +private[hive] trait SaveAsHiveFile extends DataWritingCommand { + + protected def saveAsHiveFile( + sparkSession: SparkSession, + plan: SparkPlan, + hadoopConf: Configuration, + fileSinkConf: FileSinkDesc, + outputLocation: String, + partitionAttributes: Seq[Attribute] = Nil): Unit = { + + val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean + if (isCompressed) { + // Please note that isCompressed, "mapreduce.output.fileoutputformat.compress", + // "mapreduce.output.fileoutputformat.compress.codec", and + // "mapreduce.output.fileoutputformat.compress.type" + // have no impact on ORC because it uses table properties to store compression information. 
+ hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true") + fileSinkConf.setCompressed(true) + fileSinkConf.setCompressCodec(hadoopConf + .get("mapreduce.output.fileoutputformat.compress.codec")) + fileSinkConf.setCompressType(hadoopConf + .get("mapreduce.output.fileoutputformat.compress.type")) + } + + val committer = FileCommitProtocol.instantiate( + sparkSession.sessionState.conf.fileCommitProtocolClass, + jobId = java.util.UUID.randomUUID().toString, + outputPath = outputLocation) + + FileFormatWriter.write( + sparkSession = sparkSession, + plan = plan, + fileFormat = new HiveFileFormat(fileSinkConf), + committer = committer, + outputSpec = FileFormatWriter.OutputSpec(outputLocation, Map.empty), + hadoopConf = hadoopConf, + partitionColumns = partitionAttributes, + bucketSpec = None, + statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)), + options = Map.empty) + } +} + http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala deleted file mode 100644 index e93c654..0000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ /dev/null @@ -1,551 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import java.io.File - -import org.scalatest.BeforeAndAfter - -import org.apache.spark.SparkException -import org.apache.spark.sql.{QueryTest, _} -import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable -import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.types._ -import org.apache.spark.util.Utils - -case class TestData(key: Int, value: String) - -case class ThreeCloumntable(key: Int, value: String, key1: String) - -class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter - with SQLTestUtils { - import spark.implicits._ - - override lazy val testData = spark.sparkContext.parallelize( - (1 to 100).map(i => TestData(i, i.toString))).toDF() - - before { - // Since every we are doing tests for DDL statements, - // it is better to reset before every test. - hiveContext.reset() - // Creates a temporary view with testData, which will be used in all tests. 
- testData.createOrReplaceTempView("testData") - } - - test("insertInto() HiveTable") { - withTable("createAndInsertTest") { - sql("CREATE TABLE createAndInsertTest (key int, value string)") - - // Add some data. - testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest") - - // Make sure the table has also been updated. - checkAnswer( - sql("SELECT * FROM createAndInsertTest"), - testData.collect().toSeq - ) - - // Add more data. - testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest") - - // Make sure the table has been updated. - checkAnswer( - sql("SELECT * FROM createAndInsertTest"), - testData.toDF().collect().toSeq ++ testData.toDF().collect().toSeq - ) - - // Now overwrite. - testData.write.mode(SaveMode.Overwrite).insertInto("createAndInsertTest") - - // Make sure the registered table has also been updated. - checkAnswer( - sql("SELECT * FROM createAndInsertTest"), - testData.collect().toSeq - ) - } - } - - test("Double create fails when allowExisting = false") { - withTable("doubleCreateAndInsertTest") { - sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)") - - intercept[AnalysisException] { - sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)") - } - } - } - - test("Double create does not fail when allowExisting = true") { - withTable("doubleCreateAndInsertTest") { - sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)") - sql("CREATE TABLE IF NOT EXISTS doubleCreateAndInsertTest (key int, value string)") - } - } - - test("SPARK-4052: scala.collection.Map as value type of MapType") { - val schema = StructType(StructField("m", MapType(StringType, StringType), true) :: Nil) - val rowRDD = spark.sparkContext.parallelize( - (1 to 100).map(i => Row(scala.collection.mutable.HashMap(s"key$i" -> s"value$i")))) - val df = spark.createDataFrame(rowRDD, schema) - df.createOrReplaceTempView("tableWithMapValue") - sql("CREATE TABLE hiveTableWithMapValue(m MAP <STRING, STRING>)") - sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue") - - checkAnswer( - sql("SELECT * FROM hiveTableWithMapValue"), - rowRDD.collect().toSeq - ) - - sql("DROP TABLE hiveTableWithMapValue") - } - - test("SPARK-4203:random partition directory order") { - sql("CREATE TABLE tmp_table (key int, value string)") - val tmpDir = Utils.createTempDir() - // The default value of hive.exec.stagingdir. 
- val stagingDir = ".hive-staging" - - sql( - s""" - |CREATE TABLE table_with_partition(c1 string) - |PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string) - |location '${tmpDir.toURI.toString}' - """.stripMargin) - sql( - """ - |INSERT OVERWRITE TABLE table_with_partition - |partition (p1='a',p2='b',p3='c',p4='c',p5='1') - |SELECT 'blarr' FROM tmp_table - """.stripMargin) - sql( - """ - |INSERT OVERWRITE TABLE table_with_partition - |partition (p1='a',p2='b',p3='c',p4='c',p5='2') - |SELECT 'blarr' FROM tmp_table - """.stripMargin) - sql( - """ - |INSERT OVERWRITE TABLE table_with_partition - |partition (p1='a',p2='b',p3='c',p4='c',p5='3') - |SELECT 'blarr' FROM tmp_table - """.stripMargin) - sql( - """ - |INSERT OVERWRITE TABLE table_with_partition - |partition (p1='a',p2='b',p3='c',p4='c',p5='4') - |SELECT 'blarr' FROM tmp_table - """.stripMargin) - def listFolders(path: File, acc: List[String]): List[List[String]] = { - val dir = path.listFiles() - val folders = dir.filter { e => e.isDirectory && !e.getName().startsWith(stagingDir) }.toList - if (folders.isEmpty) { - List(acc.reverse) - } else { - folders.flatMap(x => listFolders(x, x.getName :: acc)) - } - } - val expected = List( - "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=2"::Nil, - "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=3"::Nil, - "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=1"::Nil, - "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=4"::Nil - ) - assert(listFolders(tmpDir, List()).sortBy(_.toString()) === expected.sortBy(_.toString)) - sql("DROP TABLE table_with_partition") - sql("DROP TABLE tmp_table") - } - - testPartitionedTable("INSERT OVERWRITE - partition IF NOT EXISTS") { tableName => - val selQuery = s"select a, b, c, d from $tableName" - sql( - s""" - |INSERT OVERWRITE TABLE $tableName - |partition (b=2, c=3) - |SELECT 1, 4 - """.stripMargin) - checkAnswer(sql(selQuery), Row(1, 2, 3, 4)) - - sql( - s""" - |INSERT OVERWRITE TABLE $tableName - |partition (b=2, c=3) - |SELECT 5, 6 - """.stripMargin) - checkAnswer(sql(selQuery), Row(5, 2, 3, 6)) - - val e = intercept[AnalysisException] { - sql( - s""" - |INSERT OVERWRITE TABLE $tableName - |partition (b=2, c) IF NOT EXISTS - |SELECT 7, 8, 3 - """.stripMargin) - } - assert(e.getMessage.contains( - "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [c]")) - - // If the partition already exists, the insert will overwrite the data - // unless users specify IF NOT EXISTS - sql( - s""" - |INSERT OVERWRITE TABLE $tableName - |partition (b=2, c=3) IF NOT EXISTS - |SELECT 9, 10 - """.stripMargin) - checkAnswer(sql(selQuery), Row(5, 2, 3, 6)) - - // ADD PARTITION has the same effect, even if no actual data is inserted. 
- sql(s"ALTER TABLE $tableName ADD PARTITION (b=21, c=31)") - sql( - s""" - |INSERT OVERWRITE TABLE $tableName - |partition (b=21, c=31) IF NOT EXISTS - |SELECT 20, 24 - """.stripMargin) - checkAnswer(sql(selQuery), Row(5, 2, 3, 6)) - } - - test("Insert ArrayType.containsNull == false") { - val schema = StructType(Seq( - StructField("a", ArrayType(StringType, containsNull = false)))) - val rowRDD = spark.sparkContext.parallelize((1 to 100).map(i => Row(Seq(s"value$i")))) - val df = spark.createDataFrame(rowRDD, schema) - df.createOrReplaceTempView("tableWithArrayValue") - sql("CREATE TABLE hiveTableWithArrayValue(a Array <STRING>)") - sql("INSERT OVERWRITE TABLE hiveTableWithArrayValue SELECT a FROM tableWithArrayValue") - - checkAnswer( - sql("SELECT * FROM hiveTableWithArrayValue"), - rowRDD.collect().toSeq) - - sql("DROP TABLE hiveTableWithArrayValue") - } - - test("Insert MapType.valueContainsNull == false") { - val schema = StructType(Seq( - StructField("m", MapType(StringType, StringType, valueContainsNull = false)))) - val rowRDD = spark.sparkContext.parallelize( - (1 to 100).map(i => Row(Map(s"key$i" -> s"value$i")))) - val df = spark.createDataFrame(rowRDD, schema) - df.createOrReplaceTempView("tableWithMapValue") - sql("CREATE TABLE hiveTableWithMapValue(m Map <STRING, STRING>)") - sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue") - - checkAnswer( - sql("SELECT * FROM hiveTableWithMapValue"), - rowRDD.collect().toSeq) - - sql("DROP TABLE hiveTableWithMapValue") - } - - test("Insert StructType.fields.exists(_.nullable == false)") { - val schema = StructType(Seq( - StructField("s", StructType(Seq(StructField("f", StringType, nullable = false)))))) - val rowRDD = spark.sparkContext.parallelize( - (1 to 100).map(i => Row(Row(s"value$i")))) - val df = spark.createDataFrame(rowRDD, schema) - df.createOrReplaceTempView("tableWithStructValue") - sql("CREATE TABLE hiveTableWithStructValue(s Struct <f: STRING>)") - sql("INSERT OVERWRITE TABLE hiveTableWithStructValue SELECT s FROM tableWithStructValue") - - checkAnswer( - sql("SELECT * FROM hiveTableWithStructValue"), - rowRDD.collect().toSeq) - - sql("DROP TABLE hiveTableWithStructValue") - } - - test("Test partition mode = strict") { - withSQLConf(("hive.exec.dynamic.partition.mode", "strict")) { - withTable("partitioned") { - sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") - val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")) - .toDF("id", "data", "part") - - intercept[SparkException] { - data.write.insertInto("partitioned") - } - } - } - } - - test("Detect table partitioning") { - withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) { - withTable("source", "partitioned") { - sql("CREATE TABLE source (id bigint, data string, part string)") - val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")).toDF() - - data.write.insertInto("source") - checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq) - - sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") - // this will pick up the output partitioning from the table definition - spark.table("source").write.insertInto("partitioned") - - checkAnswer(sql("SELECT * FROM partitioned"), data.collect().toSeq) - } - } - } - - private def testPartitionedHiveSerDeTable(testName: String)(f: String => Unit): Unit = { - test(s"Hive SerDe table - $testName") { - val hiveTable = "hive_table" - - withTable(hiveTable) { - 
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { - sql( - s""" - |CREATE TABLE $hiveTable (a INT, d INT) - |PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE - """.stripMargin) - f(hiveTable) - } - } - } - } - - private def testPartitionedDataSourceTable(testName: String)(f: String => Unit): Unit = { - test(s"Data source table - $testName") { - val dsTable = "ds_table" - - withTable(dsTable) { - sql( - s""" - |CREATE TABLE $dsTable (a INT, b INT, c INT, d INT) - |USING PARQUET PARTITIONED BY (b, c) - """.stripMargin) - f(dsTable) - } - } - } - - private def testPartitionedTable(testName: String)(f: String => Unit): Unit = { - testPartitionedHiveSerDeTable(testName)(f) - testPartitionedDataSourceTable(testName)(f) - } - - testPartitionedTable("partitionBy() can't be used together with insertInto()") { tableName => - val cause = intercept[AnalysisException] { - Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d").write.partitionBy("b", "c").insertInto(tableName) - } - - assert(cause.getMessage.contains("insertInto() can't be used together with partitionBy().")) - } - - testPartitionedTable( - "SPARK-16036: better error message when insert into a table with mismatch schema") { - tableName => - val e = intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3") - } - assert(e.message.contains( - "target table has 4 column(s) but the inserted data has 5 column(s)")) - } - - testPartitionedTable("SPARK-16037: INSERT statement should match columns by position") { - tableName => - withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { - sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b") - checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4)) - sql(s"INSERT OVERWRITE TABLE $tableName SELECT 1, 4, 2, 3") - checkAnswer(sql(s"SELECT a, b, c, 4 FROM $tableName"), Row(1, 2, 3, 4)) - } - } - - testPartitionedTable("INSERT INTO a partitioned table (semantic and error handling)") { - tableName => - withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) { - sql(s"INSERT INTO TABLE $tableName PARTITION (b=2, c=3) SELECT 1, 4") - - sql(s"INSERT INTO TABLE $tableName PARTITION (b=6, c=7) SELECT 5, 8") - - sql(s"INSERT INTO TABLE $tableName PARTITION (c=11, b=10) SELECT 9, 12") - - // c is defined twice. Analyzer will complain. - intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, c=16) SELECT 13") - } - - // d is not a partitioning column. - intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) SELECT 13, 14") - } - - // d is not a partitioning column. The total number of columns is correct. - intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) SELECT 13") - } - - // The data is missing a column. - intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) SELECT 13") - } - - // d is not a partitioning column. - intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $tableName PARTITION (b=15, d=15) SELECT 13, 14") - } - - // The statement is missing a column. - intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14") - } - - // The statement is missing a column. 
- intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14, 16") - } - - sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c) SELECT 13, 16, 15") - - // Dynamic partitioning columns need to be after static partitioning columns. - intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $tableName PARTITION (b, c=19) SELECT 17, 20, 18") - } - - sql(s"INSERT INTO TABLE $tableName PARTITION (b, c) SELECT 17, 20, 18, 19") - - sql(s"INSERT INTO TABLE $tableName PARTITION (c, b) SELECT 21, 24, 22, 23") - - sql(s"INSERT INTO TABLE $tableName SELECT 25, 28, 26, 27") - - checkAnswer( - sql(s"SELECT a, b, c, d FROM $tableName"), - Row(1, 2, 3, 4) :: - Row(5, 6, 7, 8) :: - Row(9, 10, 11, 12) :: - Row(13, 14, 15, 16) :: - Row(17, 18, 19, 20) :: - Row(21, 22, 23, 24) :: - Row(25, 26, 27, 28) :: Nil - ) - } - } - - testPartitionedTable("insertInto() should match columns by position and ignore column names") { - tableName => - withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { - // Columns `df.c` and `df.d` are resolved by position, and thus mapped to partition columns - // `b` and `c` of the target table. - val df = Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d") - df.write.insertInto(tableName) - - checkAnswer( - sql(s"SELECT a, b, c, d FROM $tableName"), - Row(1, 3, 4, 2) - ) - } - } - - testPartitionedTable("insertInto() should match unnamed columns by position") { - tableName => - withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { - // Columns `c + 1` and `d + 1` are resolved by position, and thus mapped to partition - // columns `b` and `c` of the target table. - val df = Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d") - df.select('a + 1, 'b + 1, 'c + 1, 'd + 1).write.insertInto(tableName) - - checkAnswer( - sql(s"SELECT a, b, c, d FROM $tableName"), - Row(2, 4, 5, 3) - ) - } - } - - testPartitionedTable("insertInto() should reject missing columns") { - tableName => - withTable("t") { - sql("CREATE TABLE t (a INT, b INT)") - - intercept[AnalysisException] { - spark.table("t").write.insertInto(tableName) - } - } - } - - testPartitionedTable("insertInto() should reject extra columns") { - tableName => - withTable("t") { - sql("CREATE TABLE t (a INT, b INT, c INT, d INT, e INT)") - - intercept[AnalysisException] { - spark.table("t").write.insertInto(tableName) - } - } - } - - private def testBucketedTable(testName: String)(f: String => Unit): Unit = { - test(s"Hive SerDe table - $testName") { - val hiveTable = "hive_table" - - withTable(hiveTable) { - withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") { - sql( - s""" - |CREATE TABLE $hiveTable (a INT, d INT) - |PARTITIONED BY (b INT, c INT) - |CLUSTERED BY(a) - |SORTED BY(a, d) INTO 256 BUCKETS - |STORED AS TEXTFILE - """.stripMargin) - f(hiveTable) - } - } - } - } - - testBucketedTable("INSERT should NOT fail if strict bucketing is NOT enforced") { - tableName => - withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "false") { - sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b") - checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4)) - } - } - - testBucketedTable("INSERT should fail if strict bucketing / sorting is enforced") { - tableName => - withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "false") { - intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4") - } - } - withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "true") { - 
intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
- }
- }
- withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "true") {
- intercept[AnalysisException] {
- sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
- }
- }
- }
-
- test("SPARK-20594: hive.exec.stagingdir was deleted by Hive") {
- // Set hive.exec.stagingdir under the table directory, without starting with ".".
- withSQLConf("hive.exec.stagingdir" -> "./test") {
- withTable("test_table") {
- sql("CREATE TABLE test_table (key int)")
- sql("INSERT OVERWRITE TABLE test_table SELECT 1")
- checkAnswer(sql("SELECT * FROM test_table"), Row(1))
- }
- }
- }
-}
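----------------------------------------------------------------------
The new SaveAsHiveFile trait above centralizes the compression handling, the FileCommitProtocol instantiation, and the FileFormatWriter.write call that InsertIntoHiveTable previously inlined; InsertIntoHiveTable and the new InsertIntoHiveDirCommand both delegate to it. As a minimal sketch, assuming the DataWritingCommand contract shown in this diff, a command built on the trait reduces to the following (the command name and constructor below are hypothetical, for illustration only; they are not part of the patch):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}

// Hypothetical command; the real implementations in this patch are
// InsertIntoHiveTable and InsertIntoHiveDirCommand.
case class ExampleHiveWriteCommand(
    query: LogicalPlan,
    fileSinkConf: FileSinkDesc,
    outputLocation: String) extends SaveAsHiveFile {

  override def children: Seq[LogicalPlan] = query :: Nil

  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
    assert(children.length == 1)
    // saveAsHiveFile() takes care of the compression flags, the commit
    // protocol, and the actual FileFormatWriter.write call.
    saveAsHiveFile(
      sparkSession = sparkSession,
      plan = children.head,
      hadoopConf = sparkSession.sessionState.newHadoopConf(),
      fileSinkConf = fileSinkConf,
      outputLocation = outputLocation)
    Seq.empty[Row]
  }
}

Because partitionAttributes defaults to Nil, a non-partitioned writer like this sketch only needs to supply the plan, the Hadoop configuration, the FileSinkDesc, and the output location.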

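----------------------------------------------------------------------
The staging-directory cleanup that InsertIntoHiveTable used to perform inline, and that now sits behind deleteExternalTmpPath() in the new HiveTmpPath.scala, follows the deleteOnExit pattern referenced in the comments above: delete eagerly, and fall back to JVM-exit deletion if the eager delete fails. A rough sketch of that pattern, assuming the path was registered with FileSystem.deleteOnExit() when it was created (the helper name is illustrative, not the patch's actual implementation):

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Illustrative helper showing the delete / cancelDeleteOnExit handshake.
def cleanupStagingPath(path: Path, hadoopConf: Configuration): Unit = {
  val fs = path.getFileSystem(hadoopConf)
  try {
    if (fs.delete(path, true)) {
      // The eager delete succeeded, so drop the path from FileSystem's
      // delete-on-exit cache to avoid a spurious delete at shutdown.
      fs.cancelDeleteOnExit(path)
    }
  } catch {
    case NonFatal(_) =>
      // Best effort only: the deleteOnExit registration still removes the
      // directory at normal JVM termination.
      ()
  }
}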