This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c9b85a2d6008 [SPARK-53063][CORE] Implement and call new APIs in FileCommitProtocol instead of the deprecated ones
c9b85a2d6008 is described below

commit c9b85a2d6008a56eb4bc7eee82a66129ba6fc53b
Author: Kent Yao <y...@apache.org>
AuthorDate: Mon Aug 4 13:54:13 2025 +0800

    [SPARK-53063][CORE] Implement and call new APIs in FileCommitProtocol instead of the deprecated ones
    
    ### What changes were proposed in this pull request?
    
    This PR implements and calls the new APIs in FileCommitProtocol instead of the deprecated ones.
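    
    For illustration, a minimal caller-side sketch of the migration (the helper object and the `.parquet` suffix are hypothetical; `FileNameSpec` is the existing spec class):
    
    ```scala
    import org.apache.hadoop.mapreduce.TaskAttemptContext
    
    import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
    
    object MigrationSketch {
      // Hypothetical helper showing the before/after shape of the call.
      def newTempFile(committer: FileCommitProtocol, ctx: TaskAttemptContext): String = {
        // Before (deprecated since 3.3.0): a bare extension string.
        //   committer.newTaskTempFile(ctx, None, ".parquet")
        // After: the same extension wrapped in a FileNameSpec with an empty prefix.
        committer.newTaskTempFile(ctx, None, FileNameSpec("", ".parquet"))
      }
    }
    ```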
    
    ### Why are the changes needed?
    
    FileCommitProtocol and related classes are complicated, as they play a lot of tricks for tasks such as file naming and config setting/propagation. Removing these references tightens the call stack a bit, and it also makes the deprecated methods ignorable for implementations, as sketched below.
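    
    As a minimal sketch of what "ignorable" means (the subclass below is hypothetical), an implementation now overrides only the `FileNameSpec` variants; the deprecated `ext: String` overloads fall back to default bodies that throw `SparkException.mustOverrideOneMethodError(...)` if ever invoked:
    
    ```scala
    import org.apache.hadoop.mapreduce.TaskAttemptContext
    
    import org.apache.spark.internal.io.{FileNameSpec, HadoopMapReduceCommitProtocol}
    
    // Hypothetical protocol that customizes file naming via the new API only;
    // the deprecated String-based overloads are left to their new defaults.
    class PrefixedCommitProtocol(jobId: String, path: String)
      extends HadoopMapReduceCommitProtocol(jobId, path) {
    
      override def newTaskTempFile(
          taskContext: TaskAttemptContext,
          dir: Option[String],
          spec: FileNameSpec): String = {
        // Prepend a marker to the requested prefix; keep the suffix as-is.
        super.newTaskTempFile(taskContext, dir, FileNameSpec("tmp-" + spec.prefix, spec.suffix))
      }
    }
    ```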
    
    ### Does this PR introduce _any_ user-facing change?
    No, nothing changes for existing implementations or end users.
    
    ### How was this patch tested?
    Pass existing CIs
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51772 from yaooqinn/SPARK-53063.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../utils/src/main/scala/org/apache/spark/SparkException.scala |  8 ++++++++
 .../org/apache/spark/internal/io/FileCommitProtocol.scala      |  9 +++++++--
 .../spark/internal/io/HadoopMapReduceCommitProtocol.scala      | 10 ----------
 .../org/apache/spark/sql/errors/QueryCompilationErrors.scala   |  6 ++----
 .../spark/sql/execution/datasources/FileFormatDataWriter.scala |  2 +-
 .../streaming/runtime/ManifestFileCommitProtocol.scala         |  8 ++++----
 .../org/apache/spark/sql/sources/PartitionedWriteSuite.scala   |  3 ++-
 7 files changed, 24 insertions(+), 22 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
index 00989fd29095..7e217115ca11 100644
--- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -131,6 +131,14 @@ object SparkException {
      messageParameters: java.util.Map[String, String]): Map[String, String] = {
     messageParameters.asScala.toMap
   }
+
+  def mustOverrideOneMethodError(methodName: String): RuntimeException = {
+    val msg = s"You must override one `$methodName`. It's preferred to not 
override the " +
+      "deprecated one."
+    new SparkRuntimeException(
+      "INTERNAL_ERROR",
+      Map("message" -> msg))
+  }
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index e2a96267082b..651895bf1f7a 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -20,6 +20,7 @@ package org.apache.spark.internal.io
 import org.apache.hadoop.fs._
 import org.apache.hadoop.mapreduce._
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.Unstable
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
@@ -96,7 +97,9 @@ abstract class FileCommitProtocol extends Logging {
    * guarantees that files written by different tasks will not conflict.
    */
   @deprecated("use newTaskTempFile(..., spec: FileNameSpec) instead", "3.3.0")
-  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
+  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
+    throw SparkException.mustOverrideOneMethodError("newTaskTempFile")
+  }
 
   /**
   * Notifies the commit protocol to add a new file, and gets back the full path that should be
@@ -135,7 +138,9 @@ abstract class FileCommitProtocol extends Logging {
    */
   @deprecated("use newTaskTempFileAbsPath(..., spec: FileNameSpec) instead", 
"3.3.0")
   def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String
+      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+    throw SparkException.mustOverrideOneMethodError("newTaskTempFileAbsPath")
+  }
 
   /**
   * Similar to newTaskTempFile(), but allows files to committed to an absolute output location.
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 476cddc64395..79218dffff9e 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -117,11 +117,6 @@ class HadoopMapReduceCommitProtocol(
     format.getOutputCommitter(context)
   }
 
-  override def newTaskTempFile(
-      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
-    newTaskTempFile(taskContext, dir, FileNameSpec("", ext))
-  }
-
   override def newTaskTempFile(
       taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = {
     val filename = getFilename(taskContext, spec)
@@ -145,11 +140,6 @@ class HadoopMapReduceCommitProtocol(
     }
   }
 
-  override def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
-    newTaskTempFileAbsPath(taskContext, absoluteDir, FileNameSpec("", ext))
-  }
-
   override def newTaskTempFileAbsPath(
       taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
     val filename = getFilename(taskContext, spec)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 14f279ad5ad7..b296036c8fc0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -21,7 +21,7 @@ import java.util.Locale
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkRuntimeException, SparkThrowable, SparkUnsupportedOperationException}
+import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, SparkUnsupportedOperationException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, FunctionIdentifier, InternalRow, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, FunctionAlreadyExistsException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, Star, TableAlreadyExistsException, UnresolvedRegex}
@@ -4203,9 +4203,5 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
   def mustOverrideOneMethodError(methodName: String): RuntimeException = {
-    val msg = s"You must override one `$methodName`. It's preferred to not override the " +
-      "deprecated one."
-    new SparkRuntimeException(
-      "INTERNAL_ERROR",
-      Map("message" -> msg))
+    SparkException.mustOverrideOneMethodError(methodName)
   }
 
   def cannotAssignEventTimeColumn(): Throwable = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
index 7d071124b0b3..374b29e4c1a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
@@ -172,7 +172,7 @@ class SingleDirectoryDataWriter(
     val currentPath = committer.newTaskTempFile(
       taskAttemptContext,
       None,
-      f"-c$fileCounter%03d" + ext)
+      FileNameSpec("", f"-c$fileCounter%03d" + ext))
 
     currentWriter = description.outputWriterFactory.newInstance(
       path = currentPath,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ManifestFileCommitProtocol.scala
index b382642eb6bf..6574dfd9b5bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ManifestFileCommitProtocol.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.{BATCH_ID, PATH}
-import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.errors.QueryExecutionErrors
 
@@ -114,13 +114,13 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   }
 
   override def newTaskTempFile(
-      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
+      taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = {
     // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
     // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
     val split = taskContext.getTaskAttemptID.getTaskID.getId
     val uuid = UUID.randomUUID.toString
-    val filename = f"part-$split%05d-$uuid$ext"
+    val filename = f"part-$split%05d-$uuid${spec.suffix}"
 
     val file = dir.map { d =>
       new Path(new Path(path, d), filename).toString
@@ -133,7 +133,7 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   }
 
   override def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+      taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
     throw QueryExecutionErrors.addFilesWithAbsolutePathUnsupportedError(this.toString)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 55bee7d4713d..e742fd68e915 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
 import org.apache.spark.TestUtils
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileNameSpec
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -42,7 +43,7 @@ private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String
     with Serializable with Logging {
 
   override def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+      taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
     throw new Exception("there should be no custom partition path")
   }
 }

