This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new decd8b4cf [GLUTEN-6612] Fix ParquetFileFormat issue caused by the setting of local property isNativeApplicable (#6627)
decd8b4cf is described below
commit decd8b4cf3eb0612b50984c9c81ddf19c77d7d81
Author: PHILO-HE <[email protected]>
AuthorDate: Wed Jul 31 10:48:30 2024 +0800
[GLUTEN-6612] Fix ParquetFileFormat issue caused by the setting of local property isNativeApplicable (#6627)
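Background for reviewers: SparkContext local properties are per-thread and survive across query submissions on that thread, so a value such as isNativeApplicable=true set by one write could still be visible to an unrelated follow-up query, which is what broke ParquetFileFormat. The rule change below clears the properties up front instead of relying on every branch to overwrite them. A minimal sketch of the stale-property behaviour, assuming a plain local SparkSession; this is illustrative only and not part of the committed code:

    import org.apache.spark.sql.SparkSession

    object LocalPropertyStalenessDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
        val sc = spark.sparkContext

        // A "first query" on this thread marks native write as applicable.
        sc.setLocalProperty("isNativeApplicable", "true")

        // A later, unrelated query on the same thread still observes the old value
        // unless something resets it, which is exactly what this commit adds.
        println(sc.getLocalProperty("isNativeApplicable")) // prints: true

        // The fix resets the property to null so it cannot leak across queries.
        sc.setLocalProperty("isNativeApplicable", null)
        println(sc.getLocalProperty("isNativeApplicable")) // prints: null

        spark.stop()
      }
    }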
---
.../sql/delta/ClickhouseOptimisticTransaction.scala | 2 +-
.../sql/delta/ClickhouseOptimisticTransaction.scala | 2 +-
.../sql/delta/ClickhouseOptimisticTransaction.scala | 2 +-
.../datasources/GlutenWriterColumnarRules.scala | 12 +++++-------
.../sql/execution/datasources/FileFormatWriter.scala | 6 +++---
.../sql/execution/datasources/orc/OrcFileFormat.scala | 4 ++--
.../datasources/parquet/ParquetFileFormat.scala | 16 ++++++----------
.../apache/spark/sql/hive/execution/HiveFileFormat.scala | 4 ++--
.../sql/execution/datasources/FileFormatWriter.scala | 6 +++---
.../sql/execution/datasources/orc/OrcFileFormat.scala | 4 ++--
.../datasources/parquet/ParquetFileFormat.scala | 14 +++++---------
.../apache/spark/sql/hive/execution/HiveFileFormat.scala | 4 ++--
12 files changed, 33 insertions(+), 43 deletions(-)
diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 4133b5c60..3314465c5 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -177,7 +177,7 @@ class ClickhouseOptimisticTransaction(
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
- // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
+ // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 4133b5c60..3314465c5 100644
--- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -177,7 +177,7 @@ class ClickhouseOptimisticTransaction(
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
- // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
+ // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 9e79c4f2e..6eec68efe 100644
--- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -185,7 +185,7 @@ class ClickhouseOptimisticTransaction(
// 1. insert FakeRowAdaptor
// 2. DeltaInvariantCheckerExec transform
// 3. DeltaTaskStatisticsTracker collect null count / min values / max values
- // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeAppliable',
+ // 4. set the parameters 'staticPartitionWriteOnly', 'isNativeApplicable',
// 'nativeFormat' in the LocalProperty of the sparkcontext
super.writeFiles(inputData, writeOptions, additionalConstraints)
}
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 7063c3f67..20b006015 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -163,6 +163,10 @@ object GlutenWriterColumnarRules {
BackendsApiManager.getSettings.enableNativeWriteFiles() =>
injectFakeRowAdaptor(rc, rc.child)
case rc @ DataWritingCommandExec(cmd, child) =>
+ // These properties can be set by the same thread in last query submission.
+ session.sparkContext.setLocalProperty("isNativeApplicable", null)
+ session.sparkContext.setLocalProperty("nativeFormat", null)
+ session.sparkContext.setLocalProperty("staticPartitionWriteOnly", null)
if (BackendsApiManager.getSettings.supportNativeWrite(child.output.toStructType.fields)) {
val format = getNativeFormat(cmd)
session.sparkContext.setLocalProperty(
@@ -170,7 +174,7 @@ object GlutenWriterColumnarRules {
BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
// FIXME: We should only use context property if having no other approaches.
// Should see if there is another way to pass these options.
- session.sparkContext.setLocalProperty("isNativeAppliable", format.isDefined.toString)
+ session.sparkContext.setLocalProperty("isNativeApplicable", format.isDefined.toString)
session.sparkContext.setLocalProperty("nativeFormat", format.getOrElse(""))
if (format.isDefined) {
injectFakeRowAdaptor(rc, child)
@@ -178,12 +182,6 @@ object GlutenWriterColumnarRules {
rc.withNewChildren(rc.children.map(apply))
}
} else {
- session.sparkContext.setLocalProperty(
- "staticPartitionWriteOnly",
- BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
- session.sparkContext.setLocalProperty("isNativeAppliable", "false")
- session.sparkContext.setLocalProperty("nativeFormat", "")
-
rc.withNewChildren(rc.children.map(apply))
}
case plan: SparkPlan => plan.withNewChildren(plan.children.map(apply))
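The hunk above, condensed for readability (names taken from the diff; this is a sketch of the net control flow, not the literal committed body): the three local properties are always cleared first and only re-populated when the command supports native write, so the old else branch that wrote placeholder values is no longer needed.

    // Condensed sketch of the rewritten rule branch (illustrative only).
    case rc @ DataWritingCommandExec(cmd, child) =>
      // Always start clean; values may linger from the thread's last query submission.
      Seq("isNativeApplicable", "nativeFormat", "staticPartitionWriteOnly")
        .foreach(session.sparkContext.setLocalProperty(_, null))

      if (BackendsApiManager.getSettings.supportNativeWrite(child.output.toStructType.fields)) {
        val format = getNativeFormat(cmd) // e.g. Some("parquet") or None
        session.sparkContext.setLocalProperty(
          "staticPartitionWriteOnly",
          BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
        session.sparkContext.setLocalProperty("isNativeApplicable", format.isDefined.toString)
        session.sparkContext.setLocalProperty("nativeFormat", format.getOrElse(""))
        if (format.isDefined) injectFakeRowAdaptor(rc, child)
        else rc.withNewChildren(rc.children.map(apply))
      } else {
        rc.withNewChildren(rc.children.map(apply))
      }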
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index a5c857103..96a044c0c 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -148,9 +148,9 @@ object FileFormatWriter extends Logging {
numStaticPartitionCols: Int = 0): Set[String] = {
val nativeEnabled =
- "true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))
+ "true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")
val staticPartitionWriteOnly =
- "true".equals(sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly"))
+ "true" == sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")
if (nativeEnabled) {
logInfo("Use Gluten partition write for hive")
@@ -257,7 +257,7 @@ object FileFormatWriter extends Logging {
}
val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
- if ("parquet".equals(nativeFormat)) {
+ if ("parquet" == nativeFormat) {
(GlutenParquetWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
} else {
(GlutenOrcWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 34873c46b..619fa64ac 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -83,7 +83,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
// Why if (false)? Such code requires comments when being written.
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
GlutenOrcWriterInjects
.getInstance()
.inferSchema(sparkSession, Map.empty[String, String], files)
@@ -109,7 +109,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
.asInstanceOf[JobConf]
.setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
// pass compression to job conf so that the file extension can be aware of it.
val nativeConf =
GlutenOrcWriterInjects
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index c6b383136..42a63c7eb 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -83,7 +83,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
// pass compression to job conf so that the file extension can be aware of it.
val conf = ContextUtil.getConfiguration(job)
@@ -201,7 +201,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
parameters: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
// Why if (false)? Such code requires comments when being written.
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
GlutenParquetWriterInjects.getInstance().inferSchema(sparkSession, parameters, files)
} else { // the vanilla spark case
ParquetUtils.inferSchema(sparkSession, parameters, files)
@@ -210,14 +210,10 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
/** Returns whether the reader will return the rows as batch or not. */
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
- true
- } else {
- val conf = sparkSession.sessionState.conf
- conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
- schema.length <= conf.wholeStageMaxNumFields &&
- schema.forall(_.dataType.isInstanceOf[AtomicType])
- }
+ val conf = sparkSession.sessionState.conf
+ conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
+ schema.length <= conf.wholeStageMaxNumFields &&
+ schema.forall(_.dataType.isInstanceOf[AtomicType])
}
override def vectorTypes(
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 162dd342b..eb0f6a5d9 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -100,9 +100,9 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
// Avoid referencing the outer object.
val fileSinkConfSer = fileSinkConf
val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
- val isParquetFormat = nativeFormat.equals("parquet")
+ val isParquetFormat = nativeFormat == "parquet"
val compressionCodec = if (fileSinkConf.compressed) {
// hive related configurations
fileSinkConf.compressCodec
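One note on the comparison style switched throughout these shims: Scala's == on references delegates to equals but checks for null first, so comparing against a local property that may now be unset (null after the reset added above) cannot throw, while calling .equals on a possibly-null value could. A minimal standalone sketch, illustrative only:

    object NullSafeComparisonDemo {
      def main(args: Array[String]): Unit = {
        // Stand-in for sparkContext.getLocalProperty("nativeFormat") when the property is unset.
        val nativeFormat: String = null

        println(nativeFormat == "parquet")        // false: Scala's == is null-safe
        // println(nativeFormat.equals("parquet")) // would throw a NullPointerException
      }
    }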
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index ebf45e76e..f5e932337 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -140,9 +140,9 @@ object FileFormatWriter extends Logging {
numStaticPartitionCols: Int = 0): Set[String] = {
val nativeEnabled =
- "true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))
+ "true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")
val staticPartitionWriteOnly =
- "true".equals(sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly"))
+ "true" == sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")
if (nativeEnabled) {
logInfo("Use Gluten partition write for hive")
@@ -277,7 +277,7 @@ object FileFormatWriter extends Logging {
}
val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
- if ("parquet".equals(nativeFormat)) {
+ if ("parquet" == nativeFormat) {
(GlutenParquetWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
} else {
(GlutenOrcWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 49ac28d73..9891f6851 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -66,7 +66,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
// Why if (false)? Such code requires comments when being written.
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
GlutenOrcWriterInjects.getInstance().inferSchema(sparkSession, options, files)
} else { // the vanilla spark case
OrcUtils.inferSchema(sparkSession, files, options)
@@ -88,7 +88,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
.asInstanceOf[JobConf]
.setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
// pass compression to job conf so that the file extension can be aware of it.
val nativeConf =
GlutenOrcWriterInjects
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index b0573f68e..403e31c1c 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -75,7 +75,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
// pass compression to job conf so that the file extension can be aware of it.
val conf = ContextUtil.getConfiguration(job)
@@ -197,7 +197,7 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
parameters: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
// Why if (false)? Such code requires comments when being written.
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable")) && false) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable") && false) {
GlutenParquetWriterInjects.getInstance().inferSchema(sparkSession, parameters, files)
} else { // the vanilla spark case
ParquetUtils.inferSchema(sparkSession, parameters, files)
@@ -206,13 +206,9 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
/** Returns whether the reader will return the rows as batch or not. */
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
- true
- } else {
- val conf = sparkSession.sessionState.conf
- ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
- !WholeStageCodegenExec.isTooManyFields(conf, schema)
- }
+ val conf = sparkSession.sessionState.conf
+ ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
+ !WholeStageCodegenExec.isTooManyFields(conf, schema)
}
override def vectorTypes(
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 7a824c436..b9c1622cb 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -97,9 +97,9 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
// Avoid referencing the outer object.
val fileSinkConfSer = fileSinkConf
val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName
- if ("true".equals(sparkSession.sparkContext.getLocalProperty("isNativeAppliable"))) {
+ if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
- val isParquetFormat = nativeFormat.equals("parquet")
+ val isParquetFormat = nativeFormat == "parquet"
val compressionCodec = if (fileSinkConf.compressed) {
// hive related configurations
fileSinkConf.compressCodec