This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 45f22b5f49 [CORE] Remove the class-overriding of 
InsertIntoHadoopFsRelationCommand (#10187)
45f22b5f49 is described below

commit 45f22b5f49db83d11b9046fd8600197ddc67e24e
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Jul 15 16:35:13 2025 +0800

    [CORE] Remove the class-overriding of InsertIntoHadoopFsRelationCommand 
(#10187)
---
 .../delta/ClickhouseOptimisticTransaction.scala    |   4 +-
 .../delta/ClickhouseOptimisticTransaction.scala    |   4 +-
 .../delta/ClickhouseOptimisticTransaction.scala    |   4 +-
 .../datasources/GlutenWriterColumnarRules.scala    |  19 +-
 package/pom.xml                                    |   2 -
 .../execution/datasources/FileFormatWriter.scala   |  33 +--
 .../InsertIntoHadoopFsRelationCommand.scala        | 288 --------------------
 .../execution/datasources/FileFormatWriter.scala   |  33 +--
 .../InsertIntoHadoopFsRelationCommand.scala        | 291 ---------------------
 9 files changed, 33 insertions(+), 645 deletions(-)

diff --git 
a/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
 
b/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index cd55e3ae36..588d73fe81 100644
--- 
a/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ 
b/backends-clickhouse/src-delta20/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -119,7 +119,7 @@ class ClickhouseOptimisticTransaction(
 
         try {
           val format = tableV2.getFileFormat(metadata)
-          GlutenWriterColumnarRules.injectSparkLocalProperty(spark, 
Some(format.shortName()))
+          GlutenWriterColumnarRules.injectSparkLocalProperty(spark, 
Some(format.shortName()), None)
           FileFormatWriter.write(
             sparkSession = spark,
             plan = newQueryPlan,
@@ -147,7 +147,7 @@ class ClickhouseOptimisticTransaction(
               throw s
             }
         } finally {
-          GlutenWriterColumnarRules.injectSparkLocalProperty(spark, None)
+          GlutenWriterColumnarRules.injectSparkLocalProperty(spark, None, None)
         }
       }
       committer.addedStatuses.toSeq ++ committer.changeFiles
diff --git 
a/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
 
b/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index a1a51846e9..2095a1e7e5 100644
--- 
a/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ 
b/backends-clickhouse/src-delta23/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -120,7 +120,7 @@ class ClickhouseOptimisticTransaction(
 
         try {
           val format = tableV2.getFileFormat(metadata)
-          GlutenWriterColumnarRules.injectSparkLocalProperty(spark, 
Some(format.shortName()))
+          GlutenWriterColumnarRules.injectSparkLocalProperty(spark, 
Some(format.shortName()), None)
           FileFormatWriter.write(
             sparkSession = spark,
             plan = newQueryPlan,
@@ -148,7 +148,7 @@ class ClickhouseOptimisticTransaction(
               throw s
             }
         } finally {
-          GlutenWriterColumnarRules.injectSparkLocalProperty(spark, None)
+          GlutenWriterColumnarRules.injectSparkLocalProperty(spark, None, None)
         }
       }
       committer.addedStatuses.toSeq ++ committer.changeFiles
diff --git 
a/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
 
b/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 611d011ee7..240c0d7b8b 100644
--- 
a/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ 
b/backends-clickhouse/src-delta33/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -164,7 +164,7 @@ class ClickhouseOptimisticTransaction(
 
       try {
         val format = tableV2.getFileFormat(protocol, metadata)
-        GlutenWriterColumnarRules.injectSparkLocalProperty(spark, 
Some(format.shortName()))
+        GlutenWriterColumnarRules.injectSparkLocalProperty(spark, 
Some(format.shortName()), None)
         MergeTreeFileFormatWriter.write(
           sparkSession = spark,
           plan = newQueryPlan,
@@ -193,7 +193,7 @@ class ClickhouseOptimisticTransaction(
             throw s
           }
       } finally {
-        GlutenWriterColumnarRules.injectSparkLocalProperty(spark, None)
+        GlutenWriterColumnarRules.injectSparkLocalProperty(spark, None, None)
       }
     }
     committer.addedStatuses.toSeq ++ committer.changeFiles
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 55c4438fda..59360712a0 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -111,7 +111,13 @@ object GlutenWriterColumnarRules {
           } else {
             None
           }
-        injectSparkLocalProperty(session, format)
+        val numStaticPartitions: Option[Int] = cmd match {
+          case cmd: InsertIntoHadoopFsRelationCommand =>
+            Some(cmd.staticPartitions.size)
+          case _ =>
+            None
+        }
+        injectSparkLocalProperty(session, format, numStaticPartitions)
         format match {
           case Some(_) =>
             injectFakeRowAdaptor(rc, child)
@@ -123,7 +129,12 @@ object GlutenWriterColumnarRules {
     }
   }
 
-  def injectSparkLocalProperty(spark: SparkSession, format: Option[String]): 
Unit = {
+  // TODO: This makes FileFormatWriter#write caller-sensitive.
+  //  Remove this workaround once we have a better solution.
+  def injectSparkLocalProperty(
+      spark: SparkSession,
+      format: Option[String],
+      numStaticPartitions: Option[Int]): Unit = {
     if (format.isDefined) {
       spark.sparkContext.setLocalProperty("isNativeApplicable", true.toString)
       spark.sparkContext.setLocalProperty("nativeFormat", format.get)
@@ -132,10 +143,14 @@ object GlutenWriterColumnarRules {
       spark.sparkContext.setLocalProperty(
         "staticPartitionWriteOnly",
         BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
+      spark.sparkContext.setLocalProperty(
+        "numStaticPartitionCols",
+        numStaticPartitions.getOrElse(0).toString)
     } else {
       spark.sparkContext.setLocalProperty("isNativeApplicable", null)
       spark.sparkContext.setLocalProperty("nativeFormat", null)
       spark.sparkContext.setLocalProperty("staticPartitionWriteOnly", null)
+      spark.sparkContext.setLocalProperty("numStaticPartitionCols", null)
     }
   }
 }
diff --git a/package/pom.xml b/package/pom.xml
index 90b8892a48..723f8d8a8a 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -320,8 +320,6 @@
                     
<ignoreClass>org.apache.spark.sql.execution.datasources.orc.OrcFileFormat*</ignoreClass>
                     
<ignoreClass>org.apache.spark.sql.execution.datasources.FileFormatWriter$OutputSpec$</ignoreClass>
                     
<ignoreClass>org.apache.spark.sql.execution.datasources.FileFormatWriter$</ignoreClass>
-                    
<ignoreClass>org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand</ignoreClass>
-                    
<ignoreClass>org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$</ignoreClass>
                     
<ignoreClass>org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter</ignoreClass>
                     
<ignoreClass>org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$</ignoreClass>
                     
<ignoreClass>org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator</ignoreClass>
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 1c7ab83209..4bdb786caa 100644
--- 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -107,32 +107,6 @@ object FileFormatWriter extends Logging {
    * @return
    *   The set of all partition paths that were updated during this write job.
    */
-
-  // scalastyle:off argcount
-  def write(
-      sparkSession: SparkSession,
-      plan: SparkPlan,
-      fileFormat: FileFormat,
-      committer: FileCommitProtocol,
-      outputSpec: OutputSpec,
-      hadoopConf: Configuration,
-      partitionColumns: Seq[Attribute],
-      bucketSpec: Option[BucketSpec],
-      statsTrackers: Seq[WriteJobStatsTracker],
-      options: Map[String, String]): Set[String] = write(
-    sparkSession = sparkSession,
-    plan = plan,
-    fileFormat = fileFormat,
-    committer = committer,
-    outputSpec = outputSpec,
-    hadoopConf = hadoopConf,
-    partitionColumns = partitionColumns,
-    bucketSpec = bucketSpec,
-    statsTrackers = statsTrackers,
-    options = options,
-    numStaticPartitionCols = 0
-  )
-
   def write(
       sparkSession: SparkSession,
       plan: SparkPlan,
@@ -143,11 +117,14 @@ object FileFormatWriter extends Logging {
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
       statsTrackers: Seq[WriteJobStatsTracker],
-      options: Map[String, String],
-      numStaticPartitionCols: Int = 0): Set[String] = {
+      options: Map[String, String]): Set[String] = {
 
     val nativeEnabled =
       "true" == 
sparkSession.sparkContext.getLocalProperty("isNativeApplicable")
+    val numStaticPartitionCols =
+      
Option(sparkSession.sparkContext.getLocalProperty("numStaticPartitionCols"))
+        .map(_.toInt)
+        .getOrElse(0)
 
     if (nativeEnabled) {
       logInfo(
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
deleted file mode 100644
index 9221ecbd12..0000000000
--- 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.spark.internal.io.FileCommitProtocol
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, 
CatalogTablePartition}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
-import org.apache.spark.sql.util.SchemaUtils
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-
-/**
- * A command for writing data to a [[HadoopFsRelation]]. Supports both 
overwriting and appending.
- * Writing to dynamic partitions is also supported.
- *
- * @param staticPartitions
- *   partial partitioning spec for write. This defines the scope of partition 
overwrites: when the
- *   spec is empty, all partitions are overwritten. When it covers a prefix of 
the partition keys,
- *   only partitions matching the prefix are overwritten.
- * @param ifPartitionNotExists
- *   If true, only write if the partition does not exist. Only valid for 
static partitions.
- */
-case class InsertIntoHadoopFsRelationCommand(
-    outputPath: Path,
-    staticPartitions: TablePartitionSpec,
-    ifPartitionNotExists: Boolean,
-    partitionColumns: Seq[Attribute],
-    bucketSpec: Option[BucketSpec],
-    fileFormat: FileFormat,
-    options: Map[String, String],
-    query: LogicalPlan,
-    mode: SaveMode,
-    catalogTable: Option[CatalogTable],
-    fileIndex: Option[FileIndex],
-    outputColumnNames: Seq[String])
-  extends DataWritingCommand {
-
-  private lazy val parameters = CaseInsensitiveMap(options)
-
-  private[sql] lazy val dynamicPartitionOverwrite: Boolean = {
-    val partitionOverwriteMode = parameters
-      .get("partitionOverwriteMode")
-      // scalastyle:off caselocale
-      .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase))
-      // scalastyle:on caselocale
-      .getOrElse(conf.partitionOverwriteMode)
-    val enableDynamicOverwrite = partitionOverwriteMode == 
PartitionOverwriteMode.DYNAMIC
-    // This config only makes sense when we are overwriting a partitioned 
dataset with dynamic
-    // partition columns.
-    enableDynamicOverwrite && mode == SaveMode.Overwrite &&
-    staticPartitions.size < partitionColumns.length
-  }
-
-  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
-    // Most formats don't do well with duplicate columns, so lets not allow 
that
-    SchemaUtils.checkColumnNameDuplication(
-      outputColumnNames,
-      s"when inserting into $outputPath",
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
-    val hadoopConf = 
sparkSession.sessionState.newHadoopConfWithOptions(options)
-    val fs = outputPath.getFileSystem(hadoopConf)
-    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory)
-
-    val partitionsTrackedByCatalog = 
sparkSession.sessionState.conf.manageFilesourcePartitions &&
-      catalogTable.isDefined &&
-      catalogTable.get.partitionColumnNames.nonEmpty &&
-      catalogTable.get.tracksPartitionsInCatalog
-
-    var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
-    var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
-    var matchingPartitions: Seq[CatalogTablePartition] = Seq.empty
-
-    // When partitions are tracked by the catalog, compute all custom 
partition locations that
-    // may be relevant to the insertion job.
-    if (partitionsTrackedByCatalog) {
-      matchingPartitions = sparkSession.sessionState.catalog
-        .listPartitions(catalogTable.get.identifier, Some(staticPartitions))
-      initialMatchingPartitions = matchingPartitions.map(_.spec)
-      customPartitionLocations =
-        getCustomPartitionLocations(fs, catalogTable.get, qualifiedOutputPath, 
matchingPartitions)
-    }
-
-    val jobId = java.util.UUID.randomUUID().toString
-    val committer = FileCommitProtocol.instantiate(
-      sparkSession.sessionState.conf.fileCommitProtocolClass,
-      jobId = jobId,
-      outputPath = outputPath.toString,
-      dynamicPartitionOverwrite = dynamicPartitionOverwrite)
-
-    val doInsertion = if (mode == SaveMode.Append) {
-      true
-    } else {
-      val pathExists = fs.exists(qualifiedOutputPath)
-      (mode, pathExists) match {
-        case (SaveMode.ErrorIfExists, true) =>
-          throw 
QueryCompilationErrors.outputPathAlreadyExistsError(qualifiedOutputPath)
-        case (SaveMode.Overwrite, true) =>
-          if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
-            false
-          } else if (dynamicPartitionOverwrite) {
-            // For dynamic partition overwrite, do not delete partition 
directories ahead.
-            true
-          } else {
-            deleteMatchingPartitions(fs, qualifiedOutputPath, 
customPartitionLocations, committer)
-            true
-          }
-        case (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
-          true
-        case (SaveMode.Ignore, exists) =>
-          !exists
-        case (s, exists) =>
-          throw QueryExecutionErrors.unsupportedSaveModeError(s.toString, 
exists)
-      }
-    }
-
-    if (doInsertion) {
-
-      def refreshUpdatedPartitions(updatedPartitionPaths: Set[String]): Unit = 
{
-        val updatedPartitions = 
updatedPartitionPaths.map(PartitioningUtils.parsePathFragment)
-        if (partitionsTrackedByCatalog) {
-          val newPartitions = updatedPartitions -- initialMatchingPartitions
-          if (newPartitions.nonEmpty) {
-            AlterTableAddPartitionCommand(
-              catalogTable.get.identifier,
-              newPartitions.toSeq.map(p => (p, None)),
-              ifNotExists = true).run(sparkSession)
-          }
-          // For dynamic partition overwrite, we never remove partitions but 
only update existing
-          // ones.
-          if (mode == SaveMode.Overwrite && !dynamicPartitionOverwrite) {
-            val deletedPartitions = initialMatchingPartitions.toSet -- 
updatedPartitions
-            if (deletedPartitions.nonEmpty) {
-              AlterTableDropPartitionCommand(
-                catalogTable.get.identifier,
-                deletedPartitions.toSeq,
-                ifExists = true,
-                purge = false,
-                retainData = true /* already deleted */ ).run(sparkSession)
-            }
-          }
-        }
-      }
-
-      // For dynamic partition overwrite, FileOutputCommitter's output path is 
staging path, files
-      // will be renamed from staging path to final output path during commit 
job
-      val committerOutputPath = if (dynamicPartitionOverwrite) {
-        FileCommitProtocol
-          .getStagingDir(outputPath.toString, jobId)
-          .makeQualified(fs.getUri, fs.getWorkingDirectory)
-      } else {
-        qualifiedOutputPath
-      }
-
-      val updatedPartitionPaths =
-        FileFormatWriter.write(
-          sparkSession = sparkSession,
-          plan = child,
-          fileFormat = fileFormat,
-          committer = committer,
-          outputSpec = FileFormatWriter.OutputSpec(
-            committerOutputPath.toString,
-            customPartitionLocations,
-            outputColumns),
-          hadoopConf = hadoopConf,
-          partitionColumns = partitionColumns,
-          bucketSpec = bucketSpec,
-          statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
-          options = options,
-          numStaticPartitionCols = staticPartitions.size
-        )
-
-      // update metastore partition metadata
-      if (
-        updatedPartitionPaths.isEmpty && staticPartitions.nonEmpty
-        && partitionColumns.length == staticPartitions.size
-      ) {
-        // Avoid empty static partition can't loaded to datasource table.
-        val staticPathFragment =
-          PartitioningUtils.getPathFragment(staticPartitions, partitionColumns)
-        refreshUpdatedPartitions(Set(staticPathFragment))
-      } else {
-        refreshUpdatedPartitions(updatedPartitionPaths)
-      }
-
-      // refresh cached files in FileIndex
-      fileIndex.foreach(_.refresh())
-      // refresh data cache if table is cached
-      sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, 
outputPath, fs)
-
-      if (catalogTable.nonEmpty) {
-        CommandUtils.updateTableStats(sparkSession, catalogTable.get)
-      }
-
-    } else {
-      logInfo("Skipping insertion into a relation that already exists.")
-    }
-
-    Seq.empty[Row]
-  }
-
-  /**
-   * Deletes all partition files that match the specified static prefix. 
Partitions with custom
-   * locations are also cleared based on the custom locations map given to 
this class.
-   */
-  private def deleteMatchingPartitions(
-      fs: FileSystem,
-      qualifiedOutputPath: Path,
-      customPartitionLocations: Map[TablePartitionSpec, String],
-      committer: FileCommitProtocol): Unit = {
-    val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
-      "/" + partitionColumns
-        .flatMap(p => 
staticPartitions.get(p.name).map(getPartitionPathString(p.name, _)))
-        .mkString("/")
-    } else {
-      ""
-    }
-    // first clear the path determined by the static partition keys (e.g. 
/table/foo=1)
-    val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-    if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, 
staticPrefixPath, true)) {
-      throw 
QueryExecutionErrors.cannotClearOutputDirectoryError(staticPrefixPath)
-    }
-    // now clear all custom partition locations (e.g. 
/custom/dir/where/foo=2/bar=4)
-    for ((spec, customLoc) <- customPartitionLocations) {
-      assert(
-        (staticPartitions.toSet -- spec).isEmpty,
-        "Custom partition location did not match static partitioning keys")
-      val path = new Path(customLoc)
-      if (fs.exists(path) && !committer.deleteWithJob(fs, path, true)) {
-        throw QueryExecutionErrors.cannotClearPartitionDirectoryError(path)
-      }
-    }
-  }
-
-  /**
-   * Given a set of input partitions, returns those that have locations that 
differ from the Hive
-   * default (e.g. /k1=v1/k2=v2). These partitions were manually assigned 
locations by the user.
-   *
-   * @return
-   *   a mapping from partition specs to their custom locations
-   */
-  private def getCustomPartitionLocations(
-      fs: FileSystem,
-      table: CatalogTable,
-      qualifiedOutputPath: Path,
-      partitions: Seq[CatalogTablePartition]): Map[TablePartitionSpec, String] 
= {
-    partitions.flatMap {
-      p =>
-        val defaultLocation = qualifiedOutputPath
-          .suffix("/" + PartitioningUtils.getPathFragment(p.spec, 
table.partitionSchema))
-          .toString
-        val catalogLocation =
-          new Path(p.location).makeQualified(fs.getUri, 
fs.getWorkingDirectory).toString
-        if (catalogLocation != defaultLocation) {
-          Some(p.spec -> catalogLocation)
-        } else {
-          None
-        }
-    }.toMap
-  }
-
-  override protected def withNewChildInternal(
-      newChild: LogicalPlan): InsertIntoHadoopFsRelationCommand = copy(query = 
newChild)
-}
-// scalastyle:on line.size.limit
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index c037305f2e..3e2eb6c4df 100644
--- 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -99,32 +99,6 @@ object FileFormatWriter extends Logging {
    * @return
    *   The set of all partition paths that were updated during this write job.
    */
-
-  // scalastyle:off argcount
-  def write(
-      sparkSession: SparkSession,
-      plan: SparkPlan,
-      fileFormat: FileFormat,
-      committer: FileCommitProtocol,
-      outputSpec: OutputSpec,
-      hadoopConf: Configuration,
-      partitionColumns: Seq[Attribute],
-      bucketSpec: Option[BucketSpec],
-      statsTrackers: Seq[WriteJobStatsTracker],
-      options: Map[String, String]): Set[String] = write(
-    sparkSession = sparkSession,
-    plan = plan,
-    fileFormat = fileFormat,
-    committer = committer,
-    outputSpec = outputSpec,
-    hadoopConf = hadoopConf,
-    partitionColumns = partitionColumns,
-    bucketSpec = bucketSpec,
-    statsTrackers = statsTrackers,
-    options = options,
-    numStaticPartitionCols = 0
-  )
-
   def write(
       sparkSession: SparkSession,
       plan: SparkPlan,
@@ -135,11 +109,14 @@ object FileFormatWriter extends Logging {
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
       statsTrackers: Seq[WriteJobStatsTracker],
-      options: Map[String, String],
-      numStaticPartitionCols: Int = 0): Set[String] = {
+      options: Map[String, String]): Set[String] = {
 
     val nativeEnabled =
       "true" == 
sparkSession.sparkContext.getLocalProperty("isNativeApplicable")
+    val numStaticPartitionCols =
+      
Option(sparkSession.sparkContext.getLocalProperty("numStaticPartitionCols"))
+        .map(_.toInt)
+        .getOrElse(0)
 
     if (nativeEnabled) {
       logInfo(
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
deleted file mode 100644
index b1e740284b..0000000000
--- 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.spark.internal.io.FileCommitProtocol
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, 
CatalogTablePartition}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
-import org.apache.spark.sql.util.SchemaUtils
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-
-/**
- * A command for writing data to a [[HadoopFsRelation]]. Supports both 
overwriting and appending.
- * Writing to dynamic partitions is also supported.
- *
- * @param staticPartitions
- *   partial partitioning spec for write. This defines the scope of partition 
overwrites: when the
- *   spec is empty, all partitions are overwritten. When it covers a prefix of 
the partition keys,
- *   only partitions matching the prefix are overwritten.
- * @param ifPartitionNotExists
- *   If true, only write if the partition does not exist. Only valid for 
static partitions.
- */
-
-// scalastyle:off line.size.limit
-case class InsertIntoHadoopFsRelationCommand(
-    outputPath: Path,
-    staticPartitions: TablePartitionSpec,
-    ifPartitionNotExists: Boolean,
-    partitionColumns: Seq[Attribute],
-    bucketSpec: Option[BucketSpec],
-    fileFormat: FileFormat,
-    options: Map[String, String],
-    query: LogicalPlan,
-    mode: SaveMode,
-    catalogTable: Option[CatalogTable],
-    fileIndex: Option[FileIndex],
-    outputColumnNames: Seq[String])
-  extends DataWritingCommand {
-
-  private lazy val parameters = CaseInsensitiveMap(options)
-
-  private[sql] lazy val dynamicPartitionOverwrite: Boolean = {
-    val partitionOverwriteMode = parameters
-      .get(DataSourceUtils.PARTITION_OVERWRITE_MODE)
-      // scalastyle:off caselocale
-      .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase))
-      // scalastyle:on caselocale
-      .getOrElse(conf.partitionOverwriteMode)
-    val enableDynamicOverwrite = partitionOverwriteMode == 
PartitionOverwriteMode.DYNAMIC
-    // This config only makes sense when we are overwriting a partitioned 
dataset with dynamic
-    // partition columns.
-    enableDynamicOverwrite && mode == SaveMode.Overwrite &&
-    staticPartitions.size < partitionColumns.length
-  }
-
-  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
-    // Most formats don't do well with duplicate columns, so lets not allow 
that
-    SchemaUtils.checkColumnNameDuplication(
-      outputColumnNames,
-      s"when inserting into $outputPath",
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
-    val hadoopConf = 
sparkSession.sessionState.newHadoopConfWithOptions(options)
-    val fs = outputPath.getFileSystem(hadoopConf)
-    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory)
-
-    val partitionsTrackedByCatalog = 
sparkSession.sessionState.conf.manageFilesourcePartitions &&
-      catalogTable.isDefined &&
-      catalogTable.get.partitionColumnNames.nonEmpty &&
-      catalogTable.get.tracksPartitionsInCatalog
-
-    var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
-    var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
-    var matchingPartitions: Seq[CatalogTablePartition] = Seq.empty
-
-    // When partitions are tracked by the catalog, compute all custom 
partition locations that
-    // may be relevant to the insertion job.
-    if (partitionsTrackedByCatalog) {
-      matchingPartitions = sparkSession.sessionState.catalog
-        .listPartitions(catalogTable.get.identifier, Some(staticPartitions))
-      initialMatchingPartitions = matchingPartitions.map(_.spec)
-      customPartitionLocations =
-        getCustomPartitionLocations(fs, catalogTable.get, qualifiedOutputPath, 
matchingPartitions)
-    }
-
-    val jobId = java.util.UUID.randomUUID().toString
-    val committer = FileCommitProtocol.instantiate(
-      sparkSession.sessionState.conf.fileCommitProtocolClass,
-      jobId = jobId,
-      outputPath = outputPath.toString,
-      dynamicPartitionOverwrite = dynamicPartitionOverwrite)
-
-    val doInsertion = if (mode == SaveMode.Append) {
-      true
-    } else {
-      val pathExists = fs.exists(qualifiedOutputPath)
-      (mode, pathExists) match {
-        case (SaveMode.ErrorIfExists, true) =>
-          throw 
QueryCompilationErrors.outputPathAlreadyExistsError(qualifiedOutputPath)
-        case (SaveMode.Overwrite, true) =>
-          if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
-            false
-          } else if (dynamicPartitionOverwrite) {
-            // For dynamic partition overwrite, do not delete partition 
directories ahead.
-            true
-          } else {
-            deleteMatchingPartitions(fs, qualifiedOutputPath, 
customPartitionLocations, committer)
-            true
-          }
-        case (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
-          true
-        case (SaveMode.Ignore, exists) =>
-          !exists
-        case (s, exists) =>
-          throw QueryExecutionErrors.saveModeUnsupportedError(s, exists)
-      }
-    }
-
-    if (doInsertion) {
-
-      def refreshUpdatedPartitions(updatedPartitionPaths: Set[String]): Unit = 
{
-        val updatedPartitions = 
updatedPartitionPaths.map(PartitioningUtils.parsePathFragment)
-        if (partitionsTrackedByCatalog) {
-          val newPartitions = updatedPartitions -- initialMatchingPartitions
-          if (newPartitions.nonEmpty) {
-            AlterTableAddPartitionCommand(
-              catalogTable.get.identifier,
-              newPartitions.toSeq.map(p => (p, None)),
-              ifNotExists = true).run(sparkSession)
-          }
-          // For dynamic partition overwrite, we never remove partitions but 
only update existing
-          // ones.
-          if (mode == SaveMode.Overwrite && !dynamicPartitionOverwrite) {
-            val deletedPartitions = initialMatchingPartitions.toSet -- 
updatedPartitions
-            if (deletedPartitions.nonEmpty) {
-              AlterTableDropPartitionCommand(
-                catalogTable.get.identifier,
-                deletedPartitions.toSeq,
-                ifExists = true,
-                purge = false,
-                retainData = true /* already deleted */ ).run(sparkSession)
-            }
-          }
-        }
-      }
-
-      // For dynamic partition overwrite, FileOutputCommitter's output path is 
staging path, files
-      // will be renamed from staging path to final output path during commit 
job
-      val committerOutputPath = if (dynamicPartitionOverwrite) {
-        FileCommitProtocol
-          .getStagingDir(outputPath.toString, jobId)
-          .makeQualified(fs.getUri, fs.getWorkingDirectory)
-      } else {
-        qualifiedOutputPath
-      }
-
-      val updatedPartitionPaths =
-        FileFormatWriter.write(
-          sparkSession = sparkSession,
-          plan = child,
-          fileFormat = fileFormat,
-          committer = committer,
-          outputSpec = FileFormatWriter.OutputSpec(
-            committerOutputPath.toString,
-            customPartitionLocations,
-            outputColumns),
-          hadoopConf = hadoopConf,
-          partitionColumns = partitionColumns,
-          bucketSpec = bucketSpec,
-          statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
-          options = options,
-          numStaticPartitionCols = staticPartitions.size
-        )
-
-      // update metastore partition metadata
-      if (
-        updatedPartitionPaths.isEmpty && staticPartitions.nonEmpty
-        && partitionColumns.length == staticPartitions.size
-      ) {
-        // Avoid empty static partition can't loaded to datasource table.
-        val staticPathFragment =
-          PartitioningUtils.getPathFragment(staticPartitions, partitionColumns)
-        refreshUpdatedPartitions(Set(staticPathFragment))
-      } else {
-        refreshUpdatedPartitions(updatedPartitionPaths)
-      }
-
-      // refresh cached files in FileIndex
-      fileIndex.foreach(_.refresh())
-      // refresh data cache if table is cached
-      sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, 
outputPath, fs)
-
-      if (catalogTable.nonEmpty) {
-        CommandUtils.updateTableStats(sparkSession, catalogTable.get)
-      }
-
-    } else {
-      logInfo("Skipping insertion into a relation that already exists.")
-    }
-
-    Seq.empty[Row]
-  }
-
-  /**
-   * Deletes all partition files that match the specified static prefix. 
Partitions with custom
-   * locations are also cleared based on the custom locations map given to 
this class.
-   */
-  private def deleteMatchingPartitions(
-      fs: FileSystem,
-      qualifiedOutputPath: Path,
-      customPartitionLocations: Map[TablePartitionSpec, String],
-      committer: FileCommitProtocol): Unit = {
-    val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
-      "/" + partitionColumns
-        .flatMap(p => 
staticPartitions.get(p.name).map(getPartitionPathString(p.name, _)))
-        .mkString("/")
-    } else {
-      ""
-    }
-    // first clear the path determined by the static partition keys (e.g. 
/table/foo=1)
-    val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-    if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, 
staticPrefixPath, true)) {
-      throw 
QueryExecutionErrors.cannotClearOutputDirectoryError(staticPrefixPath)
-    }
-    // now clear all custom partition locations (e.g. 
/custom/dir/where/foo=2/bar=4)
-    for ((spec, customLoc) <- customPartitionLocations) {
-      assert(
-        (staticPartitions.toSet -- spec).isEmpty,
-        "Custom partition location did not match static partitioning keys")
-      val path = new Path(customLoc)
-      if (fs.exists(path) && !committer.deleteWithJob(fs, path, true)) {
-        throw QueryExecutionErrors.cannotClearPartitionDirectoryError(path)
-      }
-    }
-  }
-
-  /**
-   * Given a set of input partitions, returns those that have locations that 
differ from the Hive
-   * default (e.g. /k1=v1/k2=v2). These partitions were manually assigned 
locations by the user.
-   *
-   * @return
-   *   a mapping from partition specs to their custom locations
-   */
-  private def getCustomPartitionLocations(
-      fs: FileSystem,
-      table: CatalogTable,
-      qualifiedOutputPath: Path,
-      partitions: Seq[CatalogTablePartition]): Map[TablePartitionSpec, String] 
= {
-    partitions.flatMap {
-      p =>
-        val defaultLocation = qualifiedOutputPath
-          .suffix("/" + PartitioningUtils.getPathFragment(p.spec, 
table.partitionSchema))
-          .toString
-        val catalogLocation =
-          new Path(p.location).makeQualified(fs.getUri, 
fs.getWorkingDirectory).toString
-        if (catalogLocation != defaultLocation) {
-          Some(p.spec -> catalogLocation)
-        } else {
-          None
-        }
-    }.toMap
-  }
-
-  override protected def withNewChildInternal(
-      newChild: LogicalPlan): InsertIntoHadoopFsRelationCommand = copy(query = 
newChild)
-}
-
-// scalastyle:on line.size.limit


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to