Repository: spark Updated Branches: refs/heads/master c76153cc7 -> 4e6fc6901
[SPARK-4131][FOLLOW-UP] Support "Writing data into the filesystem from queries" ## What changes were proposed in this pull request? This PR cleans up the code in https://github.com/apache/spark/pull/18975 ## How was this patch tested? N/A Author: gatorsmile <gatorsm...@gmail.com> Closes #19225 from gatorsmile/refactorSPARK-4131. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e6fc690 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e6fc690 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e6fc690 Branch: refs/heads/master Commit: 4e6fc69014af997e4fc41a6959f2d44f4b973bfa Parents: c76153c Author: gatorsmile <gatorsm...@gmail.com> Authored: Thu Sep 14 14:48:04 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Thu Sep 14 14:48:04 2017 +0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/execution/command/ddl.scala | 6 +++++- .../main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 3 +-- .../org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala | 6 ++++-- 3 files changed, 10 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4e6fc690/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index b06f4cc..162e1d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -801,7 +801,11 @@ object DDLUtils { val HIVE_PROVIDER = "hive" def isHiveTable(table: CatalogTable): Boolean = { - table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) 
== HIVE_PROVIDER + isHiveTable(table.provider) + } + + def isHiveTable(provider: Option[String]): Boolean = { + provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER } def isDatasourceTable(table: CatalogTable): Boolean = { http://git-wip-us.apache.org/repos/asf/spark/blob/4e6fc690/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index caf554d..805b317 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -160,8 +160,7 @@ object HiveAnalysis extends Rule[LogicalPlan] { CreateHiveTableAsSelectCommand(tableDesc, query, mode) case InsertIntoDir(isLocal, storage, provider, child, overwrite) - if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER => - + if DDLUtils.isHiveTable(provider) => val outputPath = new Path(storage.locationUri.get) if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath) http://git-wip-us.apache.org/repos/asf/spark/blob/4e6fc690/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala index ad86994..2d74ef0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.TaskRunner import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.SparkSession +import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.DataWritingCommand @@ -50,7 +51,8 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { hadoopConf: Configuration, fileSinkConf: FileSinkDesc, outputLocation: String, - partitionAttributes: Seq[Attribute] = Nil): Unit = { + customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty, + partitionAttributes: Seq[Attribute] = Nil): Set[String] = { val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean if (isCompressed) { @@ -76,7 +78,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { plan = plan, fileFormat = new HiveFileFormat(fileSinkConf), committer = committer, - outputSpec = FileFormatWriter.OutputSpec(outputLocation, Map.empty), + outputSpec = FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations), hadoopConf = hadoopConf, partitionColumns = partitionAttributes, bucketSpec = None, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org