This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 66f4091be8 [VL] Spark 3.2 / Spark 3.3, V1 write: Dynamic partition write (#10183)
66f4091be8 is described below
commit 66f4091be8a3c733530b8c5037879a57520b64e2
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Jul 15 13:44:59 2025 +0800
[VL] Spark 3.2 / Spark 3.3, V1 write: Dynamic partition write (#10183)
---
.../gluten/backendsapi/velox/VeloxBackend.scala | 2 -
.../datasources/velox/VeloxBlockStripes.java | 20 ++--
.../execution/VeloxParquetWriteForHiveSuite.scala | 79 ++++++++++++-
cpp/velox/jni/VeloxJniWrapper.cc | 125 +++++++++++++++------
.../ColumnarCollapseTransformStages.scala | 3 +-
.../GlutenFormatWriterInjectsBase.scala | 10 +-
.../datasources/GlutenWriterColumnarRules.scala | 2 +
.../sql/execution/datasources/BlockStripes.java | 10 +-
.../execution/datasources/FileFormatWriter.scala | 23 ++--
.../execution/datasources/FileFormatWriter.scala | 23 ++--
10 files changed, 212 insertions(+), 85 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 5e07be2c31..f2bc63d1c5 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -514,8 +514,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def skipNativeCtas(ctas: CreateDataSourceTableAsSelectCommand): Boolean = true
override def skipNativeInsertInto(insertInto: InsertIntoHadoopFsRelationCommand): Boolean = {
- insertInto.partitionColumns.nonEmpty &&
- insertInto.staticPartitions.size < insertInto.partitionColumns.size ||
insertInto.bucketSpec.nonEmpty
}
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java
index f9848d4ab6..9aa0f513c9 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java
@@ -27,40 +27,42 @@ import org.jetbrains.annotations.NotNull;
import java.util.Iterator;
public class VeloxBlockStripes extends BlockStripes {
-
- private int index = 0;
-
public VeloxBlockStripes(BlockStripes bs) {
super(bs.originBlockAddress,
bs.blockAddresses, bs.headingRowIndice, bs.originBlockNumColumns,
- bs.rowBytes);
+ bs.headingRowBytes);
}
@Override
public @NotNull Iterator<BlockStripe> iterator() {
return new Iterator<BlockStripe>() {
+ private int index = 0;
@Override
public boolean hasNext() {
- return index < 1;
+ return index < blockAddresses.length;
}
@Override
public BlockStripe next() {
- index += 1;
- return new BlockStripe() {
+ final BlockStripe nextStripe = new BlockStripe() {
+ private final long blockAddress = blockAddresses[index];
+ private final byte[] headingRowByteArray = headingRowBytes[index];
+
@Override
public ColumnarBatch getColumnarBatch() {
- return ColumnarBatches.create(blockAddresses[0]);
+ return ColumnarBatches.create(blockAddress);
}
@Override
public InternalRow getHeadingRow() {
UnsafeRow row = new UnsafeRow(originBlockNumColumns);
- row.pointTo(rowBytes, rowBytes.length);
+ row.pointTo(headingRowByteArray, headingRowByteArray.length);
return row;
}
};
+ index += 1;
+ return nextStripe;
}
};
}
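With the rework above, VeloxBlockStripes yields one stripe per partitioned block instead of a single stripe: each stripe exposes the partition's first row in the original schema plus a batch with the partition columns dropped. A minimal Scala sketch of how a caller might drain the stripes; the consume helper and writePartition callback below are illustrative, not part of this patch:

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.BlockStripes
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // Hypothetical consumer of the per-partition stripes produced above.
    def consume(stripes: BlockStripes)(writePartition: (InternalRow, ColumnarBatch) => Unit): Unit = {
      try {
        stripes.asScala.foreach { stripe =>
          // First row of this partition's block, still in the original schema;
          // typically used to derive the partition values for the output path.
          val headingRow = stripe.getHeadingRow
          // The partition's rows with the partition columns already removed.
          val batch = stripe.getColumnarBatch
          writePartition(headingRow, batch)
        }
      } finally {
        stripes.release()
      }
    }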
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index 71b965cd29..2201457a72 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -130,6 +130,7 @@ class VeloxParquetWriteForHiveSuite
checkNative = true)
}
checkAnswer(spark.table("t"), Row(3, 1, 2))
+ checkAnswer(spark.sql("SHOW PARTITIONS t"), Seq(Row("c=1/d=2")))
}
}
@@ -141,10 +142,84 @@ class VeloxParquetWriteForHiveSuite
withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
checkNativeWrite(
"INSERT OVERWRITE TABLE t partition(c=1, d)" +
- " SELECT 3 as e, 2 as e",
- checkNative = false)
+ " SELECT 3 as e, 2 as d",
+ checkNative = true)
}
checkAnswer(spark.table("t"), Row(3, 1, 2))
+ checkAnswer(spark.sql("SHOW PARTITIONS t"), Seq(Row("c=1/d=2")))
+ }
+ }
+
+ test("test hive dynamic and static partition write table, multiple partitions") {
+ withTable("t") {
+ spark.sql(
+ "CREATE TABLE t (c int, d long, e long)" +
+ " STORED AS PARQUET partitioned by (c, d)")
+ withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
+ checkNativeWrite(
+ "INSERT OVERWRITE TABLE t partition(c=1, d)" +
+ " SELECT 3 as e, 2 as d" +
+ " UNION ALL" +
+ " SELECT 4 as e, 5 as d",
+ checkNative = true)
+ }
+ checkAnswer(spark.table("t"), Seq(Row(3, 1, 2), Row(4, 1, 5)))
+ checkAnswer(spark.sql("SHOW PARTITIONS t"), Seq(Row("c=1/d=2"), Row("c=1/d=5")))
+ }
+ }
+
+ test("test hive dynamic and static partition write table, multiple keys, multiple partitions") {
+ withTable("t") {
+ spark.sql(
+ "CREATE TABLE t (c int, d long, e long, f int)" +
+ " STORED AS PARQUET partitioned by (c, d, f)")
+ withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
+ checkNativeWrite(
+ "INSERT OVERWRITE TABLE t partition(c=1, d, f)" +
+ " SELECT 3 as e, 2 as d, 7 as f" + // Partition 0.
+ " UNION ALL" +
+ " SELECT 4 as e, 5 as d, 9 as f" + // Partition 1.
+ " UNION ALL" +
+ " SELECT 6 as e, 2 as d, 7 as f" + // Partition 0.
+ " UNION ALL" +
+ " SELECT 8 as e, 5 as d, 7 as f", // Partition 2.
+ checkNative = true
+ )
+ }
+ checkAnswer(
+ spark.table("t"),
+ Seq(Row(3, 1, 2, 7), Row(4, 1, 5, 9), Row(6, 1, 2, 7), Row(8, 1, 5, 7)))
+ checkAnswer(
+ spark.sql("SHOW PARTITIONS t"),
+ Seq(Row("c=1/d=2/f=7"), Row("c=1/d=5/f=7"), Row("c=1/d=5/f=9")))
+ }
+ }
+
+ test(
+ "test hive dynamic and static partition write table multiple keys, multiple partitions, " +
+ "single batch to write") {
+ withTable("t") {
+ spark.sql(
+ "CREATE TABLE t (c int, d long, e long, f int)" +
+ " STORED AS PARQUET partitioned by (c, d, f)")
+ withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
+ // Use of VALUES will result in a single input batch for DynamicPartitionDataSingleWriter
+ // to test the sanity of #splitBlockByPartitionAndBucket.
+ checkNativeWrite(
+ "INSERT OVERWRITE TABLE t PARTITION (c=1, d, f) VALUES" +
+ " (3, 2, 7)," +
+ " (4, 5, 9)," +
+ " (6, 2, 7)," +
+ " (8, 5, 7)",
+ checkNative = true
+ )
+ }
+ checkAnswer(
+ spark.table("t"),
+ Seq(Row(3, 1, 2, 7), Row(4, 1, 5, 9), Row(6, 1, 2, 7), Row(8, 1, 5, 7)))
+ checkAnswer(
+ spark.sql("SHOW PARTITIONS t"),
+ Seq(Row("c=1/d=2/f=7"), Row("c=1/d=5/f=7"), Row("c=1/d=5/f=9")))
}
}
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 951564554b..36a427be86 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -19,6 +19,8 @@
#include <glog/logging.h>
#include <jni/JniCommon.h>
+#include <velox/connectors/hive/PartitionIdGenerator.h>
+#include <velox/exec/OperatorUtils.h>
#include <exception>
#include "JniUdf.h"
@@ -72,7 +74,7 @@ jint JNI_OnLoad(JavaVM* vm, void*) {
blockStripesClass =
createGlobalClassReferenceOrError(env, "Lorg/apache/spark/sql/execution/datasources/BlockStripes;");
- blockStripesConstructor = env->GetMethodID(blockStripesClass, "<init>", "(J[J[II[B)V");
+ blockStripesConstructor = env->GetMethodID(blockStripesClass, "<init>", "(J[J[II[[B)V");
DLOG(INFO) << "Loaded Velox backend.";
@@ -401,45 +403,104 @@ Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_splitBlockByPartitio
JNIEnv* env,
jobject wrapper,
jlong batchHandle,
- jintArray partitionColIndice,
- jboolean hasBucket,
- jlong memoryManagerId) {
+ jintArray partitionColIndices,
+ jboolean hasBucket) {
JNI_METHOD_START
- auto ctx = gluten::getRuntime(env, wrapper);
- auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
+
+ GLUTEN_CHECK(!hasBucket, "Bucketing not supported by splitBlockByPartitionAndBucket");
+
+ const auto ctx = gluten::getRuntime(env, wrapper);
+ const auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
+
+ auto partitionKeyArray = gluten::getIntArrayElementsSafe(env, partitionColIndices);
+ int numPartitionKeys = partitionKeyArray.length();
+ std::vector<uint32_t> partitionColIndicesVec;
+ for (int i = 0; i < numPartitionKeys; ++i) {
+ const auto partitionColumnIndex = partitionKeyArray.elems()[i];
+ GLUTEN_CHECK(partitionColumnIndex < batch->numColumns(), "Partition column index overflow");
+ partitionColIndicesVec.emplace_back(partitionColumnIndex);
+ }
std::vector<int32_t> dataColIndicesVec;
- {
- auto partitionKeyArray = gluten::getIntArrayElementsSafe(env, partitionColIndice);
- int numPartitionKeys = partitionKeyArray.length();
- std::unordered_set<int32_t> partitionColIndiceVec;
- for (int i = 0; i < numPartitionKeys; ++i) {
- partitionColIndiceVec.emplace(partitionKeyArray.elems()[i]);
- }
- for (int i = 0; i < batch->numColumns(); ++i) {
- if (partitionColIndiceVec.count(i) == 0) {
- // The column is not a partition column. Add it to the data column vector.
- dataColIndicesVec.emplace_back(i);
- }
+ for (int i = 0; i < batch->numColumns(); ++i) {
+ if (std::find(partitionColIndicesVec.begin(), partitionColIndicesVec.end(), i) == partitionColIndicesVec.end()) {
+ // The column is not a partition column. Add it to the data column vector.
+ dataColIndicesVec.emplace_back(i);
}
}
- auto result = batch->toUnsafeRow(0);
- auto rowBytes = result.data();
- auto newBatchHandle = ctx->saveObject(ctx->select(batch, dataColIndicesVec));
+ auto pool = dynamic_cast<VeloxMemoryManager*>(ctx->memoryManager())->getLeafMemoryPool();
+ const auto veloxBatch = VeloxColumnarBatch::from(pool.get(), batch);
+ const auto inputRowVector = veloxBatch->getRowVector();
+ const auto numRows = inputRowVector->size();
+
+ connector::hive::PartitionIdGenerator idGen{
+ asRowType(inputRowVector->type()), partitionColIndicesVec, 128, pool.get(), true};
+ raw_vector<uint64_t> partitionIds{};
+ idGen.run(inputRowVector, partitionIds);
+ GLUTEN_CHECK(partitionIds.size() == numRows, "Mismatched number of partition ids");
+ const auto numPartitions = static_cast<int32_t>(idGen.numPartitions());
+
+ std::vector<vector_size_t> partitionSizes(numPartitions);
+ std::vector<BufferPtr> partitionRows(numPartitions);
+ std::vector<vector_size_t*> rawPartitionRows(numPartitions);
+ std::fill(partitionSizes.begin(), partitionSizes.end(), 0);
+
+ for (auto row = 0; row < numRows; ++row) {
+ const auto partitionId = partitionIds[row];
+ ++partitionSizes[partitionId];
+ }
- auto bytesSize = result.size();
- jbyteArray bytesArray = env->NewByteArray(bytesSize);
- env->SetByteArrayRegion(bytesArray, 0, bytesSize, reinterpret_cast<jbyte*>(rowBytes));
+ for (int partitionId = 0; partitionId < numPartitions; ++partitionId) {
+ partitionRows[partitionId] = allocateIndices(partitionSizes[partitionId], pool.get());
+ rawPartitionRows[partitionId] = partitionRows[partitionId]->asMutable<vector_size_t>();
+ }
- jlongArray batchArray = env->NewLongArray(1);
- long* cBatchArray = new long[1];
- cBatchArray[0] = newBatchHandle;
- env->SetLongArrayRegion(batchArray, 0, 1, cBatchArray);
- delete[] cBatchArray;
+ std::vector<vector_size_t> partitionNextRowOffset(numPartitions);
+ std::fill(partitionNextRowOffset.begin(), partitionNextRowOffset.end(), 0);
+ for (auto row = 0; row < numRows; ++row) {
+ const auto partitionId = partitionIds[row];
+ rawPartitionRows[partitionId][partitionNextRowOffset[partitionId]] = row;
+ ++partitionNextRowOffset[partitionId];
+ }
+
+ jobjectArray partitionHeadingRowBytesArray = env->NewObjectArray(numPartitions, env->FindClass("[B"), nullptr);
+ std::vector<jlong> partitionBatchHandles(numPartitions);
+
+ for (int partitionId = 0; partitionId < numPartitions; ++partitionId) {
+ const vector_size_t partitionSize = partitionSizes[partitionId];
+ if (partitionSize == 0) {
+ continue;
+ }
+
+ const RowVectorPtr rowVector = partitionSize == inputRowVector->size()
+ ? inputRowVector
+ : exec::wrap(partitionSize, partitionRows[partitionId], inputRowVector);
+
+ const std::shared_ptr<VeloxColumnarBatch> partitionBatch = std::make_shared<VeloxColumnarBatch>(rowVector);
+ const std::shared_ptr<VeloxColumnarBatch> partitionBatchWithoutPartitionColumns =
+ partitionBatch->select(pool.get(), dataColIndicesVec);
+ partitionBatchHandles[partitionId] = ctx->saveObject(partitionBatchWithoutPartitionColumns);
+ const auto headingRow = partitionBatch->toUnsafeRow(0);
+ const auto headingRowBytes = headingRow.data();
+ const auto headingRowNumBytes = headingRow.size();
+
+ jbyteArray jHeadingRowBytes = env->NewByteArray(headingRowNumBytes);
+ env->SetByteArrayRegion(jHeadingRowBytes, 0, headingRowNumBytes, reinterpret_cast<const jbyte*>(headingRowBytes));
+ env->SetObjectArrayElement(partitionHeadingRowBytesArray, partitionId, jHeadingRowBytes);
+ }
+
+ jlongArray partitionBatchArray = env->NewLongArray(numPartitions);
+ env->SetLongArrayRegion(partitionBatchArray, 0, numPartitions, partitionBatchHandles.data());
jobject blockStripes = env->NewObject(
- blockStripesClass, blockStripesConstructor, batchHandle, batchArray, nullptr, batch->numColumns(), bytesArray);
+ blockStripesClass,
+ blockStripesConstructor,
+ batchHandle,
+ partitionBatchArray,
+ nullptr,
+ batch->numColumns(),
+ partitionHeadingRowBytesArray);
return blockStripes;
JNI_METHOD_END(nullptr)
}
@@ -595,8 +656,8 @@ Java_org_apache_gluten_vectorized_UnifflePartitionWriterJniWrapper_createPartiti
}
JNIEXPORT jboolean JNICALL
Java_org_apache_gluten_config_ConfigJniWrapper_isEnhancedFeaturesEnabled( // NOLINT
- JNIEnv* env,
- jclass) {
+ JNIEnv* env,
+ jclass) {
#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
return true;
#else
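In the rewritten JNI function above, each row is assigned a partition id by Velox's PartitionIdGenerator and the rows are then regrouped so that one stripe per partition can be emitted. The grouping itself is a counting pass followed by a scatter of row indices. A minimal Scala rendering of that idea, not the actual C++, assuming the per-row partition ids are already computed:

    // Hedged sketch of the grouping done natively by splitBlockByPartitionAndBucket:
    // given a partition id per row, build the row-index list for each partition.
    def groupRowsByPartition(partitionIds: Array[Int], numPartitions: Int): Array[Array[Int]] = {
      // 1. Counting pass: how many rows fall into each partition.
      val sizes = new Array[Int](numPartitions)
      partitionIds.foreach(id => sizes(id) += 1)

      // 2. One row-index buffer per partition.
      val rows = Array.tabulate(numPartitions)(p => new Array[Int](sizes(p)))

      // 3. Scatter pass: place each row index into its partition's buffer.
      val next = new Array[Int](numPartitions)
      for (row <- partitionIds.indices) {
        val p = partitionIds(row)
        rows(p)(next(p)) = row
        next(p) += 1
      }
      rows
    }

    // Example: partitionIds Array(0, 1, 0, 2) with numPartitions = 3
    // yields Array(Array(0, 2), Array(1), Array(3)).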
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index 82b75a053f..da9def0e9e 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -46,7 +46,6 @@ import scala.collection.JavaConverters._
* would be transformed to `ValueStreamNode` at native side.
*/
case class InputIteratorTransformer(child: SparkPlan) extends UnaryTransformSupport {
- assert(child.isInstanceOf[ColumnarInputAdapter])
@transient
override lazy val metrics: Map[String, SQLMetric] =
@@ -68,10 +67,12 @@ case class InputIteratorTransformer(child: SparkPlan) extends UnaryTransformSupp
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
override def doExecuteBroadcast[T](): Broadcast[T] = {
+ assert(child.isInstanceOf[ColumnarInputAdapter])
child.doExecuteBroadcast()
}
override protected def doTransform(context: SubstraitContext): TransformContext = {
+ assert(child.isInstanceOf[ColumnarInputAdapter])
val operatorId = context.nextOperatorId(nodeName)
val readRel = RelBuilder.makeReadRelForInputIterator(child.output.asJava, context, operatorId)
TransformContext(output, readRel)
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
index 950d463160..53ee2855e6 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
@@ -17,9 +17,10 @@
package org.apache.spark.sql.execution.datasources
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.{ColumnarToCarrierRowExecBase, ProjectExecTransformer, SortExecTransformer, TransformSupport, WholeStageTransformer}
+import org.apache.gluten.execution._
import org.apache.gluten.execution.datasource.GlutenFormatWriterInjects
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
+import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, SparkPlan}
import org.apache.spark.sql.execution.ColumnarCollapseTransformStages.transformStageCounter
@@ -40,8 +41,9 @@ trait GlutenFormatWriterInjectsBase extends GlutenFormatWriterInjects {
return plan
}
+ val transitionsRemoved = RemoveTransitions.apply(plan)
// FIXME: HeuristicTransform is costly. Re-applying it may cause performance issues.
- val transformed = transform(plan)
+ val transformed = transform(transitionsRemoved)
if (!transformed.isInstanceOf[TransformSupport]) {
throw new IllegalStateException(
@@ -65,6 +67,8 @@ trait GlutenFormatWriterInjectsBase extends GlutenFormatWriterInjects {
val transformedWithAdapter = injectAdapter(transformed)
val wst = WholeStageTransformer(transformedWithAdapter, materializeInput = true)(
  transformStageCounter.incrementAndGet())
- BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToCarrierRow(wst)
+ val wstWithTransitions = BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToCarrierRow(
+ InsertTransitions.create(outputsColumnar = true, wst.batchType()).apply(wst))
+ wstWithTransitions
}
}
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 3c2fb53643..55c4438fda 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -127,6 +127,8 @@ object GlutenWriterColumnarRules {
if (format.isDefined) {
spark.sparkContext.setLocalProperty("isNativeApplicable", true.toString)
spark.sparkContext.setLocalProperty("nativeFormat", format.get)
+ // The flag for static-only write is not used by Velox backend where
+ // the static partition write is already supported.
spark.sparkContext.setLocalProperty(
"staticPartitionWriteOnly",
BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
diff --git a/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/BlockStripes.java b/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/BlockStripes.java
index 84a9c15824..3c4a7d4746 100644
--- a/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/BlockStripes.java
+++ b/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/BlockStripes.java
@@ -20,12 +20,14 @@ import org.jetbrains.annotations.NotNull;
import java.util.Iterator;
+// FIXME: The abstraction is broken: VL / CH don't rely on the same binary layout of
+// this class.
public class BlockStripes implements Iterable<BlockStripe> {
public long originBlockAddress;
public long[] blockAddresses;
- public int[] headingRowIndice;
+ public int[] headingRowIndice; // Only used by CH backend.
public int originBlockNumColumns;
- public byte[] rowBytes;
+ public byte[][] headingRowBytes; // Only used by Velox backend.
public BlockStripes(
long originBlockAddress,
@@ -43,12 +45,12 @@ public class BlockStripes implements Iterable<BlockStripe> {
long[] blockAddresses,
int[] headingRowIndice,
int originBlockNumColumns,
- byte[] rowBytes) {
+ byte[][] headingRowBytes) {
this.originBlockAddress = originBlockAddress;
this.blockAddresses = blockAddresses;
this.headingRowIndice = headingRowIndice;
this.originBlockNumColumns = originBlockNumColumns;
- this.rowBytes = rowBytes;
+ this.headingRowBytes = headingRowBytes;
}
public void release() {
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 3ea51b5d1d..1c7ab83209 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -148,8 +148,6 @@ object FileFormatWriter extends Logging {
val nativeEnabled =
  "true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")
- val staticPartitionWriteOnly =
- "true" == sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")
if (nativeEnabled) {
logInfo(
@@ -174,12 +172,10 @@ object FileFormatWriter extends Logging {
case attr => attr
}
- val empty2NullPlan = if (staticPartitionWriteOnly && nativeEnabled) {
- // Velox backend only support static partition write.
- // And no need to add sort operator for static partition write.
- plan
+ val empty2NullPlan = if (needConvert) {
+ ProjectExec(projectList, plan)
} else {
- if (needConvert) ProjectExec(projectList, plan) else plan
+ plan
}
val bucketIdExpression = bucketSpec.map {
@@ -263,7 +259,7 @@ object FileFormatWriter extends Logging {
try {
val (finalPlan, concurrentOutputWriterSpec) = if (orderingMatched) {
- if (!nativeEnabled || (staticPartitionWriteOnly && nativeEnabled)) {
+ if (!nativeEnabled) {
(empty2NullPlan, None)
} else {
nativeWrap(empty2NullPlan)
@@ -291,15 +287,10 @@ object FileFormatWriter extends Logging {
empty2NullPlan,
Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())))
} else {
- if (staticPartitionWriteOnly && nativeEnabled) {
- // remove the sort operator for static partition write.
- (empty2NullPlan, None)
+ if (!nativeEnabled) {
+ (sortPlan, None)
} else {
- if (!nativeEnabled) {
- (sortPlan, None)
- } else {
- nativeWrap(sortPlan)
- }
+ nativeWrap(sortPlan)
}
}
}
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index d88eb43e8e..c037305f2e 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -140,8 +140,6 @@ object FileFormatWriter extends Logging {
val nativeEnabled =
  "true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")
- val staticPartitionWriteOnly =
- "true" == sparkSession.sparkContext.getLocalProperty("staticPartitionWriteOnly")
if (nativeEnabled) {
logInfo(
@@ -170,12 +168,10 @@ object FileFormatWriter extends Logging {
case attr => attr
}
- val empty2NullPlan = if (staticPartitionWriteOnly && nativeEnabled) {
- // Velox backend only support static partition write.
- // And no need to add sort operator for static partition write.
- plan
+ val empty2NullPlan = if (needConvert) {
+ ProjectExec(projectList, plan)
} else {
- if (needConvert) ProjectExec(projectList, plan) else plan
+ plan
}
val writerBucketSpec = bucketSpec.map {
@@ -283,7 +279,7 @@ object FileFormatWriter extends Logging {
try {
val (finalPlan, concurrentOutputWriterSpec) = if (orderingMatched) {
- if (!nativeEnabled || (staticPartitionWriteOnly && nativeEnabled)) {
+ if (!nativeEnabled) {
(empty2NullPlan, None)
} else {
nativeWrap(empty2NullPlan)
@@ -311,15 +307,10 @@ object FileFormatWriter extends Logging {
empty2NullPlan,
Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())))
} else {
- if (staticPartitionWriteOnly && nativeEnabled) {
- // remove the sort operator for static partition write.
- (empty2NullPlan, None)
+ if (!nativeEnabled) {
+ (sortPlan, None)
} else {
- if (!nativeEnabled) {
- (sortPlan, None)
- } else {
- nativeWrap(sortPlan)
- }
+ nativeWrap(sortPlan)
}
}
}
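With the staticPartitionWriteOnly branches removed, the remaining plan selection in both shims reduces to the same choice between the unsorted and the sorted plan, natively wrapped when applicable. A condensed Scala sketch of that decision, ignoring the concurrent-writer branch kept above; choosePlan is an illustrative helper, while empty2NullPlan, sortPlan and nativeWrap mirror the names used in the diff:

    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOutputWriterSpec

    // Hedged summary of the finalPlan selection after this change.
    def choosePlan(
        orderingMatched: Boolean,
        nativeEnabled: Boolean,
        empty2NullPlan: SparkPlan,
        sortPlan: SparkPlan,
        nativeWrap: SparkPlan => (SparkPlan, Option[ConcurrentOutputWriterSpec]))
        : (SparkPlan, Option[ConcurrentOutputWriterSpec]) = {
      if (orderingMatched) {
        // Data already arrives ordered by partition/bucket columns: no extra sort.
        if (!nativeEnabled) (empty2NullPlan, None) else nativeWrap(empty2NullPlan)
      } else {
        // A sort is required before writing; dynamic partition writes now take
        // the native path instead of falling back to the vanilla writer.
        if (!nativeEnabled) (sortPlan, None) else nativeWrap(sortPlan)
      }
    }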
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]