[ https://issues.apache.org/jira/browse/SPARK-44166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Pralabh Kumar updated SPARK-44166:
----------------------------------
Description:

Currently in InsertIntoHiveTable.scala there is no way to set dynamicPartitionOverwrite to true when calling saveAsHiveFile. When dynamicPartitionOverwrite is true, Spark will use the built-in FileCommitProtocol instead of Hadoop's FileOutputCommitter, which is more performant.

Here is the solution.

When inserting overwrite into a Hive table, the current code is:

{code:java}
val writtenParts = saveAsHiveFile(
  sparkSession = sparkSession,
  plan = child,
  hadoopConf = hadoopConf,
  fileFormat = fileFormat,
  outputLocation = tmpLocation.toString,
  partitionAttributes = partitionColumns,
  bucketSpec = bucketSpec,
  options = options)
{code}

Proposed code: first read the new flag enableDynamicPartitionOverwrite:

{code:java}
val enableDynamicPartitionOverwrite =
  SQLConf.get.getConf(HiveUtils.USE_FILECOMMITPROTOCOL_DYNAMIC_PARTITION_OVERWRITE)
logWarning(s"enableDynamicPartitionOverwrite: $enableDynamicPartitionOverwrite")
{code}

Now, if enableDynamicPartitionOverwrite is true, numDynamicPartitions > 0, and overwrite is true, pass dynamicPartitionOverwrite = true:

{code:java}
val writtenParts = saveAsHiveFile(
  sparkSession = sparkSession,
  plan = child,
  hadoopConf = hadoopConf,
  fileFormat = fileFormat,
  outputLocation = tmpLocation.toString,
  partitionAttributes = partitionColumns,
  bucketSpec = bucketSpec,
  options = options,
  dynamicPartitionOverwrite =
    enableDynamicPartitionOverwrite && numDynamicPartitions > 0 && overwrite)
{code}

In saveAsHiveFile, the flag is passed on when instantiating the commit protocol:

{code:java}
val committer = FileCommitProtocol.instantiate(
  sparkSession.sessionState.conf.fileCommitProtocolClass,
  jobId = java.util.UUID.randomUUID().toString,
  outputPath = outputLocation,
  dynamicPartitionOverwrite = dynamicPartitionOverwrite)
{code}

This will internally construct SQLHadoopMapReduceCommitProtocol with dynamicPartitionOverwrite = true:

{code:java}
class SQLHadoopMapReduceCommitProtocol(
    jobId: String,
    path: String,
    dynamicPartitionOverwrite: Boolean = false)
  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
{code}
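Note that HiveUtils.USE_FILECOMMITPROTOCOL_DYNAMIC_PARTITION_OVERWRITE is part of this proposal and does not exist in Spark today. A minimal sketch of how it could be declared in HiveUtils.scala, following the existing buildConf pattern used for the other Hive flags; the config key, doc text, version, and default below are assumptions, not the final names:

{code:java}
// Hypothetical ConfigEntry in org.apache.spark.sql.hive.HiveUtils.
// Only the buildConf builder pattern is existing Spark code; the key name,
// doc string, version, and default value are illustrative assumptions.
val USE_FILECOMMITPROTOCOL_DYNAMIC_PARTITION_OVERWRITE: ConfigEntry[Boolean] =
  buildConf("spark.sql.hive.useFileCommitProtocolDynamicPartitionOverwrite")
    .doc("When true, INSERT OVERWRITE on a partitioned Hive table with dynamic " +
      "partitions is committed through Spark's FileCommitProtocol with " +
      "dynamicPartitionOverwrite = true, instead of Hadoop's FileOutputCommitter.")
    .version("3.5.0")
    .booleanConf
    .createWithDefault(false)
{code}

With such a flag, a user would opt in by setting spark.sql.hive.useFileCommitProtocolDynamicPartitionOverwrite to true before running the INSERT OVERWRITE ... PARTITION statement; keeping the default at false preserves the current FileOutputCommitter behavior for existing jobs.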
> Enable dynamicPartitionOverwrite in SaveAsHiveFile for insert overwrite
> -----------------------------------------------------------------------
>
>                 Key: SPARK-44166
>                 URL: https://issues.apache.org/jira/browse/SPARK-44166
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.4.1
>            Reporter: Pralabh Kumar
>            Priority: Minor