This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 66f4091be8 [VL] Spark 3.2 / Spark 3.3, V1 write: Dynamic partition write (#10183)
66f4091be8 is described below

commit 66f4091be8a3c733530b8c5037879a57520b64e2
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Jul 15 13:44:59 2025 +0800

    [VL] Spark 3.2 / Spark 3.3, V1 write: Dynamic partition write (#10183)
---
 .../gluten/backendsapi/velox/VeloxBackend.scala    |   2 -
 .../datasources/velox/VeloxBlockStripes.java       |  20 ++--
 .../execution/VeloxParquetWriteForHiveSuite.scala  |  79 ++++++++++++-
 cpp/velox/jni/VeloxJniWrapper.cc                   | 125 +++++++++++++++------
 .../ColumnarCollapseTransformStages.scala          |   3 +-
 .../GlutenFormatWriterInjectsBase.scala            |  10 +-
 .../datasources/GlutenWriterColumnarRules.scala    |   2 +
 .../sql/execution/datasources/BlockStripes.java    |  10 +-
 .../execution/datasources/FileFormatWriter.scala   |  23 ++--
 .../execution/datasources/FileFormatWriter.scala   |  23 ++--
 10 files changed, 212 insertions(+), 85 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 5e07be2c31..f2bc63d1c5 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -514,8 +514,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
   override def skipNativeCtas(ctas: CreateDataSourceTableAsSelectCommand): 
Boolean = true
 
   override def skipNativeInsertInto(insertInto: 
InsertIntoHadoopFsRelationCommand): Boolean = {
-    insertInto.partitionColumns.nonEmpty &&
-    insertInto.staticPartitions.size < insertInto.partitionColumns.size ||
     insertInto.bucketSpec.nonEmpty
   }
 
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java
index f9848d4ab6..9aa0f513c9 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java
@@ -27,40 +27,42 @@ import org.jetbrains.annotations.NotNull;
 import java.util.Iterator;
 
 public class VeloxBlockStripes extends BlockStripes {
-
-  private int index = 0;
-
   public VeloxBlockStripes(BlockStripes bs) {
     super(bs.originBlockAddress,
         bs.blockAddresses, bs.headingRowIndice, bs.originBlockNumColumns,
-        bs.rowBytes);
+        bs.headingRowBytes);
   }
 
   @Override
   public @NotNull Iterator<BlockStripe> iterator() {
     return new Iterator<BlockStripe>() {
+      private int index = 0;
 
       @Override
       public boolean hasNext() {
-        return index < 1;
+        return index < blockAddresses.length;
       }
 
       @Override
       public BlockStripe next() {
-        index += 1;
-        return new BlockStripe() {
+        final BlockStripe nextStripe = new BlockStripe() {
+          private final long blockAddress = blockAddresses[index];
+          private final byte[] headingRowByteArray = headingRowBytes[index];
+
           @Override
           public ColumnarBatch getColumnarBatch() {
-            return ColumnarBatches.create(blockAddresses[0]);
+            return ColumnarBatches.create(blockAddress);
           }
 
           @Override
           public InternalRow getHeadingRow() {
             UnsafeRow row = new UnsafeRow(originBlockNumColumns);
-            row.pointTo(rowBytes, rowBytes.length);
+            row.pointTo(headingRowByteArray, headingRowByteArray.length);
             return row;
           }
         };
+        index += 1;
+        return nextStripe;
       }
     };
   }
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index 71b965cd29..2201457a72 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -130,6 +130,7 @@ class VeloxParquetWriteForHiveSuite
           checkNative = true)
       }
       checkAnswer(spark.table("t"), Row(3, 1, 2))
+      checkAnswer(spark.sql("SHOW PARTITIONS t"), Seq(Row("c=1/d=2")))
     }
   }
 
@@ -141,10 +142,84 @@ class VeloxParquetWriteForHiveSuite
       withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
         checkNativeWrite(
           "INSERT OVERWRITE TABLE t partition(c=1, d)" +
-            " SELECT 3 as e, 2 as e",
-          checkNative = false)
+            " SELECT 3 as e, 2 as d",
+          checkNative = true)
       }
       checkAnswer(spark.table("t"), Row(3, 1, 2))
