Repository: spark
Updated Branches:
  refs/heads/master c76153cc7 -> 4e6fc6901


[SPARK-4131][FOLLOW-UP] Support "Writing data into the filesystem from queries"

## What changes were proposed in this pull request?
This PR cleans up the code introduced in https://github.com/apache/spark/pull/18975

## How was this patch tested?
N/A

Author: gatorsmile <gatorsm...@gmail.com>

Closes #19225 from gatorsmile/refactorSPARK-4131.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e6fc690
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e6fc690
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e6fc690

Branch: refs/heads/master
Commit: 4e6fc69014af997e4fc41a6959f2d44f4b973bfa
Parents: c76153c
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Thu Sep 14 14:48:04 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Sep 14 14:48:04 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/execution/command/ddl.scala     | 6 +++++-
 .../main/scala/org/apache/spark/sql/hive/HiveStrategies.scala  | 3 +--
 .../org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala   | 6 ++++--
 3 files changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4e6fc690/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index b06f4cc..162e1d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -801,7 +801,11 @@ object DDLUtils {
   val HIVE_PROVIDER = "hive"
 
   def isHiveTable(table: CatalogTable): Boolean = {
-    table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER
+    isHiveTable(table.provider)
+  }
+
+  def isHiveTable(provider: Option[String]): Boolean = {
+    provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == HIVE_PROVIDER
   }
 
   def isDatasourceTable(table: CatalogTable): Boolean = {
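
The new Option-based overload can also be called directly with a provider string, without first building a CatalogTable. A minimal sketch (the provider values below are made up for illustration; DDLUtils.isHiveTable and HIVE_PROVIDER are the helpers from the hunk above):

    import org.apache.spark.sql.execution.command.DDLUtils

    // The check is null-safe and case-insensitive, per the implementation above.
    DDLUtils.isHiveTable(Some("hive"))    // true:  equals HIVE_PROVIDER
    DDLUtils.isHiveTable(Some("HIVE"))    // true:  lowercased before comparing
    DDLUtils.isHiveTable(Some("parquet")) // false: a different provider
    DDLUtils.isHiveTable(None)            // false: no provider defined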

http://git-wip-us.apache.org/repos/asf/spark/blob/4e6fc690/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index caf554d..805b317 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -160,8 +160,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {
       CreateHiveTableAsSelectCommand(tableDesc, query, mode)
 
     case InsertIntoDir(isLocal, storage, provider, child, overwrite)
-      if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER =>
-
+        if DDLUtils.isHiveTable(provider) =>
       val outputPath = new Path(storage.locationUri.get)
       if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4e6fc690/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index ad86994..2d74ef0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.TaskRunner
 
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
@@ -50,7 +51,8 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       hadoopConf: Configuration,
       fileSinkConf: FileSinkDesc,
       outputLocation: String,
-      partitionAttributes: Seq[Attribute] = Nil): Unit = {
+      customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
+      partitionAttributes: Seq[Attribute] = Nil): Set[String] = {
 
    val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
     if (isCompressed) {
@@ -76,7 +78,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       plan = plan,
       fileFormat = new HiveFileFormat(fileSinkConf),
       committer = committer,
-      outputSpec = FileFormatWriter.OutputSpec(outputLocation, Map.empty),
+      outputSpec = FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations),
       hadoopConf = hadoopConf,
       partitionColumns = partitionAttributes,
       bucketSpec = None,
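
For callers, saveAsHiveFile now accepts per-partition custom locations and returns Set[String] instead of Unit. A call-shape sketch only, not a runnable program: the leading parameters (sparkSession, plan) are inferred from the method body rather than shown in the hunk, the return value is assumed to be the partitions updated by the underlying FileFormatWriter.write, and every value below is hypothetical:

    import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec

    // Hypothetical custom location for one partition; an empty Map keeps the old behavior.
    val customLocations: Map[TablePartitionSpec, String] =
      Map(Map("ds" -> "2017-09-14") -> "/custom/warehouse/ds=2017-09-14")

    val writtenPartitions: Set[String] = saveAsHiveFile(
      sparkSession = sparkSession,   // inferred, not visible in the hunk
      plan = plan,                   // inferred, not visible in the hunk
      hadoopConf = hadoopConf,
      fileSinkConf = fileSinkConf,
      outputLocation = "/warehouse/output",
      customPartitionLocations = customLocations,
      partitionAttributes = partitionAttributes)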

