This is an automated email from the ASF dual-hosted git repository.
zjuwangg pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 4fda85d13d [VL] Add default compress for VeloxColumnarBatchSerializer
to reduce memory usage (#12373)
4fda85d13d is described below
commit 4fda85d13d4897ae39fc8f2df4b517bb18940046
Author: Hao Chen <[email protected]>
AuthorDate: Tue Jun 30 18:48:38 2026 +0800
[VL] Add default compress for VeloxColumnarBatchSerializer to reduce memory
usage (#12373)
---
.../org/apache/gluten/config/VeloxConfig.scala | 9 ++++++
.../gluten/execution/VeloxHashJoinSuite.scala | 36 ++++++++++++++++++++++
cpp/velox/compute/VeloxRuntime.cc | 4 ++-
cpp/velox/config/VeloxConfig.h | 5 +++
.../serializer/VeloxColumnarBatchSerializer.cc | 7 +++--
.../serializer/VeloxColumnarBatchSerializer.h | 3 +-
.../org/apache/gluten/config/GlutenConfig.scala | 3 +-
7 files changed, 61 insertions(+), 6 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index 1f87ec000a..dcb79a462f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -631,6 +631,15 @@ object VeloxConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(false)
+ val COLUMNAR_VELOX_BATCH_SERIALIZER_COMPRESSION =
+
buildConf("spark.gluten.sql.columnar.backend.velox.columnarBatchSerializerCompression")
+ .internal()
+ .doc("which compression for the columnar batch serializer (e.g.
broadcast).")
+ .stringConf
+ .transform(_.toLowerCase(Locale.ROOT))
+ .checkValues(Set("none", "zstd", "zlib", "snappy", "lz4", "gzip"))
+ .createWithDefault("none")
+
val VELOX_HASHMAP_ABANDON_BUILD_DUPHASH_MIN_ROWS =
buildConf("spark.gluten.velox.abandonDedupHashMap.minRows")
.experimental()
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
index a83e0ebf0e..c80950aa1f 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
@@ -445,4 +445,40 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
}
}
}
+
+ test("test columnarBatchSerializerCompression") {
+ Seq("none", "zstd", "zlib", "snappy", "lz4", "gzip").foreach(
+ compression =>
+ withSQLConf(
+ GlutenConfig.GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key -> "16",
+ VeloxConfig.VELOX_BROADCAST_BUILD_RELATION_USE_OFFHEAP.key -> "true",
+ VeloxConfig.COLUMNAR_VELOX_BATCH_SERIALIZER_COMPRESSION.key ->
compression
+ ) {
+ withTable("t1", "t2") {
+ spark.sql("""
+ |CREATE TABLE t1 USING PARQUET
+ |AS SELECT id as c1, id as c2 FROM range(10)
+ |""".stripMargin)
+
+ spark.sql("""
+ |CREATE TABLE t2 USING PARQUET PARTITIONED BY (c1)
+ |AS SELECT id as c1, id as c2 FROM range(30)
+ |""".stripMargin)
+
+ val df = spark.sql("""
+ |SELECT t1.c2
+ |FROM t1, t2
+ |WHERE t1.c1 = t2.c1
+ |AND t1.c2 < 4
+ |""".stripMargin)
+
+ checkAnswer(df, Row(0) :: Row(1) :: Row(2) :: Row(3) :: Nil)
+
+ val subqueryBroadcastExecs =
collectWithSubqueries(df.queryExecution.executedPlan) {
+ case subqueryBroadcast: ColumnarSubqueryBroadcastExec =>
subqueryBroadcast
+ }
+ assert(subqueryBroadcastExecs.size == 1)
+ }
+ })
+ }
}
diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc
index f13430bd0c..237048553a 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -629,7 +629,9 @@ std::unique_ptr<ColumnarBatchSerializer> VeloxRuntime::createColumnarBatchSerial
return std::make_unique<VeloxGpuColumnarBatchSerializer>(arrowPool,
veloxPool, cSchema);
}
#endif
- return std::make_unique<VeloxColumnarBatchSerializer>(arrowPool, veloxPool,
cSchema);
+ auto compressionKind =
+ veloxCfg_->get<std::string>(kColumnarBatchSerializerCompression,
kColumnarBatchSerializerCompressionDefault);
+ return std::make_unique<VeloxColumnarBatchSerializer>(arrowPool, veloxPool,
cSchema, compressionKind);
}
void VeloxRuntime::enableDumping() {
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index d88c436193..ad51066f40 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -50,6 +50,11 @@ const std::string kSparkShuffleSpillCompress = "spark.shuffle.spill.compress";
const std::string kCompressionKind = "spark.io.compression.codec";
/// The compression codec to use for spilling. Use kCompressionKind if not set.
const std::string kSpillCompressionKind =
"spark.gluten.sql.columnar.backend.velox.spillCompressionCodec";
+
+// Which compression kind to use for the columnar batch serializer (e.g.
broadcast).
+const std::string kColumnarBatchSerializerCompression =
+
"spark.gluten.sql.columnar.backend.velox.columnarBatchSerializerCompression";
+const std::string kColumnarBatchSerializerCompressionDefault = "none";
const std::string kMaxPartialAggregationMemoryRatio =
"spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio";
const std::string kMaxPartialAggregationMemory =
"spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory";
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
index 4dbda85a8b..cc1cd6bb4e 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
+++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
@@ -27,6 +27,7 @@
#include "memory/ArrowMemory.h"
#include "memory/VeloxColumnarBatch.h"
+#include "velox/common/compression/Compression.h"
#include "velox/common/memory/Memory.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/LazyVector.h"
@@ -51,7 +52,8 @@ std::unique_ptr<ByteInputStream> toByteStream(uint8_t* data, int32_t size) {
VeloxColumnarBatchSerializer::VeloxColumnarBatchSerializer(
arrow::MemoryPool* arrowPool,
std::shared_ptr<memory::MemoryPool> veloxPool,
- struct ArrowSchema* cSchema)
+ struct ArrowSchema* cSchema,
+ const std::string& compressionKind)
: ColumnarBatchSerializer(arrowPool), veloxPool_(std::move(veloxPool)) {
// serializeColumnarBatches don't need rowType_
if (cSchema != nullptr) {
@@ -61,8 +63,7 @@ VeloxColumnarBatchSerializer::VeloxColumnarBatchSerializer(
arena_ = std::make_unique<StreamArena>(veloxPool_.get());
serde_ = std::make_unique<serializer::presto::PrestoVectorSerde>();
options_.useLosslessTimestamp = true;
- // Required by serializeSingleColumn / deserializeSingleColumn APIs
(VELOX_USER_CHECK_EQ).
- options_.compressionKind = common::CompressionKind::CompressionKind_NONE;
+ options_.compressionKind =
facebook::velox::common::stringToCompressionKind(compressionKind);
options_.nullsFirst = false;
}
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
index 33ff1301b6..4e4a3d4519 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
+++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
@@ -43,7 +43,8 @@ class VeloxColumnarBatchSerializer : public ColumnarBatchSerializer {
VeloxColumnarBatchSerializer(
arrow::MemoryPool* arrowPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
- struct ArrowSchema* cSchema);
+ struct ArrowSchema* cSchema,
+ const std::string& compressionKind = "none");
void append(const std::shared_ptr<ColumnarBatch>& batch) override;
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 01dfb912a6..37d8b2b3ab 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -528,7 +528,8 @@ object GlutenConfig extends ConfigRegistry {
"spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct",
"spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks",
"spark.gluten.sql.columnar.backend.velox.preferredBatchBytes",
- "spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan"
+ "spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan",
+
"spark.gluten.sql.columnar.backend.velox.columnarBatchSerializerCompression"
)
/** Get dynamic configs. */
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]