+      checkAnswer(spark.sql("SHOW PARTITIONS t"), Seq(Row("c=1/d=2")))
+    }
+  }
+
+  test("test hive dynamic and static partition write table, multiple 
partitions") {
+    withTable("t") {
+      spark.sql(
+        "CREATE TABLE t (c int, d long, e long)" +
+          " STORED AS PARQUET partitioned by (c, d)")
+      withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
+        checkNativeWrite(
+          "INSERT OVERWRITE TABLE t partition(c=1, d)" +
+            " SELECT 3 as e, 2 as d" +
+            " UNION ALL" +
+            " SELECT 4 as e, 5 as d",
+          checkNative = true)
+      }
+      checkAnswer(spark.table("t"), Seq(Row(3, 1, 2), Row(4, 1, 5)))
+      checkAnswer(spark.sql("SHOW PARTITIONS t"), Seq(Row("c=1/d=2"), 
Row("c=1/d=5")))
+    }
+  }
+
+  test("test hive dynamic and static partition write table, multiple keys, 
multiple partitions") {
+    withTable("t") {
+      spark.sql(
+        "CREATE TABLE t (c int, d long, e long, f int)" +
+          " STORED AS PARQUET partitioned by (c, d, f)")
+      withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
+        checkNativeWrite(
+          "INSERT OVERWRITE TABLE t partition(c=1, d, f)" +
+            " SELECT 3 as e, 2 as d, 7 as f" + // Partition 0.
+            " UNION ALL" +
+            " SELECT 4 as e, 5 as d, 9 as f" + // Partition 1.
+            " UNION ALL" +
+            " SELECT 6 as e, 2 as d, 7 as f" + // Partition 0.
+            " UNION ALL" +
+            " SELECT 8 as e, 5 as d, 7 as f", // Partition 2.
+          checkNative = true
+        )
+      }
+      checkAnswer(
+        spark.table("t"),
+        Seq(Row(3, 1, 2, 7), Row(4, 1, 5, 9), Row(6, 1, 2, 7), Row(8, 1, 5, 
7)))
+      checkAnswer(
+        spark.sql("SHOW PARTITIONS t"),
+        Seq(Row("c=1/d=2/f=7"), Row("c=1/d=5/f=7"), Row("c=1/d=5/f=9")))
+    }
+  }
+
+  test(
+    "test hive dynamic and static partition write table multiple keys, 
multiple partitions, " +
+      "single batch to write") {
+    withTable("t") {
+      spark.sql(
+        "CREATE TABLE t (c int, d long, e long, f int)" +
+          " STORED AS PARQUET partitioned by (c, d, f)")
+      withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
+        // Use of VALUES will result in a single input batch for DynamicPartitionDataSingleWriter
+        // to test the sanity of #splitBlockByPartitionAndBucket.
+        checkNativeWrite(
+          "INSERT OVERWRITE TABLE t PARTITION (c=1, d, f) VALUES" +
+            " (3, 2, 7)," +
+            " (4, 5, 9)," +
+            " (6, 2, 7)," +
+            " (8, 5, 7)",
+          checkNative = true
+        )
+      }
+      checkAnswer(
+        spark.table("t"),
+        Seq(Row(3, 1, 2, 7), Row(4, 1, 5, 9), Row(6, 1, 2, 7), Row(8, 1, 5, 
7)))
+      checkAnswer(
+        spark.sql("SHOW PARTITIONS t"),
+        Seq(Row("c=1/d=2/f=7"), Row("c=1/d=5/f=7"), Row("c=1/d=5/f=9")))
     }
   }
 
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 951564554b..36a427be86 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -19,6 +19,8 @@
 
 #include <glog/logging.h>
 #include <jni/JniCommon.h>
+#include <velox/connectors/hive/PartitionIdGenerator.h>
+#include <velox/exec/OperatorUtils.h>
 
 #include <exception>
 #include "JniUdf.h"
