[
https://issues.apache.org/jira/browse/SPARK-33298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Cheng Su updated SPARK-33298:
-----------------------------
Parent: SPARK-19256
Issue Type: Sub-task (was: New Feature)
> FileCommitProtocol V2
> ---------------------
>
> Key: SPARK-33298
> URL: https://issues.apache.org/jira/browse/SPARK-33298
> Project: Spark
> Issue Type: Sub-task
> Components: Spark Core
> Affects Versions: 3.1.0
> Reporter: Cheng Su
> Priority: Minor
>
> This Jira proposes a new version of `FileCommitProtocol`
> ([https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala]
> ), e.g. `FileCommitProtocolV2`.
> The motivation is that two pending changes both require changing the
> `FileCommitProtocol` API:
> (1) Support writing Hive ORC/Parquet bucketed tables
> ([https://github.com/apache/spark/pull/30003] ): this needs a new parameter
> `prefix` in the methods `newTaskTempFile` and `newTaskTempFileAbsPath`, so
> that Spark can write Hive/Presto-compatible bucketed files.
> (2) Fix commit collisions in dynamic partition overwrite mode
> ([https://github.com/apache/spark/pull/29000] ): this needs a new method
> `getStagingDir`, so that the dynamic partition staging directory can be
> customized to avoid commit collisions.
>
> The reason to propose `FileCommitProtocolV2` instead of changing
> `FileCommitProtocol` directly is that the `FileCommitProtocol` API is
> effectively public: a customized commit protocol subclass can be plugged in
> at run time
> ([https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala#L146]
> ). Changing the API (e.g. adding a method or changing an existing method
> signature) would therefore break external subclasses of the commit
> protocol. According to [~cloud_fan], some external subclasses already exist
> to better support object stores.
>
> One proposal for `FileCommitProtocolV2` can be:
> {code:java}
> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.mapreduce.TaskAttemptContext
>
> abstract class FileCommitProtocolV2 {
>   // `options` replaces `ext` and can carry more string-string parameters.
>   def newTaskTempFile(
>       taskContext: TaskAttemptContext,
>       dir: Option[String],
>       options: Map[String, String]): String
>
>   // `options` replaces `ext` and can carry more string-string parameters.
>   def newTaskTempFileAbsPath(
>       taskContext: TaskAttemptContext,
>       absoluteDir: String,
>       options: Map[String, String]): String
>
>   // Other new methods, e.g. getStagingDir.
>   def getStagingDir(path: String, jobId: String): Path
>
>   // Rest of the FileCommitProtocol methods.
>   ...
> }
> {code}
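
To make the `options` idea concrete, here is a minimal sketch of how an implementation might derive a task file name from the map. Everything in it is an assumption for illustration: the object `TempFileNaming`, the option keys `ext` and `prefix`, and the `part-<split>-<jobId>` name shape are hypothetical and are not defined by this proposal.

```scala
// Hypothetical sketch: none of these names are part of Spark's API.
object TempFileNaming {
  // Option keys are assumptions made for this example only.
  val ExtKey = "ext"
  val PrefixKey = "prefix"

  // Builds a task output file name from the string-string `options` map.
  // Missing keys fall back to the empty string, so a caller that passes
  // only `ext` gets a name without any prefix.
  def fileName(split: Int, jobId: String, options: Map[String, String]): String = {
    val prefix = options.getOrElse(PrefixKey, "")
    val ext = options.getOrElse(ExtKey, "")
    f"${prefix}part-$split%05d-$jobId$ext"
  }
}
```

With this shape, a new parameter such as `prefix` becomes one more key in the map, so adding it later would not change any method signature.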
>
> `FileCommitProtocolV2.instantiate()` will first look for a subclass of
> `FileCommitProtocolV2` and, if none is found, fall back to a subclass of
> `FileCommitProtocol`, so the current version of `FileCommitProtocol` is
> still supported.
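
The fallback described above could look roughly like the following sketch. It uses small stand-in classes rather than the real Spark ones, and the adapter that wraps a V1 implementation is an assumption of this example; the proposal does not say how a V1 subclass would be exposed through the V2 API.

```scala
// Hypothetical stand-ins for FileCommitProtocol and FileCommitProtocolV2.
abstract class CommitProtocolV1 { def describe: String }
abstract class CommitProtocolV2 { def describe: String }

// Assumed adapter: lets an existing V1 implementation be used as a V2 one.
final class V1Adapter(underlying: CommitProtocolV1) extends CommitProtocolV2 {
  def describe: String = s"adapted(${underlying.describe})"
}

// Example user-supplied implementations, one per API version.
class LegacyProtocol extends CommitProtocolV1 { def describe: String = "legacy" }
class ModernProtocol extends CommitProtocolV2 { def describe: String = "modern" }

object CommitProtocolLoader {
  // Loads `className` reflectively, preferring V2 and falling back to V1.
  // Assumes the class has a public no-argument constructor.
  def instantiate(className: String): CommitProtocolV2 = {
    val clazz = Class.forName(className)
    if (classOf[CommitProtocolV2].isAssignableFrom(clazz)) {
      clazz.getDeclaredConstructor().newInstance().asInstanceOf[CommitProtocolV2]
    } else if (classOf[CommitProtocolV1].isAssignableFrom(clazz)) {
      val v1 = clazz.getDeclaredConstructor().newInstance().asInstanceOf[CommitProtocolV1]
      new V1Adapter(v1)
    } else {
      throw new IllegalArgumentException(s"$className is not a commit protocol")
    }
  }
}
```

Calling `CommitProtocolLoader.instantiate(classOf[LegacyProtocol].getName)` returns the adapter, while passing `classOf[ModernProtocol].getName` returns the V2 instance directly, so callers only ever see the V2 type.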