This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c9b85a2d6008 [SPARK-53063][CORE] Implement and call new APIs in FileCommitProtocol instead of the deprecated ones
c9b85a2d6008 is described below

commit c9b85a2d6008a56eb4bc7eee82a66129ba6fc53b
Author: Kent Yao <y...@apache.org>
AuthorDate: Mon Aug 4 13:54:13 2025 +0800

    [SPARK-53063][CORE] Implement and call new APIs in FileCommitProtocol instead of the deprecated ones

    ### What changes were proposed in this pull request?

    This PR implements and calls the new APIs in FileCommitProtocol instead of the deprecated ones.

    ### Why are the changes needed?

    FileCommitProtocol and its related classes are complicated, as they play a lot of tricks for
    tasks such as file naming and config setting/propagation. Removing references to the deprecated
    APIs simplifies the call stack a bit, and it also makes the deprecated methods safe for
    implementations to ignore.

    ### Does this PR introduce _any_ user-facing change?

    No, nothing changes for existing implementations or end users.

    ### How was this patch tested?

    Passes existing CIs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

Closes #51772 from yaooqinn/SPARK-53063.

Authored-by: Kent Yao <y...@apache.org>
Signed-off-by: Kent Yao <y...@apache.org>
---
 .../utils/src/main/scala/org/apache/spark/SparkException.scala |  8 ++++++++
 .../org/apache/spark/internal/io/FileCommitProtocol.scala      |  9 +++++++--
 .../spark/internal/io/HadoopMapReduceCommitProtocol.scala      | 10 ----------
 .../org/apache/spark/sql/errors/QueryCompilationErrors.scala   |  6 ++----
 .../spark/sql/execution/datasources/FileFormatDataWriter.scala |  2 +-
 .../streaming/runtime/ManifestFileCommitProtocol.scala         |  8 ++++----
 .../org/apache/spark/sql/sources/PartitionedWriteSuite.scala   |  3 ++-
 7 files changed, 24 insertions(+), 22 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
index 00989fd29095..7e217115ca11 100644
--- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -131,6 +131,14 @@ object SparkException {
       messageParameters: java.util.Map[String, String]): Map[String, String] = {
     messageParameters.asScala.toMap
   }
+
+  def mustOverrideOneMethodError(methodName: String): RuntimeException = {
+    val msg = s"You must override one `$methodName`. It's preferred to not override the " +
+      "deprecated one."
+    new SparkRuntimeException(
+      "INTERNAL_ERROR",
+      Map("message" -> msg))
+  }
 }
 
 /**
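The new SparkException.mustOverrideOneMethodError helper gives the default implementations below (and QueryCompilationErrors) a single place to construct this internal error. A minimal usage sketch, assuming only the public signature shown in the hunk above:

    import org.apache.spark.SparkException

    // Build (not throw) the error for a method pair that was never overridden.
    val e: RuntimeException = SparkException.mustOverrideOneMethodError("newTaskTempFile")
    // The message names the offending method pair, roughly:
    //   You must override one `newTaskTempFile`. It's preferred to not override
    //   the deprecated one.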
*/ @deprecated("use newTaskTempFile(..., spec: FileNameSpec) instead", "3.3.0") - def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String + def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { + throw SparkException.mustOverrideOneMethodError("newTaskTempFile") + } /** * Notifies the commit protocol to add a new file, and gets back the full path that should be @@ -135,7 +138,9 @@ abstract class FileCommitProtocol extends Logging { */ @deprecated("use newTaskTempFileAbsPath(..., spec: FileNameSpec) instead", "3.3.0") def newTaskTempFileAbsPath( - taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String + taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { + throw SparkException.mustOverrideOneMethodError("newTaskTempFileAbsPath") + } /** * Similar to newTaskTempFile(), but allows files to committed to an absolute output location. diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 476cddc64395..79218dffff9e 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -117,11 +117,6 @@ class HadoopMapReduceCommitProtocol( format.getOutputCommitter(context) } - override def newTaskTempFile( - taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { - newTaskTempFile(taskContext, dir, FileNameSpec("", ext)) - } - override def newTaskTempFile( taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = { val filename = getFilename(taskContext, spec) @@ -145,11 +140,6 @@ class HadoopMapReduceCommitProtocol( } } - override def newTaskTempFileAbsPath( - taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = { - newTaskTempFileAbsPath(taskContext, absoluteDir, FileNameSpec("", ext)) - } - override def newTaskTempFileAbsPath( taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = { val filename = getFilename(taskContext, spec) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 14f279ad5ad7..b296036c8fc0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -21,7 +21,7 @@ import java.util.Locale import org.apache.hadoop.fs.Path -import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkRuntimeException, SparkThrowable, SparkUnsupportedOperationException} +import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, SparkUnsupportedOperationException} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, FunctionIdentifier, InternalRow, QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, FunctionAlreadyExistsException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, Star, TableAlreadyExistsException, UnresolvedRegex} @@ -4203,9 +4203,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 14f279ad5ad7..b296036c8fc0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -21,7 +21,7 @@ import java.util.Locale
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkRuntimeException, SparkThrowable, SparkUnsupportedOperationException}
+import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, SparkUnsupportedOperationException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, FunctionIdentifier, InternalRow, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, FunctionAlreadyExistsException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, Star, TableAlreadyExistsException, UnresolvedRegex}
@@ -4203,9 +4203,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
   def mustOverrideOneMethodError(methodName: String): RuntimeException = {
     val msg = s"You must override one `$methodName`. It's preferred to not override the " +
       "deprecated one."
-    new SparkRuntimeException(
-      "INTERNAL_ERROR",
-      Map("message" -> msg))
+    SparkException.mustOverrideOneMethodError(msg)
   }
 
   def cannotAssignEventTimeColumn(): Throwable = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
index 7d071124b0b3..374b29e4c1a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
@@ -172,7 +172,7 @@ class SingleDirectoryDataWriter(
     val currentPath = committer.newTaskTempFile(
       taskAttemptContext,
       None,
-      f"-c$fileCounter%03d" + ext)
+      FileNameSpec("", f"-c$fileCounter%03d" + ext))
 
     currentWriter = description.outputWriterFactory.newInstance(
       path = currentPath,
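The SingleDirectoryDataWriter change above is representative of the call-site migration: the bare suffix string is wrapped in a FileNameSpec with an empty prefix, which yields the same file name as before. A small sketch with illustrative values:

    import org.apache.spark.internal.io.FileNameSpec

    val fileCounter = 0          // illustrative; maintained by the writer
    val ext = ".parquet"         // illustrative extension
    val spec = FileNameSpec("", f"-c$fileCounter%03d" + ext)
    assert(spec.prefix.isEmpty && spec.suffix == "-c000.parquet")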
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ManifestFileCommitProtocol.scala
index b382642eb6bf..6574dfd9b5bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ManifestFileCommitProtocol.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.{BATCH_ID, PATH}
-import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.errors.QueryExecutionErrors
 
@@ -114,13 +114,13 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   }
 
   override def newTaskTempFile(
-      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
+      taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String = {
     // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
     // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
     val split = taskContext.getTaskAttemptID.getTaskID.getId
     val uuid = UUID.randomUUID.toString
-    val filename = f"part-$split%05d-$uuid$ext"
+    val filename = f"part-$split%05d-$uuid${spec.suffix}"
 
     val file = dir.map { d =>
       new Path(new Path(path, d), filename).toString
@@ -133,7 +133,7 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   }
 
   override def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+      taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
     throw QueryExecutionErrors.addFilesWithAbsolutePathUnsupportedError(this.toString)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 55bee7d4713d..e742fd68e915 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
 import org.apache.spark.TestUtils
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileNameSpec
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -42,7 +43,7 @@ private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String
   with Serializable with Logging {
 
   override def newTaskTempFileAbsPath(
-      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+      taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String = {
     throw new Exception("there should be no custom partition path")
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org