@@ -72,7 +74,7 @@ jint JNI_OnLoad(JavaVM* vm, void*) {
 
   blockStripesClass =
       createGlobalClassReferenceOrError(env, 
"Lorg/apache/spark/sql/execution/datasources/BlockStripes;");
-  blockStripesConstructor = env->GetMethodID(blockStripesClass, "<init>", 
"(J[J[II[B)V");
+  blockStripesConstructor = env->GetMethodID(blockStripesClass, "<init>", 
"(J[J[II[[B)V");
 
   DLOG(INFO) << "Loaded Velox backend.";
 
@@ -401,45 +403,104 @@ 
Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_splitBlockByPartitio
     JNIEnv* env,
     jobject wrapper,
     jlong batchHandle,
-    jintArray partitionColIndice,
-    jboolean hasBucket,
-    jlong memoryManagerId) {
+    jintArray partitionColIndices,
+    jboolean hasBucket) {
   JNI_METHOD_START
-  auto ctx = gluten::getRuntime(env, wrapper);
-  auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
+
+  GLUTEN_CHECK(!hasBucket, "Bucketing not supported by 
splitBlockByPartitionAndBucket");
+
+  const auto ctx = gluten::getRuntime(env, wrapper);
+  const auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
+
+  auto partitionKeyArray = gluten::getIntArrayElementsSafe(env, 
partitionColIndices);
+  int numPartitionKeys = partitionKeyArray.length();
+  std::vector<uint32_t> partitionColIndicesVec;
+  for (int i = 0; i < numPartitionKeys; ++i) {
+    const auto partitionColumnIndex = partitionKeyArray.elems()[i];
+    GLUTEN_CHECK(partitionColumnIndex < batch->numColumns(), "Partition column 
index overflow");
+    partitionColIndicesVec.emplace_back(partitionColumnIndex);
+  }
 
   std::vector<int32_t> dataColIndicesVec;
-  {
-    auto partitionKeyArray = gluten::getIntArrayElementsSafe(env, 
partitionColIndice);
-    int numPartitionKeys = partitionKeyArray.length();
-    std::unordered_set<int32_t> partitionColIndiceVec;
-    for (int i = 0; i < numPartitionKeys; ++i) {
-      partitionColIndiceVec.emplace(partitionKeyArray.elems()[i]);
-    }
-    for (int i = 0; i < batch->numColumns(); ++i) {
-      if (partitionColIndiceVec.count(i) == 0) {
-        // The column is not a partition column. Add it to the data column vector.
-        dataColIndicesVec.emplace_back(i);
-      }
+  for (int i = 0; i < batch->numColumns(); ++i) {
+    if (std::find(partitionColIndicesVec.begin(), 
partitionColIndicesVec.end(), i) == partitionColIndicesVec.end()) {
+      // The column is not a partition column. Add it to the data column vector.
+      dataColIndicesVec.emplace_back(i);
     }
   }
 
-  auto result = batch->toUnsafeRow(0);
-  auto rowBytes = result.data();
-  auto newBatchHandle = ctx->saveObject(ctx->select(batch, dataColIndicesVec));
+  auto pool = 
dynamic_cast<VeloxMemoryManager*>(ctx->memoryManager())->getLeafMemoryPool();
+  const auto veloxBatch = VeloxColumnarBatch::from(pool.get(), batch);
+  const auto inputRowVector = veloxBatch->getRowVector();
+  const auto numRows = inputRowVector->size();
+
+  connector::hive::PartitionIdGenerator idGen{
+      asRowType(inputRowVector->type()), partitionColIndicesVec, 128, 
pool.get(), true};
+  raw_vector<uint64_t> partitionIds{};
+  idGen.run(inputRowVector, partitionIds);
+  GLUTEN_CHECK(partitionIds.size() == numRows, "Mismatched number of partition 
ids");
+  const auto numPartitions = static_cast<int32_t>(idGen.numPartitions());
+
+  std::vector<vector_size_t> partitionSizes(numPartitions);
+  std::vector<BufferPtr> partitionRows(numPartitions);
+  std::vector<vector_size_t*> rawPartitionRows(numPartitions);
+  std::fill(partitionSizes.begin(), partitionSizes.end(), 0);
+
+  for (auto row = 0; row < numRows; ++row) {
+    const auto partitionId = partitionIds[row];
+    ++partitionSizes[partitionId];
+  }
 
-  auto bytesSize = result.size();
-  jbyteArray bytesArray = env->NewByteArray(bytesSize);
-  env->SetByteArrayRegion(bytesArray, 0, bytesSize, 
reinterpret_cast<jbyte*>(rowBytes));
+  for (int partitionId = 0; partitionId < numPartitions; ++partitionId) {
+    partitionRows[partitionId] = allocateIndices(partitionSizes[partitionId], 
pool.get());
+    rawPartitionRows[partitionId] = 
partitionRows[partitionId]->asMutable<vector_size_t>();
+  }
 
-  jlongArray batchArray = env->NewLongArray(1);
-  long* cBatchArray = new long[1];
-  cBatchArray[0] = newBatchHandle;
-  env->SetLongArrayRegion(batchArray, 0, 1, cBatchArray);
-  delete[] cBatchArray;
+  std::vector<vector_size_t> partitionNextRowOffset(numPartitions);
+  std::fill(partitionNextRowOffset.begin(), partitionNextRowOffset.end(), 0);
+  for (auto row = 0; row < numRows; ++row) {
+    const auto partitionId = partitionIds[row];
+    rawPartitionRows[partitionId][partitionNextRowOffset[partitionId]] = row;
+    ++partitionNextRowOffset[partitionId];
+  }
+
+  jobjectArray partitionHeadingRowBytesArray = 
env->NewObjectArray(numPartitions, env->FindClass("[B"), nullptr);
+  std::vector<jlong> partitionBatchHandles(numPartitions);
+
+  for (int partitionId = 0; partitionId < numPartitions; ++partitionId) {
+    const vector_size_t partitionSize = partitionSizes[partitionId];
+    if (partitionSize == 0) {
+      continue;
+    }
+
+    const RowVectorPtr rowVector = partitionSize == inputRowVector->size()
+        ? inputRowVector
+        : exec::wrap(partitionSize, partitionRows[partitionId], 
inputRowVector);
+
+    const std::shared_ptr<VeloxColumnarBatch> partitionBatch = 
std::make_shared<VeloxColumnarBatch>(rowVector);
+    const std::shared_ptr<VeloxColumnarBatch> 
partitionBatchWithoutPartitionColumns =
+        partitionBatch->select(pool.get(), dataColIndicesVec);
+    partitionBatchHandles[partitionId] = 
ctx->saveObject(partitionBatchWithoutPartitionColumns);
+    const auto headingRow = partitionBatch->toUnsafeRow(0);
+    const auto headingRowBytes = headingRow.data();
+    const auto headingRowNumBytes = headingRow.size();
+
+    jbyteArray jHeadingRowBytes = env->NewByteArray(headingRowNumBytes);
+    env->SetByteArrayRegion(jHeadingRowBytes, 0, headingRowNumBytes, 
reinterpret_cast<const jbyte*>(headingRowBytes));
+    env->SetObjectArrayElement(partitionHeadingRowBytesArray, partitionId, 
jHeadingRowBytes);
+  }
+
+  jlongArray partitionBatchArray = env->NewLongArray(numPartitions);
+  env->SetLongArrayRegion(partitionBatchArray, 0, numPartitions, 
partitionBatchHandles.data());
 
   jobject blockStripes = env->NewObject(
-      blockStripesClass, blockStripesConstructor, batchHandle, batchArray, 
nullptr, batch->numColumns(), bytesArray);
+      blockStripesClass,
+      blockStripesConstructor,
+      batchHandle,
+      partitionBatchArray,
+      nullptr,
+      batch->numColumns(),
+      partitionHeadingRowBytesArray);
   return blockStripes;
   JNI_METHOD_END(nullptr)
 }
@@ -595,8 +656,8 @@ 
Java_org_apache_gluten_vectorized_UnifflePartitionWriterJniWrapper_createPartiti
 }
 
 JNIEXPORT jboolean JNICALL 
Java_org_apache_gluten_config_ConfigJniWrapper_isEnhancedFeaturesEnabled( // 
NOLINT
-      JNIEnv* env,
-      jclass) {
+    JNIEnv* env,
+    jclass) {
 #ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
   return true;
 #else
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index 82b75a053f..da9def0e9e 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -46,7 +46,6 @@ import scala.collection.JavaConverters._
  * would be transformed to `ValueStreamNode` at native side.
  */
 case class InputIteratorTransformer(child: SparkPlan) extends 
UnaryTransformSupport {
-  assert(child.isInstanceOf[ColumnarInputAdapter])
 
   @transient
   override lazy val metrics: Map[String, SQLMetric] =
@@ -68,10 +67,12 @@ case class InputIteratorTransformer(child: SparkPlan) 
extends UnaryTransformSupp
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
   override def doExecuteBroadcast[T](): Broadcast[T] = {
+    assert(child.isInstanceOf[ColumnarInputAdapter])
     child.doExecuteBroadcast()
   }
 
   override protected def doTransform(context: SubstraitContext): 
TransformContext = {
+    assert(child.isInstanceOf[ColumnarInputAdapter])
     val operatorId = context.nextOperatorId(nodeName)
     val readRel = RelBuilder.makeReadRelForInputIterator(child.output.asJava, 
context, operatorId)
     TransformContext(output, readRel)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
index 950d463160..53ee2855e6 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
@@ -17,9 +17,10 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.{ColumnarToCarrierRowExecBase, 
ProjectExecTransformer, SortExecTransformer, TransformSupport, 
WholeStageTransformer}
+import org.apache.gluten.execution._
 import org.apache.gluten.execution.datasource.GlutenFormatWriterInjects
 import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
+import org.apache.gluten.extension.columnar.transition.{InsertTransitions, 
RemoveTransitions}
 
 import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, 
SparkPlan}
 import 
org.apache.spark.sql.execution.ColumnarCollapseTransformStages.transformStageCounter
@@ -40,8 +41,9 @@ trait GlutenFormatWriterInjectsBase extends 
GlutenFormatWriterInjects {
       return plan
     }
 
+    val transitionsRemoved = RemoveTransitions.apply(plan)
     // FIXME: HeuristicTransform is costly. Re-applying it may cause performance issues.
-    val transformed = transform(plan)
+    val transformed = transform(transitionsRemoved)
 
     if (!transformed.isInstanceOf[TransformSupport]) {
       throw new IllegalStateException(
@@ -65,6 +67,8 @@ trait GlutenFormatWriterInjectsBase extends 
GlutenFormatWriterInjects {
     val transformedWithAdapter = injectAdapter(transformed)
     val wst = WholeStageTransformer(transformedWithAdapter, materializeInput = 
true)(
       transformStageCounter.incrementAndGet())
-    BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToCarrierRow(wst)
+    val wstWithTransitions = 
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToCarrierRow(
+      InsertTransitions.create(outputsColumnar = true, 
wst.batchType()).apply(wst))
+    wstWithTransitions
   }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 3c2fb53643..55c4438fda 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -127,6 +127,8 @@ object GlutenWriterColumnarRules {
     if (format.isDefined) {
       spark.sparkContext.setLocalProperty("isNativeApplicable", true.toString)
       spark.sparkContext.setLocalProperty("nativeFormat", format.get)
+      // The flag for static-only write is not used by the Velox backend, which
+      // already supports dynamic partition write.
       spark.sparkContext.setLocalProperty(
         "staticPartitionWriteOnly",
         BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
diff --git 
a/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/BlockStripes.java
 
b/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/BlockStripes.java
index 84a9c15824..3c4a7d4746 100644
--- 
a/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/BlockStripes.java
+++ 
b/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/BlockStripes.java
@@ -20,12 +20,14 @@ import org.jetbrains.annotations.NotNull;
 
 import java.util.Iterator;
 
+// FIXME: The abstraction is broken: VL / CH don't rely on the same binary layout of
+//  this class.
 public class BlockStripes implements Iterable<BlockStripe> {
     public long originBlockAddress;
     public long[] blockAddresses;
-    public int[] headingRowIndice;
+    public int[] headingRowIndice; // Only used by CH backend.
     public int originBlockNumColumns;
-    public byte[] rowBytes;
+    public byte[][] headingRowBytes; // Only used by Velox backend.
 
     public BlockStripes(
             long originBlockAddress,
@@ -43,12 +45,12 @@ public class BlockStripes implements Iterable<BlockStripe> {
         long[] blockAddresses,
         int[] headingRowIndice,
         int originBlockNumColumns,
-        byte[] rowBytes) {
+        byte[][] headingRowBytes) {
         this.originBlockAddress = originBlockAddress;
         this.blockAddresses = blockAddresses;
         this.headingRowIndice = headingRowIndice;
         this.originBlockNumColumns = originBlockNumColumns;
-        this.rowBytes = rowBytes;
+        this.headingRowBytes = headingRowBytes;
     }
 
     public void release() {
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 3ea51b5d1d..1c7ab83209 100644
--- 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -148,8 +148,6 @@ object FileFormatWriter extends Logging {
 
     val nativeEnabled =
       "true" == 
sparkSession.sparkContext.getLocalProperty("isNativeApplicable")
-    val staticPartitionWriteOnly =
-      "true" == 
sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")
 
     if (nativeEnabled) {
       logInfo(
@@ -174,12 +172,10 @@ object FileFormatWriter extends Logging {
       case attr => attr
     }
 
-    val empty2NullPlan = if (staticPartitionWriteOnly && nativeEnabled) {
-      // Velox backend only support static partition write.
-      // And no need to add sort operator for static partition write.
-      plan
+    val empty2NullPlan = if (needConvert) {
+      ProjectExec(projectList, plan)
     } else {
-      if (needConvert) ProjectExec(projectList, plan) else plan
+      plan
     }
 
     val bucketIdExpression = bucketSpec.map {
@@ -263,7 +259,7 @@ object FileFormatWriter extends Logging {
 
     try {
       val (finalPlan, concurrentOutputWriterSpec) = if (orderingMatched) {
-        if (!nativeEnabled || (staticPartitionWriteOnly && nativeEnabled)) {
+        if (!nativeEnabled) {
           (empty2NullPlan, None)
         } else {
           nativeWrap(empty2NullPlan)
@@ -291,15 +287,10 @@ object FileFormatWriter extends Logging {
             empty2NullPlan,
             Some(ConcurrentOutputWriterSpec(maxWriters, () => 
sortPlan.createSorter())))
         } else {
-          if (staticPartitionWriteOnly && nativeEnabled) {
-            // remove the sort operator for static partition write.
-            (empty2NullPlan, None)
+          if (!nativeEnabled) {
+            (sortPlan, None)
           } else {
-            if (!nativeEnabled) {
-              (sortPlan, None)
-            } else {
-              nativeWrap(sortPlan)
-            }
+            nativeWrap(sortPlan)
           }
         }
       }
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index d88eb43e8e..c037305f2e 100644
--- 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -140,8 +140,6 @@ object FileFormatWriter extends Logging {
 
     val nativeEnabled =
       "true" == 
sparkSession.sparkContext.getLocalProperty("isNativeApplicable")
-    val staticPartitionWriteOnly =
-      "true" == 
sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")
 
     if (nativeEnabled) {
       logInfo(
@@ -170,12 +168,10 @@ object FileFormatWriter extends Logging {
       case attr => attr
     }
 
-    val empty2NullPlan = if (staticPartitionWriteOnly && nativeEnabled) {
-      // Velox backend only support static partition write.
-      // And no need to add sort operator for static partition write.
-      plan
+    val empty2NullPlan = if (needConvert) {
+      ProjectExec(projectList, plan)
     } else {
-      if (needConvert) ProjectExec(projectList, plan) else plan
+      plan
     }
 
     val writerBucketSpec = bucketSpec.map {
@@ -283,7 +279,7 @@ object FileFormatWriter extends Logging {
 
     try {
       val (finalPlan, concurrentOutputWriterSpec) = if (orderingMatched) {
-        if (!nativeEnabled || (staticPartitionWriteOnly && nativeEnabled)) {
+        if (!nativeEnabled) {
           (empty2NullPlan, None)
         } else {
           nativeWrap(empty2NullPlan)
@@ -311,15 +307,10 @@ object FileFormatWriter extends Logging {
             empty2NullPlan,
             Some(ConcurrentOutputWriterSpec(maxWriters, () => 
sortPlan.createSorter())))
         } else {
-          if (staticPartitionWriteOnly && nativeEnabled) {
-            // remove the sort operator for static partition write.
-            (empty2NullPlan, None)
+          if (!nativeEnabled) {
+            (sortPlan, None)
           } else {
-            if (!nativeEnabled) {
-              (sortPlan, None)
-            } else {
-              nativeWrap(sortPlan)
-            }
+            nativeWrap(sortPlan)
           }
         }
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to