This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new e6850d5  [CARBONDATA-3940] Remove "spark.sql.sources.commitProtocolClass" configuration from SparkCarbonTableFormat
e6850d5 is described below

commit e6850d59773c237571405950a3f8aaa501d3e716
Author: haomarch <marchp...@126.com>
AuthorDate: Thu Aug 6 15:01:18 2020 +0800

    [CARBONDATA-3940] Remove "spark.sql.sources.commitProtocolClass" configuration from SparkCarbonTableFormat

    Why is this PR needed?
    During the load process, commitTask fails with high probability. The
    exception stack shows the error was thrown by HadoopMapReduceCommitProtocol,
    not CarbonSQLHadoopMapReduceCommitProtocol, implying that the committer was
    initialized to the wrong class: it should have been initialized as
    CarbonSQLHadoopMapReduceCommitProtocol, but was incorrectly initialized as
    HadoopMapReduceCommitProtocol.

    What changes were proposed in this PR?
    Instantiate the committer as CarbonSQLHadoopMapReduceCommitProtocol directly,
    instead of resolving the class name from the session configuration.

    Does this PR introduce any user interface change?
    No

    Is any new testcase added?
    No

    This closes #3883
---
 .../management/CarbonInsertIntoHadoopFsRelationCommand.scala     | 9 +++------
 .../spark/sql/execution/datasources/SparkCarbonTableFormat.scala | 4 ----
 2 files changed, 3 insertions(+), 10 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala
index b8253b9..eb19440 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoHadoopFsRelationCommand.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter, FileIndex, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.{CarbonSQLHadoopMapReduceCommitProtocol, FileFormat, FileFormatWriter, FileIndex, PartitioningUtils, SparkCarbonTableFormat}
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.util.SchemaUtils
@@ -104,11 +104,8 @@ case class CarbonInsertIntoHadoopFsRelationCommand(
     val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
       staticPartitions.size < partitionColumns.length
-    val committer = FileCommitProtocol.instantiate(
-      sparkSession.sessionState.conf.fileCommitProtocolClass,
-      jobId = java.util.UUID.randomUUID().toString,
-      outputPath = outputPath.toString,
-      dynamicPartitionOverwrite = dynamicPartitionOverwrite)
+    val committer = CarbonSQLHadoopMapReduceCommitProtocol(java.util.UUID.randomUUID().toString,
+      outputPath.toString, dynamicPartitionOverwrite)

     val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index ecb8547..726523f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -70,10 +70,6 @@ with Serializable {
       None
     }

-    SparkSession.getActiveSession.get.sessionState.conf.setConfString(
-      "spark.sql.sources.commitProtocolClass",
-      "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
-
   override def prepareWrite(
       sparkSession: SparkSession,
       job: Job,
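
For background, the sketch below is a minimal, self-contained Scala illustration
of the failure mode this commit removes. The types are hypothetical stand-ins,
not the real Spark or Carbon classes: it shows why resolving the committer class
reflectively from a mutable session config can hand back the default Hadoop
committer, while constructing the Carbon committer directly cannot.

// Hypothetical stand-ins for the FileCommitProtocol hierarchy; the names mirror
// the Spark/Carbon classes but exist only for this illustration.
abstract class FileCommitProtocol {
  def name: String = getClass.getSimpleName
}

class HadoopMapReduceCommitProtocol(jobId: String, outputPath: String)
  extends FileCommitProtocol

class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, outputPath: String)
  extends FileCommitProtocol

object CommitterDemo {

  // Pattern before the fix: the committer class name is read from a shared,
  // mutable session config and instantiated via reflection. If the config was
  // never set on this session, or was reset by another code path, the default
  // Hadoop committer comes back instead of the Carbon one.
  def reflective(classNameFromConf: String,
                 jobId: String,
                 outputPath: String): FileCommitProtocol =
    Class.forName(classNameFromConf)
      .getDeclaredConstructor(classOf[String], classOf[String])
      .newInstance(jobId, outputPath)
      .asInstanceOf[FileCommitProtocol]

  // Pattern after the fix: construct the Carbon committer directly. No config
  // lookup, no reflection, no way to fall back to the wrong class.
  def direct(jobId: String, outputPath: String): FileCommitProtocol =
    new CarbonSQLHadoopMapReduceCommitProtocol(jobId, outputPath)

  def main(args: Array[String]): Unit = {
    val jobId = java.util.UUID.randomUUID().toString
    // Simulates the bug: the config still names the default committer.
    val stale = "HadoopMapReduceCommitProtocol"
    println(reflective(stale, jobId, "/tmp/out").name) // HadoopMapReduceCommitProtocol
    println(direct(jobId, "/tmp/out").name)            // CarbonSQLHadoopMapReduceCommitProtocol
  }
}

The trade-off is configurability: the committer class is now hard-wired rather
than pluggable through "spark.sql.sources.commitProtocolClass", which matches
the intent of the setConfString call this commit deletes.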