[
https://issues.apache.org/jira/browse/SPARK-44166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Pralabh Kumar updated SPARK-44166:
----------------------------------
Description:
Currently in InsertIntoHiveTable.scala there is no way to set
dynamicPartitionOverwrite to true when calling saveAsHiveFile. When
dynamicPartitionOverwrite is true, Spark uses its built-in FileCommitProtocol
instead of Hadoop's FileOutputCommitter, which is more performant.
Here is the solution.
When running an INSERT OVERWRITE into a Hive table, the current code is:
{code:java}
val writtenParts = saveAsHiveFile(
  sparkSession = sparkSession,
  plan = child,
  hadoopConf = hadoopConf,
  fileFormat = fileFormat,
  outputLocation = tmpLocation.toString,
  partitionAttributes = partitionColumns,
  bucketSpec = bucketSpec,
  options = options)
{code}
Proposed code. First, define a new conf entry in HiveUtils:
{code:java}
val USE_FILECOMMITPROTOCOL_DYNAMIC_PARTITION_OVERWRITE =
  buildConf("spark.sql.hive.filecommit.dynamicPartitionOverwrite")
    .doc("When true, use the built-in FileCommitProtocol with " +
      "dynamicPartitionOverwrite for INSERT OVERWRITE into Hive tables.")
    .booleanConf
    .createWithDefault(false){code}
{code:java}
val enableDynamicPartitionOverwrite =
  SQLConf.get.getConf(HiveUtils.USE_FILECOMMITPROTOCOL_DYNAMIC_PARTITION_OVERWRITE)
logWarning(s"enableDynamicPartitionOverwrite: $enableDynamicPartitionOverwrite"){code}
Now, if enableDynamicPartitionOverwrite is true, numDynamicPartitions > 0, and
overwrite is true, pass dynamicPartitionOverwrite = true:
{code:java}
val writtenParts = saveAsHiveFile(
  sparkSession = sparkSession,
  plan = child,
  hadoopConf = hadoopConf,
  fileFormat = fileFormat,
  outputLocation = tmpLocation.toString,
  partitionAttributes = partitionColumns,
  bucketSpec = bucketSpec,
  options = options,
  dynamicPartitionOverwrite =
    enableDynamicPartitionOverwrite && numDynamicPartitions > 0 && overwrite)
{code}
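As an illustrative, standalone sketch (plain Scala, no Spark dependency; the function is hypothetical and only mirrors the boolean condition above), the new code path is enabled only when all three flags line up:

```scala
// Toy model of the proposed gating condition (not Spark code).
// dynamicPartitionOverwrite is requested only when the new conf is on,
// at least one partition column is dynamic, and this is an INSERT OVERWRITE.
def shouldUseDynamicOverwrite(
    enableDynamicPartitionOverwrite: Boolean, // from the proposed conf
    numDynamicPartitions: Int,                // partition columns without static values
    overwrite: Boolean                        // INSERT OVERWRITE vs. INSERT INTO
): Boolean =
  enableDynamicPartitionOverwrite && numDynamicPartitions > 0 && overwrite

// A plain INSERT INTO (overwrite = false) never triggers it:
assert(!shouldUseDynamicOverwrite(true, 2, false))
// Static-only partitioning (numDynamicPartitions = 0) never triggers it:
assert(!shouldUseDynamicOverwrite(true, 0, true))
// All three conditions satisfied:
assert(shouldUseDynamicOverwrite(true, 2, true))
```

This keeps the default behavior unchanged: unless the conf is explicitly enabled, saveAsHiveFile is called exactly as before.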
In saveAsHiveFile:
{code:java}
val committer = FileCommitProtocol.instantiate(
  sparkSession.sessionState.conf.fileCommitProtocolClass,
  jobId = java.util.UUID.randomUUID().toString,
  outputPath = outputLocation,
  dynamicPartitionOverwrite = dynamicPartitionOverwrite){code}
This will in turn construct SQLHadoopMapReduceCommitProtocol with
dynamicPartitionOverwrite set to true:
{code:java}
class SQLHadoopMapReduceCommitProtocol(
    jobId: String,
    path: String,
    dynamicPartitionOverwrite: Boolean = false)
  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
{code}
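To see why this matters, here is a toy model (plain Scala, not Spark's actual commit logic; the function and partition names are illustrative) of the difference at commit time. With dynamicPartitionOverwrite, only the partitions that actually received new output are replaced; a plain overwrite clears every existing partition:

```scala
// Toy model (NOT Spark's implementation) of which partition directories an
// INSERT OVERWRITE replaces, with and without dynamicPartitionOverwrite.
def partitionsReplaced(
    existing: Set[String],            // partitions already in the table
    written: Set[String],             // partitions produced by this job
    dynamicPartitionOverwrite: Boolean
): Set[String] =
  if (dynamicPartitionOverwrite) written // replace only the partitions touched
  else existing                          // full overwrite drops everything

val existing = Set("dt=2023-01-01", "dt=2023-01-02", "dt=2023-01-03")
val written  = Set("dt=2023-01-03")

// Dynamic overwrite leaves the untouched partitions alone:
assert(partitionsReplaced(existing, written, dynamicPartitionOverwrite = true) == written)
// A full overwrite replaces every existing partition:
assert(partitionsReplaced(existing, written, dynamicPartitionOverwrite = false) == existing)
```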
> Enable dynamicPartitionOverwrite in SaveAsHiveFile for insert overwrite
> -----------------------------------------------------------------------
>
> Key: SPARK-44166
> URL: https://issues.apache.org/jira/browse/SPARK-44166
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.4.1
> Reporter: Pralabh Kumar
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)