This is an automated email from the ASF dual-hosted git repository.
jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f633870b41 [GLUTEN-8039][VL] Native writer should respect table properties (#8040)
f633870b41 is described below
commit f633870b41613bd04d1a63c587b7c1de4ed234d4
Author: Kaifei Yi <[email protected]>
AuthorDate: Wed Nov 27 14:16:05 2024 +0800
[GLUTEN-8039][VL] Native writer should respect table properties (#8040)
---
.../execution/VeloxParquetWriteForHiveSuite.scala | 37 ++++++++++++++++++++++
.../spark/sql/hive/execution/HiveFileFormat.scala | 10 ++++--
.../spark/sql/hive/execution/HiveFileFormat.scala | 9 ++++--
3 files changed, 50 insertions(+), 6 deletions(-)
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index 8c2b98988e..5932f4e5a7 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -27,6 +27,11 @@ import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.util.QueryExecutionListener
+import org.apache.spark.util.Utils
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
private var _spark: SparkSession = _
@@ -185,4 +190,36 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
checkAnswer(spark.table("t"), Row(1))
}
}
+
+ test("native writer should respect table properties") {
+ Seq(true, false).foreach {
+ enableNativeWrite =>
+ withSQLConf("spark.gluten.sql.native.writer.enabled" -> enableNativeWrite.toString) {
+ withTable("t") {
+ withSQLConf(
+ "spark.sql.hive.convertMetastoreParquet" -> "false",
+ "spark.sql.parquet.compression.codec" -> "gzip") {
+ checkNativeWrite(
+ "CREATE TABLE t STORED AS PARQUET TBLPROPERTIES ('parquet.compression'='zstd') " +
+ "AS SELECT 1 as c",
+ checkNative = enableNativeWrite)
+ val tableDir = new Path(s"${conf.getConf(StaticSQLConf.WAREHOUSE_PATH)}/t")
+ val configuration = spark.sessionState.newHadoopConf()
+ val files = tableDir
+ .getFileSystem(configuration)
+ .listStatus(tableDir)
+ .filterNot(_.getPath.getName.startsWith("\\."))
+ assert(files.nonEmpty)
+ val in = HadoopInputFile.fromStatus(files.head, spark.sessionState.newHadoopConf())
+ Utils.tryWithResource(ParquetFileReader.open(in)) {
+ reader =>
+ val column = reader.getFooter.getBlocks.get(0).getColumns.get(0)
+ // native writer and vanilla spark hive writer should be consistent
+ "zstd".equalsIgnoreCase(column.getCodec.toString)
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index c21c67f654..821d8317d8 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapred.{JobConf, Reporter}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import scala.collection.JavaConverters._
+
/* -
* This class is copied from Spark 3.2 and modified for Gluten. \n
* Gluten should make sure this class is loaded before the original class.
@@ -101,19 +102,22 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName
if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
+ val tableOptions = tableDesc.getProperties.asScala.toMap
val isParquetFormat = nativeFormat == "parquet"
val compressionCodec = if (fileSinkConf.compressed) {
// hive related configurations
fileSinkConf.compressCodec
} else if (isParquetFormat) {
- val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
+ val parquetOptions =
+ new ParquetOptions(tableOptions, sparkSession.sessionState.conf)
parquetOptions.compressionCodecClassName
} else {
- val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
+ val orcOptions = new OrcOptions(tableOptions, sparkSession.sessionState.conf)
orcOptions.compressionCodec
}
- val nativeConf = GlutenFormatFactory(nativeFormat).nativeConf(options, compressionCodec)
+ val nativeConf =
+ GlutenFormatFactory(nativeFormat).nativeConf(tableOptions, compressionCodec)
new OutputWriterFactory {
private val jobConf = new SerializableJobConf(new JobConf(conf))
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 6ed1b4d215..61ad8fe72b 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -98,19 +98,22 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName
if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
+ val tableOptions = tableDesc.getProperties.asScala.toMap
val isParquetFormat = nativeFormat == "parquet"
val compressionCodec = if (fileSinkConf.compressed) {
// hive related configurations
fileSinkConf.compressCodec
} else if (isParquetFormat) {
- val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
+ val parquetOptions =
+ new ParquetOptions(tableOptions, sparkSession.sessionState.conf)
parquetOptions.compressionCodecClassName
} else {
- val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
+ val orcOptions = new OrcOptions(tableOptions, sparkSession.sessionState.conf)
orcOptions.compressionCodec
}
- val nativeConf = GlutenFormatFactory(nativeFormat).nativeConf(options, compressionCodec)
+ val nativeConf =
+ GlutenFormatFactory(nativeFormat).nativeConf(tableOptions, compressionCodec)
new OutputWriterFactory {
private val jobConf = new SerializableJobConf(new JobConf(conf))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]