[ 
https://issues.apache.org/jira/browse/SPARK-44166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pralabh Kumar updated SPARK-44166:
----------------------------------
    Description: 
Currently in InsertIntoHiveTable.scala , there is no way to pass 
dynamicPartitionOverwrite to true , when calling  saveAsHiveFile . When 
dynamicPartitioOverwrite is true , spark will use  built-in FileCommitProtocol 
instead of Hadoop FileOutputCommitter , which is more performant. 

 

Here is the solution . 

When inserting overwrite into Hive table

 

Current code 

 
{code:java}
val writtenParts = saveAsHiveFile(
  sparkSession = sparkSession,
  plan = child,
  hadoopConf = hadoopConf,
  fileFormat = fileFormat,
  outputLocation = tmpLocation.toString,
  partitionAttributes = partitionColumns,
  bucketSpec = bucketSpec,
  options = options)
       {code}
 

 

Proposed code.  

enableDynamicPartitionOverwrite 

 
{code:java}
 val enableDynamicPartitionOverwrite =
      
SQLConf.get.getConf(HiveUtils.USE_FILECOMMITPROTOCOL_DYNAMIC_PARTITION_OVERWRITE)
    logWarning(s"enableDynamicPartitionOverwrite: 
$enableDynamicPartitionOverwrite"){code}
 

 

Now if enableDynamicPartitionOverwrite is true and numDynamicPartitions > 0 and 
overwrite is true , pass dynamicPartitionOverwrite true. 

 
{code:java}
val writtenParts = saveAsHiveFile( sparkSession = sparkSession, plan = child, 
hadoopConf = hadoopConf, fileFormat = fileFormat, outputLocation = 
tmpLocation.toString, partitionAttributes = partitionColumns, bucketSpec = 
bucketSpec, options = options, dynamicPartitionOverwrite =
        enableDynamicPartitionOverwrite && numDynamicPartitions > 0 && 
overwrite)       {code}
 

 

In saveAs File 
{code:java}
val committer = FileCommitProtocol.instantiate(
      sparkSession.sessionState.conf.fileCommitProtocolClass,
      jobId = java.util.UUID.randomUUID().toString,
      outputPath = outputLocation,
      dynamicPartitionOverwrite = dynamicPartitionOverwrite) {code}
This will internal call  with dynamicPartitionOverwrite value true. 

 
{code:java}
class SQLHadoopMapReduceCommitProtocol(
    jobId: String,
    path: String,
    dynamicPartitionOverwrite: Boolean = false)
  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) 
{code}
 

 

 

  was:
Currently in InsertIntoHiveTable.scala , there is no way to pass 
dynamicPartitionOverwrite to true , when calling  saveAsHiveFile . When 
dynamicPartitioOverwrite is true , spark will use 
built-in FileCommitProtocol instead of Hadoop FileOutputCommitter , which is 
more performant. 

 

Here is the solution . 

When inserting overwrite into Hive table

 

Current code 

 
{code:java}
val writtenParts = saveAsHiveFile(
  sparkSession = sparkSession,
  plan = child,
  hadoopConf = hadoopConf,
  fileFormat = fileFormat,
  outputLocation = tmpLocation.toString,
  partitionAttributes = partitionColumns,
  bucketSpec = bucketSpec,
  options = options)
       {code}
 

 

Proposed code. 

 

 

 

 


> Enable dynamicPartitionOverwrite in SaveAsHiveFile for insert overwrite
> -----------------------------------------------------------------------
>
>                 Key: SPARK-44166
>                 URL: https://issues.apache.org/jira/browse/SPARK-44166
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.4.1
>            Reporter: Pralabh Kumar
>            Priority: Minor
>
> Currently in InsertIntoHiveTable.scala , there is no way to pass 
> dynamicPartitionOverwrite to true , when calling  saveAsHiveFile . When 
> dynamicPartitioOverwrite is true , spark will use  built-in 
> FileCommitProtocol instead of Hadoop FileOutputCommitter , which is more 
> performant. 
>  
> Here is the solution . 
> When inserting overwrite into Hive table
>  
> Current code 
>  
> {code:java}
> val writtenParts = saveAsHiveFile(
>   sparkSession = sparkSession,
>   plan = child,
>   hadoopConf = hadoopConf,
>   fileFormat = fileFormat,
>   outputLocation = tmpLocation.toString,
>   partitionAttributes = partitionColumns,
>   bucketSpec = bucketSpec,
>   options = options)
>        {code}
>  
>  
> Proposed code.  
> enableDynamicPartitionOverwrite 
>  
> {code:java}
>  val enableDynamicPartitionOverwrite =
>       
> SQLConf.get.getConf(HiveUtils.USE_FILECOMMITPROTOCOL_DYNAMIC_PARTITION_OVERWRITE)
>     logWarning(s"enableDynamicPartitionOverwrite: 
> $enableDynamicPartitionOverwrite"){code}
>  
>  
> Now if enableDynamicPartitionOverwrite is true and numDynamicPartitions > 0 
> and overwrite is true , pass dynamicPartitionOverwrite true. 
>  
> {code:java}
> val writtenParts = saveAsHiveFile( sparkSession = sparkSession, plan = child, 
> hadoopConf = hadoopConf, fileFormat = fileFormat, outputLocation = 
> tmpLocation.toString, partitionAttributes = partitionColumns, bucketSpec = 
> bucketSpec, options = options, dynamicPartitionOverwrite =
>         enableDynamicPartitionOverwrite && numDynamicPartitions > 0 && 
> overwrite)       {code}
>  
>  
> In saveAs File 
> {code:java}
> val committer = FileCommitProtocol.instantiate(
>       sparkSession.sessionState.conf.fileCommitProtocolClass,
>       jobId = java.util.UUID.randomUUID().toString,
>       outputPath = outputLocation,
>       dynamicPartitionOverwrite = dynamicPartitionOverwrite) {code}
> This will internal call  with dynamicPartitionOverwrite value true. 
>  
> {code:java}
> class SQLHadoopMapReduceCommitProtocol(
>     jobId: String,
>     path: String,
>     dynamicPartitionOverwrite: Boolean = false)
>   extends HadoopMapReduceCommitProtocol(jobId, path, 
> dynamicPartitionOverwrite) {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to