This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new decd8b4cf [GLUTEN-6612] Fix ParquetFileFormat issue caused by the setting of local property isNativeApplicable (#6627)
decd8b4cf is described below

commit decd8b4cf3eb0612b50984c9c81ddf19c77d7d81
Author: PHILO-HE <[email protected]>
AuthorDate: Wed Jul 31 10:48:30 2024 +0800

    [GLUTEN-6612] Fix ParquetFileFormat issue caused by the setting of local property isNativeApplicable (#6627)
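
Background: SparkContext local properties are thread-local and persist across
queries submitted from the same thread, so a stale "isNativeApplicable=true"
left over from a previous write could leak into the next query on that thread;
clearing the keys with a null value is what resets them. A minimal sketch of
that behavior, assuming only an existing SparkSession named spark (illustrative,
not part of this commit):

    val sc = spark.sparkContext
    sc.setLocalProperty("isNativeApplicable", "true") // set for the first write
    // ... the first query finishes; the same thread later submits another query ...
    sc.getLocalProperty("isNativeApplicable")         // still "true" on this thread
    sc.setLocalProperty("isNativeApplicable", null)   // passing null clears the property
    sc.getLocalProperty("isNativeApplicable")         // now null
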
---
 .../sql/delta/ClickhouseOptimisticTransaction.scala      |  2 +-
 .../sql/delta/ClickhouseOptimisticTransaction.scala      |  2 +-
 .../sql/delta/ClickhouseOptimisticTransaction.scala      |  2 +-
 .../datasources/GlutenWriterColumnarRules.scala          | 12 +++++-------
 .../sql/execution/datasources/FileFormatWriter.scala     |  6 +++---
 .../sql/execution/datasources/orc/OrcFileFormat.scala    |  4 ++--
 .../datasources/parquet/ParquetFileFormat.scala          | 16 ++++++----------
 .../apache/spark/sql/hive/execution/HiveFileFormat.scala |  4 ++--
 .../sql/execution/datasources/FileFormatWriter.scala     |  6 +++---
 .../sql/execution/datasources/orc/OrcFileFormat.scala    |  4 ++--
 .../datasources/parquet/ParquetFileFormat.scala          | 14 +++++---------
 .../apache/spark/sql/hive/execution/HiveFileFormat.scala |  4 ++--
 12 files changed, 33 insertions(+), 43 deletions(-)

diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 4133b5c60..3314465c5 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -177,7 +177,7 @@ class ClickhouseOptimisticTransaction(
       // 1. insert FakeRowAdaptor
       // 2. DeltaInvariantCheckerExec transform
       // 3. DeltaTaskStatisticsTracker collect null count / min values / max values
-      // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
+      // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
       //    'nativeFormat' in the LocalProperty of the sparkcontext
       super.writeFiles(inputData, writeOptions, additionalConstraints)
     }
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 4133b5c60..3314465c5 100644
--- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -177,7 +177,7 @@ class ClickhouseOptimisticTransaction(
       // 1. insert FakeRowAdaptor
       // 2. DeltaInvariantCheckerExec transform
       // 3. DeltaTaskStatisticsTracker collect null count / min values / max values
-      // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
+      // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
       //    'nativeFormat' in the LocalProperty of the sparkcontext
       super.writeFiles(inputData, writeOptions, additionalConstraints)
     }
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 9e79c4f2e..6eec68efe 100644
--- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -185,7 +185,7 @@ class ClickhouseOptimisticTransaction(
       // 1. insert FakeRowAdaptor
       // 2. DeltaInvariantCheckerExec transform
       // 3. DeltaTaskStatisticsTracker collect null count / min values / max values
-      // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
+      // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
       //    'nativeFormat' in the LocalProperty of the sparkcontext
       super.writeFiles(inputData, writeOptions, additionalConstraints)
     }
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 7063c3f67..20b006015 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -163,6 +163,10 @@ object GlutenWriterColumnarRules {
             BackendsApiManager.getSettings.enableNativeWriteFiles() =>
         injectFakeRowAdaptor(rc, rc.child)
       case rc @ DataWritingCommandExec(cmd, child) =>
+        // These properties can be set by the same thread in last query submission.
+        session.sparkContext.setLocalProperty("isNativeApplicable", null)
+        session.sparkContext.setLocalProperty("nativeFormat", null)
+        session.sparkContext.setLocalProperty("staticPartitionWriteOnly", null)
         if (BackendsApiManager.getSettings.supportNativeWrite(child.output.toStructType.fields)) {
           val format = getNativeFormat(cmd)
           session.sparkContext.setLocalProperty(
@@ -170,7 +174,7 @@ object GlutenWriterColumnarRules {
             BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
           // FIXME: We should only use context property if having no other approaches.
           //  Should see if there is another way to pass these options.
-          session.sparkContext.setLocalProperty("isNativeAppliable", format.isDefined.toString)
+          session.sparkContext.setLocalProperty("isNativeApplicable", format.isDefined.toString)
           session.sparkContext.setLocalProperty("nativeFormat", format.getOrElse(""))
           if (format.isDefined) {
             injectFakeRowAdaptor(rc, child)
@@ -178,12 +182,6 @@ object GlutenWriterColumnarRules {
             rc.withNewChildren(rc.children.map(apply))
           }
         } else {
-          session.sparkContext.setLocalProperty(
-            "staticPartitionWriteOnly",
-            BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
-          session.sparkContext.setLocalProperty("isNativeAppliable", "false")
-          session.sparkContext.setLocalProperty("nativeFormat", "")
-
           rc.withNewChildren(rc.children.map(apply))
         }
       case plan: SparkPlan => plan.withNewChildren(plan.children.map(apply))
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index a5c857103..96a044c0c 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -148,9 +148,9 @@ object FileFormatWriter extends Logging {
       numStaticPartitionCols: Int = 0): Set[String] = {
 
     val nativeEnabled =
-      "true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))
+      "true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")
     val staticPartitionWriteOnly =
-      "true".equals(sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly"))
+      "true" == sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")
 
     if (nativeEnabled) {
       logInfo("Use Gluten partition write for hive")
@@ -257,7 +257,7 @@ object FileFormatWriter extends Logging {
       }
 
       val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
-      if ("parquet".equals(nativeFormat)) {
+      if ("parquet" == nativeFormat) {
         (GlutenParquetWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
       } else {
         (GlutenOrcWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 34873c46b..619fa64ac 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -83,7 +83,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     // Why if (false)? Such code requires comments when being written.
-    if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
+    if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
       GlutenOrcWriterInjects
         .getInstance()
         .inferSchema(sparkSession, Map.empty[String, String], files)
@@ -109,7 +109,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       .asInstanceOf[JobConf]
       .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])
 
-    if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+    if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
       // pass compression to job conf so that the file extension can be aware of it.
       val nativeConf =
         GlutenOrcWriterInjects
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index c6b383136..42a63c7eb 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -83,7 +83,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+    if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
 
      // pass compression to job conf so that the file extension can be aware of it.
       val conf = ContextUtil.getConfiguration(job)
@@ -201,7 +201,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
       parameters: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     // Why if (false)? Such code requires comments when being written.
-    if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
+    if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
       GlutenParquetWriterInjects.getInstance().inferSchema(sparkSession, parameters, files)
     } else { // the vanilla spark case
       ParquetUtils.inferSchema(sparkSession, parameters, files)
@@ -210,14 +210,10 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
 
   /** Returns whether the reader will return the rows as batch or not. */
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
-    if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
-      true
-    } else {
-      val conf = sparkSession.sessionState.conf
-      conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
-      schema.length <= conf.wholeStageMaxNumFields &&
-      schema.forall(_.dataType.isInstanceOf[AtomicType])
-    }
+    val conf = sparkSession.sessionState.conf
+    conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
+    schema.length <= conf.wholeStageMaxNumFields &&
+    schema.forall(_.dataType.isInstanceOf[AtomicType])
   }
 
   override def vectorTypes(
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 162dd342b..eb0f6a5d9 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -100,9 +100,9 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
     // Avoid referencing the outer object.
     val fileSinkConfSer = fileSinkConf
     val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName
-    if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+    if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
       val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
-      val isParquetFormat = nativeFormat.equals("parquet")
+      val isParquetFormat = nativeFormat == "parquet"
       val compressionCodec = if (fileSinkConf.compressed) {
         // hive related configurations
         fileSinkConf.compressCodec
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index ebf45e76e..f5e932337 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -140,9 +140,9 @@ object FileFormatWriter extends Logging {
       numStaticPartitionCols: Int = 0): Set[String] = {
 
     val nativeEnabled =
-      "true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))
+      "true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")
     val staticPartitionWriteOnly =
-      "true".equals(sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly"))
+      "true" == sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")
 
     if (nativeEnabled) {
       logInfo("Use Gluten partition write for hive")
@@ -277,7 +277,7 @@ object FileFormatWriter extends Logging {
       }
 
       val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
-      if ("parquet".equals(nativeFormat)) {
+      if ("parquet" == nativeFormat) {
         (GlutenParquetWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
       } else {
         (GlutenOrcWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 49ac28d73..9891f6851 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -66,7 +66,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     // Why if (false)? Such code requires comments when being written.
-    if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
+    if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
       GlutenOrcWriterInjects.getInstance().inferSchema(sparkSession, options, files)
     } else { // the vanilla spark case
       OrcUtils.inferSchema(sparkSession, files, options)
@@ -88,7 +88,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       .asInstanceOf[JobConf]
       .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])
 
-    if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+    if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
       // pass compression to job conf so that the file extension can be aware of it.
       val nativeConf =
         GlutenOrcWriterInjects
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index b0573f68e..403e31c1c 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -75,7 +75,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+    if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
 
      // pass compression to job conf so that the file extension can be aware of it.
       val conf = ContextUtil.getConfiguration(job)
@@ -197,7 +197,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
       parameters: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     // Why if (false)? Such code requires comments when being written.
-    if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
+    if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
       GlutenParquetWriterInjects.getInstance().inferSchema(sparkSession, parameters, files)
     } else { // the vanilla spark case
       ParquetUtils.inferSchema(sparkSession, parameters, files)
@@ -206,13 +206,9 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
 
   /** Returns whether the reader will return the rows as batch or not. */
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
-    if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
-      true
-    } else {
-      val conf = sparkSession.sessionState.conf
-      ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
-      !WholeStageCodegenExec.isTooManyFields(conf, schema)
-    }
+    val conf = sparkSession.sessionState.conf
+    ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
+    !WholeStageCodegenExec.isTooManyFields(conf, schema)
   }
 
   override def vectorTypes(
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 7a824c436..b9c1622cb 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -97,9 +97,9 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
     // Avoid referencing the outer object.
     val fileSinkConfSer = fileSinkConf
     val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName
-    if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+    if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
       val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
-      val isParquetFormat = nativeFormat.equals("parquet")
+      val isParquetFormat = nativeFormat == "parquet"
       val compressionCodec = if (fileSinkConf.compressed) {
         // hive related configurations
         fileSinkConf.compressCodec


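A note on the "true".equals(...) to "true" == ... change above: Scala's == on
references checks for null before delegating to equals, so comparing against a
property that was never set (or was cleared with null) simply evaluates to
false, and nativeFormat == "parquet" cannot throw the NullPointerException that
a bare nativeFormat.equals("parquet") could. A small illustrative sketch, not
part of the commit:

    val unset: String = null   // what getLocalProperty returns for an unset key
    "true" == unset            // false, no exception
    unset == "parquet"         // also false; Scala's == handles the null receiver
    // unset.equals("parquet") // would throw a NullPointerException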