This is an automated email from the ASF dual-hosted git repository.
kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7dda6866d0 [GLUTEN-8385][VL] Support write compatible-hive bucket
table for Spark3.4 and Spark3.5.(#8386)
7dda6866d0 is described below
commit 7dda6866d005deff4c0dd070727f890b2815a565
Author: Kaifei Yi <[email protected]>
AuthorDate: Wed Jan 8 09:27:21 2025 +0800
[GLUTEN-8385][VL] Support write compatible-hive bucket table for Spark3.4
and Spark3.5.(#8386)
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 1 +
.../gluten/backendsapi/velox/VeloxBackend.scala | 14 ++-
.../spark/sql/execution/BucketWriteUtils.scala | 95 ++++++++++++++++++
.../execution/VeloxParquetWriteForHiveSuite.scala | 110 ++++++++++++++++++++-
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 36 ++++++-
docs/developers/SubstraitModifications.md | 1 +
.../apache/gluten/substrait/rel/RelBuilder.java | 8 +-
.../apache/gluten/substrait/rel/WriteRelNode.java | 15 ++-
.../substrait/proto/substrait/algebra.proto | 10 ++
.../gluten/backendsapi/BackendSettingsApi.scala | 1 +
.../execution/WriteFilesExecTransformer.scala | 15 ++-
11 files changed, 288 insertions(+), 18 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 924f3b3170..aa9e3e553c 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -227,6 +227,7 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
format: FileFormat,
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
+ isPartitionedTable: Boolean,
options: Map[String, String]): ValidationResult = {
def validateCompressionCodec(): Option[String] = {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 4918a6eade..6c51ad484c 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -232,6 +232,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
format: FileFormat,
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
+ isPartitionedTable: Boolean,
options: Map[String, String]): ValidationResult = {
// Validate if HiveFileFormat write is supported based on output file type
@@ -331,10 +332,17 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
def validateBucketSpec(): Option[String] = {
- if (bucketSpec.nonEmpty) {
- Some("Unsupported native write: bucket write is not supported.")
- } else {
+      // Inserts into Hive-compatible bucketed tables carry this write option with value "true"
+      // (presumably set by Spark 3.4+ for such inserts — the key is spelled as a literal here).
+      val isHiveCompatibleBucketTable = bucketSpec.nonEmpty &&
+        options.get("__hive_compatible_bucketed_table_insertion__").contains("true")
+      // The Velox backend currently supports only Hive-compatible bucketed writes, and only
+      // when `isPartitionedTable` is true. Remove the partitioned-table restriction once
+      // Velox supports bucketed writes to non-partitioned tables.
+      if (bucketSpec.isEmpty || (isHiveCompatibleBucketTable && isPartitionedTable)) {
None
+      } else if (!isHiveCompatibleBucketTable) {
+        Some("Unsupported native write: non-compatible hive bucket write is not supported.")
+      } else {
+        // Hive-compatible, but rejected only because the table is not partitioned: say so,
+        // rather than misreporting the table as non-compatible.
+        Some(
+          "Unsupported native write: hive-compatible bucket write is only supported for " +
+            "partitioned tables.")
}
}
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/BucketWriteUtils.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/BucketWriteUtils.scala
new file mode 100644
index 0000000000..a9fe8269e9
--- /dev/null
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/BucketWriteUtils.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{DataFrame, GlutenQueryTest}
+import org.apache.spark.sql.catalyst.expressions.{BitwiseAnd, Expression,
HiveHash, Literal, Pmod, UnsafeProjection}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SQLTestUtils
+
+import java.io.File
+
+/**
+ * Helpers for checking the on-disk layout produced by Hive-compatible bucketed writes.
+ */
+trait BucketWriteUtils extends GlutenQueryTest with SQLTestUtils {
+
+  /** Returns the session catalog's default location for `table` (parsed as an identifier). */
+  def tableDir(table: String): File = {
+    val identifier = spark.sessionState.sqlParser.parseTableIdentifier(table)
+    new File(spark.sessionState.catalog.defaultTablePath(identifier))
+  }
+
+  /**
+   * Verifies every data file directly under `dataDir` against the expected bucketing:
+   * the file name must encode a bucket id, the file must be sorted by `sortCols` (when
+   * non-empty), and every row's bucket id computed from `bucketCols` must equal the id
+   * encoded in the file name. Fails if `dataDir` is missing or holds no data files.
+   *
+   * @param dataDir directory with the bucket files of one table or one partition
+   * @param source format used to read each bucket file back
+   * @param numBuckets number of buckets declared for the table
+   * @param bucketCols columns the table is bucketed by
+   * @param sortCols columns each bucket file is expected to be sorted by
+   * @param inputDF dataframe the table was written from; only its column types are used
+   * @param bucketIdExpression builds the bucket id expression from the bucket columns and
+   *                           `numBuckets`
+   * @param getBucketIdFromFileName extracts the bucket id from a data file name
+   */
+  protected def testBucketing(
+      dataDir: File,
+      source: String = "parquet",
+      numBuckets: Int,
+      bucketCols: Seq[String],
+      sortCols: Seq[String] = Nil,
+      inputDF: DataFrame,
+      bucketIdExpression: (Seq[Expression], Int) => Expression,
+      getBucketIdFromFileName: String => Option[Int]): Unit = {
+    // File.listFiles returns null when dataDir does not exist or is not a directory.
+    val allBucketFiles = Option(dataDir.listFiles())
+      .getOrElse(fail(s"Bucket directory $dataDir does not exist."))
+      .filterNot(f => f.getName.startsWith(".") || f.getName.startsWith("_"))
+    // Without this check an empty directory would make the verification pass vacuously.
+    assert(allBucketFiles.nonEmpty, s"No bucket files found in $dataDir.")
+
+    // Remove the duplicate columns in bucketCols and sortCols;
+    // otherwise, we get analysis errors due to duplicate names.
+    val selectedColumns = (bucketCols ++ sortCols).distinct
+    // We may lose the type information after write (e.g. json format doesn't keep schema
+    // information), so take the types from the original dataframe. They are the same for
+    // every bucket file, so compute them once, outside the loop.
+    val types = inputDF.select(selectedColumns.map(col): _*).schema.map(_.dataType)
+    val columns = selectedColumns.zip(types).map { case (colName, dt) => col(colName).cast(dt) }
+
+    for (bucketFile <- allBucketFiles) {
+      val bucketId = getBucketIdFromFileName(bucketFile.getName).getOrElse {
+        fail(s"Unable to infer the bucket id from file name ${bucketFile.getName}.")
+      }
+
+      // Read the bucket file into a dataframe, so that it's easier to test.
+      val readBack = spark.read
+        .format(source)
+        .load(bucketFile.getAbsolutePath)
+        .select(columns: _*)
+
+      // If sort columns were specified when writing the bucketed table, make sure the data
+      // in this bucket file is already sorted.
+      if (sortCols.nonEmpty) {
+        checkAnswer(readBack.sort(sortCols.map(col): _*), readBack.collect())
+      }
+
+      // Go through all rows in this bucket file, compute the bucket id from the bucket
+      // column values, and make sure it equals the bucket id inferred from the file name.
+      val qe = readBack.select(bucketCols.map(col): _*).queryExecution
+      val rows = qe.toRdd.map(_.copy()).collect()
+      val getBucketId = UnsafeProjection.create(
+        bucketIdExpression(qe.analyzed.output, numBuckets) :: Nil,
+        qe.analyzed.output)
+
+      for (row <- rows) {
+        val actualBucketId = getBucketId(row).getInt(0)
+        assert(actualBucketId == bucketId)
+      }
+    }
+  }
+
+  /** Hive-compatible bucket id: `pmod(hiveHash(expressions) & Int.MaxValue, numBuckets)`. */
+  def bucketIdExpression(expressions: Seq[Expression], numBuckets: Int): Expression =
+    Pmod(BitwiseAnd(HiveHash(expressions), Literal(Int.MaxValue)), Literal(numBuckets))
+
+  // Matches data file names starting with `<digits>_0_`; the digits are the bucket id.
+  // Compiled once instead of on every call.
+  private val hiveBucketedFileName = """^(\d+)_0_.*$""".r
+
+  /** Parses the bucket id from a file name of the form `<digits>_0_...`; None otherwise. */
+  def getBucketIdFromFileName(fileName: String): Option[Int] = fileName match {
+    case hiveBucketedFileName(bucketId) => Some(bucketId.toInt)
+    case _ => None
+  }
+}
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index 5932f4e5a7..11efdccfcf 100644
---
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -20,6 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.{GlutenQueryTest, Row, SparkSession}
+import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
@@ -33,8 +34,14 @@ import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
-class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
+import java.io.File
+
+class VeloxParquetWriteForHiveSuite
+ extends GlutenQueryTest
+ with SQLTestUtils
+ with BucketWriteUtils {
private var _spark: SparkSession = _
+ import testImplicits._
override protected def beforeAll(): Unit = {
super.beforeAll()
@@ -222,4 +229,105 @@ class VeloxParquetWriteForHiveSuite extends
GlutenQueryTest with SQLTestUtils {
}
}
}
+
+  test("Native writer support compatible hive bucket write with dynamic partition") {
+    // Hive-compatible bucket writes are only supported on Spark 3.4+. `assume` reports the
+    // test as canceled on older versions instead of letting it pass without running.
+    assume(isSparkVersionGE("3.4"))
+    Seq("true", "false").foreach {
+      enableConvertMetastore =>
+        withSQLConf("spark.sql.hive.convertMetastoreParquet" -> enableConvertMetastore) {
+          val source = "hive_source_table"
+          val target = "hive_bucketed_table"
+          withTable(source, target) {
+            sql(s"""
+                   |CREATE TABLE IF NOT EXISTS $target (i int, j string)
+                   |PARTITIONED BY(k string)
+                   |CLUSTERED BY (i, j) SORTED BY (i) INTO 8 BUCKETS
+                   |STORED AS PARQUET
+                 """.stripMargin)
+
+            val df =
+              (0 until 50).map(i => (i % 13, i.toString, i % 5)).toDF("i", "j", "k")
+            df.write.mode(SaveMode.Overwrite).saveAsTable(source)
+
+            withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+              checkNativeWrite(
+                s"INSERT INTO $target SELECT * FROM $source",
+                checkNative = true)
+            }
+
+            // Every dynamic partition k=0..4 must contain correctly bucketed, sorted files.
+            for (k <- 0 until 5) {
+              testBucketing(
+                dataDir = new File(tableDir(target), s"k=$k"),
+                source = "parquet",
+                numBuckets = 8,
+                bucketCols = Seq("i", "j"),
+                sortCols = Seq("i"),
+                inputDF = df,
+                bucketIdExpression = bucketIdExpression,
+                getBucketIdFromFileName = getBucketIdFromFileName)
+            }
+          }
+        }
+    }
+  }
+
+  test("bucket writer with non-dynamic partition should fallback") {
+    // Hive-compatible bucket writes are only supported on Spark 3.4+. `assume` reports the
+    // test as canceled on older versions instead of letting it pass without running.
+    assume(isSparkVersionGE("3.4"))
+    Seq("true", "false").foreach {
+      enableConvertMetastore =>
+        withSQLConf("spark.sql.hive.convertMetastoreParquet" -> enableConvertMetastore) {
+          val source = "hive_source_table"
+          val target = "hive_bucketed_table"
+          withTable(source, target) {
+            sql(s"""
+                   |CREATE TABLE IF NOT EXISTS $target (i int, j string)
+                   |PARTITIONED BY(k string)
+                   |CLUSTERED BY (i, j) SORTED BY (i) INTO 8 BUCKETS
+                   |STORED AS PARQUET
+                 """.stripMargin)
+
+            val df =
+              (0 until 50).map(i => (i % 13, i.toString, i % 5)).toDF("i", "j", "k")
+            df.write.mode(SaveMode.Overwrite).saveAsTable(source)
+
+            // When the Hive relation is converted to a data source relation
+            // (convertMetastoreParquet = true), the write always goes through the dynamic
+            // partition path, so it is offloaded to native; otherwise it falls back.
+            checkNativeWrite(
+              s"INSERT INTO $target PARTITION(k='0') SELECT i, j FROM $source",
+              checkNative = enableConvertMetastore.toBoolean)
+            val files = tableDir(target)
+              .listFiles()
+              .filterNot(f => f.getName.startsWith(".") || f.getName.startsWith("_"))
+            // All rows must land in the single static partition k=0.
+            assert(files.length == 1 && files.head.getName.contains("k=0"))
+            checkAnswer(spark.table(target).select("i", "j"), df.select("i", "j"))
+          }
+        }
+    }
+  }
+
+  test("bucket writer with non-partition table should fallback") {
+    // Hive-compatible bucket writes are only supported on Spark 3.4+. `assume` reports the
+    // test as canceled on older versions instead of letting it pass without running.
+    assume(isSparkVersionGE("3.4"))
+    Seq("true", "false").foreach {
+      enableConvertMetastore =>
+        withSQLConf("spark.sql.hive.convertMetastoreParquet" -> enableConvertMetastore) {
+          val source = "hive_source_table"
+          val target = "hive_bucketed_table"
+          withTable(source, target) {
+            sql(s"""
+                   |CREATE TABLE IF NOT EXISTS $target (i int, j string)
+                   |CLUSTERED BY (i, j) SORTED BY (i) INTO 8 BUCKETS
+                   |STORED AS PARQUET
+                 """.stripMargin)
+
+            val df =
+              (0 until 50).map(i => (i % 13, i.toString)).toDF("i", "j")
+            df.write.mode(SaveMode.Overwrite).saveAsTable(source)
+
+            // Bucketed writes to non-partitioned tables are not offloaded, so this insert
+            // must fall back regardless of the metastore conversion setting.
+            checkNativeWrite(
+              s"INSERT INTO $target SELECT i, j FROM $source",
+              checkNative = false)
+
+            checkAnswer(spark.table(target), df)
+          }
+        }
+    }
+  }
}
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index dab9837936..e01d2d8985 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -514,11 +514,12 @@ std::shared_ptr<connector::hive::LocationHandle>
makeLocationHandle(
const std::string& targetDirectory,
dwio::common::FileFormat fileFormat,
common::CompressionKind compression,
+ const bool& isBucketed,
const std::optional<std::string>& writeDirectory = std::nullopt,
const connector::hive::LocationHandle::TableType& tableType =
connector::hive::LocationHandle::TableType::kExisting) {
std::string targetFileName = "";
- if (fileFormat == dwio::common::FileFormat::PARQUET) {
+ if (fileFormat == dwio::common::FileFormat::PARQUET && !isBucketed) {
targetFileName = fmt::format("gluten-part-{}{}{}", makeUuid(),
compressionFileNameSuffix(compression), ".parquet");
}
return std::make_shared<connector::hive::LocationHandle>(
@@ -607,6 +608,35 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
}
}
+ std::shared_ptr<connector::hive::HiveBucketProperty> bucketProperty =
nullptr;
+ if (writeRel.has_bucket_spec()) {
+ const auto& bucketSpec = writeRel.bucket_spec();
+ const auto& numBuckets = bucketSpec.num_buckets();
+
+ std::vector<std::string> bucketedBy;
+ for (const auto& name : bucketSpec.bucket_column_names()) {
+ bucketedBy.emplace_back(name);
+ }
+
+ std::vector<TypePtr> bucketedTypes;
+ bucketedTypes.reserve(bucketedBy.size());
+ std::vector<TypePtr> tableColumnTypes = inputType->children();
+ for (const auto& name : bucketedBy) {
+ auto it = std::find(tableColumnNames.begin(), tableColumnNames.end(),
name);
+ VELOX_CHECK(it != tableColumnNames.end(), "Invalid bucket {}", name);
+ std::size_t index = std::distance(tableColumnNames.begin(), it);
+ bucketedTypes.emplace_back(tableColumnTypes[index]);
+ }
+
+ std::vector<std::shared_ptr<const connector::hive::HiveSortingColumn>>
sortedBy;
+ for (const auto& name : bucketSpec.sort_column_names()) {
+
sortedBy.emplace_back(std::make_shared<connector::hive::HiveSortingColumn>(name,
core::SortOrder{true, true}));
+ }
+
+ bucketProperty = std::make_shared<connector::hive::HiveBucketProperty>(
+ connector::hive::HiveBucketProperty::Kind::kHiveCompatible,
numBuckets, bucketedBy, bucketedTypes, sortedBy);
+ }
+
std::string writePath;
if (writeFilesTempPath_.has_value()) {
writePath = writeFilesTempPath_.value();
@@ -652,8 +682,8 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
tableColumnNames, /*inputType->names() clolumn name is
different*/
inputType->children(),
partitionedKey,
- nullptr /*bucketProperty*/,
- makeLocationHandle(writePath, fileFormat, compressionCodec),
+ bucketProperty,
+ makeLocationHandle(writePath, fileFormat, compressionCodec,
bucketProperty != nullptr),
fileFormat,
compressionCodec)),
(!partitionedKey.empty()),
diff --git a/docs/developers/SubstraitModifications.md
b/docs/developers/SubstraitModifications.md
index 24a9c1a212..3db2b5869c 100644
--- a/docs/developers/SubstraitModifications.md
+++ b/docs/developers/SubstraitModifications.md
@@ -28,6 +28,7 @@ changed `Unbounded` in `WindowFunction` into
`Unbounded_Preceding` and `Unbounde
* Added `WriteRel`
([#3690](https://github.com/apache/incubator-gluten/pull/3690)).
* Added `TopNRel`
([#5409](https://github.com/apache/incubator-gluten/pull/5409)).
* Added `ref` field in window bound `Preceding` and `Following`
([#5626](https://github.com/apache/incubator-gluten/pull/5626)).
+* Added `BucketSpec` field in `WriteRel` ([#8386](https://github.com/apache/incubator-gluten/pull/8386)).
## Modifications to type.proto
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/RelBuilder.java
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/RelBuilder.java
index 7d19311808..86b2735318 100644
---
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/RelBuilder.java
+++
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/RelBuilder.java
@@ -25,10 +25,7 @@ import
org.apache.gluten.substrait.extensions.AdvancedExtensionNode;
import org.apache.gluten.substrait.type.ColumnTypeNode;
import org.apache.gluten.substrait.type.TypeNode;
-import io.substrait.proto.CrossRel;
-import io.substrait.proto.JoinRel;
-import io.substrait.proto.SetRel;
-import io.substrait.proto.SortField;
+import io.substrait.proto.*;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import java.util.List;
@@ -191,10 +188,11 @@ public class RelBuilder {
List<String> names,
List<ColumnTypeNode> columnTypeNodes,
AdvancedExtensionNode extensionNode,
+ WriteRel.BucketSpec bucketSpec,
SubstraitContext context,
Long operatorId) {
context.registerRelToOperator(operatorId);
- return new WriteRelNode(input, types, names, columnTypeNodes,
extensionNode);
+ return new WriteRelNode(input, types, names, columnTypeNodes,
extensionNode, bucketSpec);
}
public static RelNode makeSortRel(
diff --git
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/WriteRelNode.java
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/WriteRelNode.java
index 45b4cd659e..74ffc8282c 100644
---
a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/WriteRelNode.java
+++
b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/WriteRelNode.java
@@ -21,10 +21,7 @@ import org.apache.gluten.substrait.type.ColumnTypeNode;
import org.apache.gluten.substrait.type.TypeNode;
import org.apache.gluten.utils.SubstraitUtil;
-import io.substrait.proto.NamedObjectWrite;
-import io.substrait.proto.NamedStruct;
-import io.substrait.proto.Rel;
-import io.substrait.proto.WriteRel;
+import io.substrait.proto.*;
import java.io.Serializable;
import java.util.ArrayList;
@@ -39,17 +36,21 @@ public class WriteRelNode implements RelNode, Serializable {
private final AdvancedExtensionNode extensionNode;
+ private final WriteRel.BucketSpec bucketSpec;
+
+ /**
+ * Creates a node describing a write of {@code input}. The list arguments are copied
+ * element by element into this node's own lists via {@code addAll}.
+ *
+ * @param bucketSpec bucketing spec to attach to the generated {@code WriteRel}; may be
+ * null, in which case the bucket_spec field is left unset in the protobuf.
+ */
WriteRelNode(
RelNode input,
List<TypeNode> types,
List<String> names,
List<ColumnTypeNode> partitionColumnTypeNodes,
- AdvancedExtensionNode extensionNode) {
+ AdvancedExtensionNode extensionNode,
+ WriteRel.BucketSpec bucketSpec) {
this.input = input;
this.types.addAll(types);
this.names.addAll(names);
this.columnTypeNodes.addAll(partitionColumnTypeNodes);
this.extensionNode = extensionNode;
+ this.bucketSpec = bucketSpec;
}
@Override
@@ -68,6 +69,10 @@ public class WriteRelNode implements RelNode, Serializable {
nameObjectWriter.setAdvancedExtension(extensionNode.toProtobuf());
}
+ if (bucketSpec != null) {
+ writeBuilder.setBucketSpec(bucketSpec);
+ }
+
writeBuilder.setNamedTable(nameObjectWriter);
if (input != null) {
diff --git
a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
index 0abb50b323..ca669c7639 100644
---
a/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
+++
b/gluten-substrait/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -608,6 +608,9 @@ message WriteRel {
// Output mode determines what is the output of executing this rel
OutputMode output = 6;
+ // The bucket spec for the writer.
+ BucketSpec bucket_spec = 7;
+
enum WriteOp {
WRITE_OP_UNSPECIFIED = 0;
// The insert of new tuples in a table
@@ -631,6 +634,13 @@ message WriteRel {
// subplans in the body of the Rel input) and return those with anounter
PlanRel.relations.
OUTPUT_MODE_MODIFIED_TUPLES = 2;
}
+
+ // Bucketing information for the written table. Gluten fills it from Spark's BucketSpec;
+ // the Velox backend maps it to a Hive-compatible bucket property.
+ message BucketSpec {
+ // Number of buckets.
+ int32 num_buckets = 1;
+ // Columns whose values determine a row's bucket. Names must match the written column
+ // names; the Velox backend rejects names it cannot find.
+ repeated string bucket_column_names = 2;
+ // Columns each bucket file is sorted by; may be empty. The Velox backend uses
+ // core::SortOrder{true, true} (ascending, nulls first) for every sort column.
+ repeated string sort_column_names = 3;
+ }
}
// The hash equijoin join operator will build a hash table out of the right
input based on a set of join keys.
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 0c220ada64..04aa7fe7ca 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -52,6 +52,7 @@ trait BackendSettingsApi {
format: FileFormat,
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
+ isPartitionedTable: Boolean,
options: Map[String, String]): ValidationResult =
ValidationResult.succeeded
def supportNativeWrite(fields: Array[StructField]): Boolean = true
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
index 1a95a96a55..3c1857236b 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, MetadataBuilder}
-import io.substrait.proto.NamedStruct
+import io.substrait.proto.{NamedStruct, WriteRel}
import org.apache.parquet.hadoop.ParquetOutputFormat
import java.util.Locale
@@ -107,12 +107,23 @@ case class WriteFilesExecTransformer(
ExtensionBuilder.makeAdvancedExtension(
SubstraitUtil.createEnhancement(originalInputAttributes))
}
+
+ val bucketSpecOption = bucketSpec.map {
+ bucketSpec =>
+ val builder = WriteRel.BucketSpec.newBuilder()
+ builder.setNumBuckets(bucketSpec.numBuckets)
+ bucketSpec.bucketColumnNames.foreach(builder.addBucketColumnNames)
+ bucketSpec.sortColumnNames.foreach(builder.addSortColumnNames)
+ builder.build()
+ }
+
RelBuilder.makeWriteRel(
input,
typeNodes,
nameList,
columnTypeNodes,
extensionNode,
+ bucketSpecOption.orNull,
context,
operatorId)
}
@@ -147,11 +158,13 @@ case class WriteFilesExecTransformer(
"complex data type with constant")
}
+ val childOutput = this.child.output.map(_.exprId)
val validationResult =
BackendsApiManager.getSettings.supportWriteFilesExec(
fileFormat,
finalChildOutput.toStructType.fields,
bucketSpec,
+ partitionColumns.exists(c => childOutput.contains(c.exprId)),
caseInsensitiveOptions)
if (!validationResult.ok()) {
return ValidationResult.failed("Unsupported native write: " +
validationResult.reason())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]