This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 55a2332c5b [GLUTEN-10933][VL] BroadcastExchange outputs cudf::table
(#11441)
55a2332c5b is described below
commit 55a2332c5b18db3944c5cede196046819a7f9d9f
Author: Jin Chengcheng <[email protected]>
AuthorDate: Wed Jan 28 17:55:41 2026 +0800
[GLUTEN-10933][VL] BroadcastExchange outputs cudf::table (#11441)
---
cpp/core/config/GlutenConfig.h | 2 +-
cpp/velox/CMakeLists.txt | 1 +
cpp/velox/compute/VeloxRuntime.cc | 9 ++++
cpp/velox/memory/GpuBufferColumnarBatch.cc | 27 ++++++++---
cpp/velox/operators/plannodes/CudfVectorStream.h | 11 ++---
.../serializer/VeloxColumnarBatchSerializer.h | 4 +-
.../serializer/VeloxGpuColumnarBatchSerializer.cc | 54 ++++++++++++++++++++++
...ializer.h => VeloxGpuColumnarBatchSerializer.h} | 22 +++------
8 files changed, 98 insertions(+), 32 deletions(-)
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 110c741a4b..a082a720ea 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -95,7 +95,7 @@ const std::string kSparkJsonIgnoreNullFields =
"spark.sql.jsonGenerator.ignoreNu
// cudf
const std::string kCudfEnabled = "spark.gluten.sql.columnar.cudf";
-constexpr bool kCudfEnabledDefault = true;
+constexpr bool kCudfEnabledDefault = false;
const std::string kDebugCudf = "spark.gluten.sql.debug.cudf";
const std::string kDebugCudfDefault = "false";
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 806b65f881..56fab701ee 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -207,6 +207,7 @@ if(ENABLE_GPU)
cudf/GpuLock.cc
shuffle/VeloxGpuShuffleReader.cc
shuffle/VeloxGpuShuffleWriter.cc
+ operators/serializer/VeloxGpuColumnarBatchSerializer.cc
utils/GpuBufferBatchResizer.cc
memory/GpuBufferColumnarBatch.cc)
endif()
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index 966f2af6af..5aaf83d7f4 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -52,6 +52,10 @@
DECLARE_bool(velox_memory_pool_capacity_transfer_across_tasks);
#include "operators/writer/VeloxParquetDataSourceABFS.h"
#endif
+#ifdef GLUTEN_ENABLE_GPU
+#include "operators/serializer/VeloxGpuColumnarBatchSerializer.h"
+#endif
+
using namespace facebook;
namespace gluten {
@@ -307,6 +311,11 @@ std::shared_ptr<ShuffleReader>
VeloxRuntime::createShuffleReader(
std::unique_ptr<ColumnarBatchSerializer>
VeloxRuntime::createColumnarBatchSerializer(struct ArrowSchema* cSchema) {
auto arrowPool = memoryManager()->defaultArrowMemoryPool();
auto veloxPool = memoryManager()->getLeafMemoryPool();
+#ifdef GLUTEN_ENABLE_GPU
+ if (veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
+ return std::make_unique<VeloxGpuColumnarBatchSerializer>(arrowPool,
veloxPool, cSchema);
+ }
+#endif
return std::make_unique<VeloxColumnarBatchSerializer>(arrowPool, veloxPool,
cSchema);
}
diff --git a/cpp/velox/memory/GpuBufferColumnarBatch.cc
b/cpp/velox/memory/GpuBufferColumnarBatch.cc
index 649aadfb79..f73873bab3 100644
--- a/cpp/velox/memory/GpuBufferColumnarBatch.cc
+++ b/cpp/velox/memory/GpuBufferColumnarBatch.cc
@@ -105,10 +105,15 @@ std::shared_ptr<GpuBufferColumnarBatch>
GpuBufferColumnarBatch::compose(
std::vector<std::shared_ptr<arrow::Buffer>> returnBuffers;
returnBuffers.reserve(bufferSize);
- for (auto size : bufferSizes) {
+ for (auto i = 0; i < bufferSize; ++i) {
+ // Defer allocating the null buffer until a batch really contains nulls.
+ if (bufferTypes[i] == BufferType::kNull) {
+ returnBuffers.emplace_back(nullptr);
+ continue;
+ }
std::shared_ptr<arrow::Buffer> buffer;
// May optimize to reuse the first batch buffer.
- GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(size, pool));
+ GLUTEN_ASSIGN_OR_THROW(buffer,
arrow::AllocateResizableBuffer(bufferSizes[i], pool));
returnBuffers.emplace_back(std::move(buffer));
}
@@ -123,11 +128,21 @@ std::shared_ptr<GpuBufferColumnarBatch>
GpuBufferColumnarBatch::compose(
continue;
}
// Combine the null buffer
- // The last byte may still have space to write when nullBitsRemainder !=
0.
- auto* dst = returnBuffers[bufferIdx]->mutable_data();
- if (batch->bufferAt(bufferIdx) == nullptr) {
- arrow::bit_util::SetBitsTo(dst, rowNumber, batch->numRows(), true);
+ if (batch->bufferAt(bufferIdx) == nullptr ||
batch->bufferAt(bufferIdx)->size() == 0) {
+ if (returnBuffers[bufferIdx] != nullptr) {
+ auto* dst = returnBuffers[bufferIdx]->mutable_data();
+ arrow::bit_util::SetBitsTo(dst, rowNumber, batch->numRows(), true);
+ }
} else {
+ // Need to allocate null buffer.
+ if (returnBuffers[bufferIdx] == nullptr) {
+ std::shared_ptr<arrow::Buffer> buffer;
+ GLUTEN_ASSIGN_OR_THROW(buffer,
arrow::AllocateResizableBuffer(bufferSizes[bufferIdx], pool));
+ returnBuffers[bufferIdx] = buffer;
+ // Set all the previous rows to not null.
+ arrow::bit_util::SetBitsTo(buffer->mutable_data(), 0, rowNumber,
true);
+ }
+ auto* dst = returnBuffers[bufferIdx]->mutable_data();
arrow::internal::CopyBitmap(batch->bufferAt(bufferIdx)->data(), 0,
batch->numRows(), dst, rowNumber);
}
diff --git a/cpp/velox/operators/plannodes/CudfVectorStream.h
b/cpp/velox/operators/plannodes/CudfVectorStream.h
index 4707b8789d..9758d1c35e 100644
--- a/cpp/velox/operators/plannodes/CudfVectorStream.h
+++ b/cpp/velox/operators/plannodes/CudfVectorStream.h
@@ -18,7 +18,7 @@
#pragma once
#include "CudfVectorStream.h"
-#include "velox/experimental/cudf/exec/NvtxHelper.h"
+#include "velox/experimental/cudf/exec/CudfOperator.h"
#include "velox/experimental/cudf/exec/Utilities.h"
#include "velox/experimental/cudf/exec/VeloxCudfInterop.h"
#include "velox/experimental/cudf/vector/CudfVector.h"
@@ -93,8 +93,8 @@ class CudfValueStreamNode final : public
facebook::velox::core::PlanNode {
const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources_;
};
-// Extends NvtxHelper to identify it as GPU node, so not add CudfFormVelox
operator.
-class CudfValueStream : public facebook::velox::exec::SourceOperator, public
facebook::velox::cudf_velox::NvtxHelper {
+// Extends CudfOperator to identify it as a GPU node, so that the CudfFromVelox
operator is not added.
+class CudfValueStream : public facebook::velox::exec::SourceOperator, public
facebook::velox::cudf_velox::CudfOperator {
public:
CudfValueStream(
int32_t operatorId,
@@ -106,10 +106,7 @@ class CudfValueStream : public
facebook::velox::exec::SourceOperator, public fac
operatorId,
valueStreamNode->id(),
valueStreamNode->name().data()),
- facebook::velox::cudf_velox::NvtxHelper(
- nvtx3::rgb{160, 82, 45}, // Sienna
- operatorId,
- fmt::format("[{}]", valueStreamNode->id())) {
+ facebook::velox::cudf_velox::CudfOperator(operatorId,
valueStreamNode->id()) {
ResultIterator* itr = valueStreamNode->iterator();
rvStream_ = std::make_unique<CudfVectorStream>(driverCtx, pool(), itr,
outputType_);
}
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
index 73fec890cf..f58da73281 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
+++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
@@ -25,7 +25,7 @@
namespace gluten {
-class VeloxColumnarBatchSerializer final : public ColumnarBatchSerializer {
+class VeloxColumnarBatchSerializer : public ColumnarBatchSerializer {
public:
VeloxColumnarBatchSerializer(
arrow::MemoryPool* arrowPool,
@@ -40,7 +40,7 @@ class VeloxColumnarBatchSerializer final : public
ColumnarBatchSerializer {
std::shared_ptr<ColumnarBatch> deserialize(uint8_t* data, int32_t size)
override;
- private:
+ protected:
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
std::unique_ptr<facebook::velox::StreamArena> arena_;
std::unique_ptr<facebook::velox::IterativeVectorSerializer> serializer_;
diff --git a/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.cc
b/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.cc
new file mode 100644
index 0000000000..b9993df0e7
--- /dev/null
+++ b/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.cc
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "VeloxGpuColumnarBatchSerializer.h"
+
+#include <arrow/buffer.h>
+
+#include "memory/ArrowMemory.h"
+#include "memory/VeloxColumnarBatch.h"
+#include "velox/common/memory/Memory.h"
+#include "velox/vector/FlatVector.h"
+#include "velox/vector/arrow/Bridge.h"
+#include "velox/experimental/cudf/exec/VeloxCudfInterop.h"
+#include "velox/experimental/cudf/exec/Utilities.h"
+#include "velox/experimental/cudf/vector/CudfVector.h"
+
+#include <iostream>
+
+using namespace facebook::velox;
+
+namespace gluten {
+
+// Constructs the GPU serializer by forwarding all three arguments unchanged to
+// the VeloxColumnarBatchSerializer base class; no additional state is set up
+// in this constructor.
+VeloxGpuColumnarBatchSerializer::VeloxGpuColumnarBatchSerializer(
+ arrow::MemoryPool* arrowPool,
+ std::shared_ptr<memory::MemoryPool> veloxPool,
+ struct ArrowSchema* cSchema)
+ : VeloxColumnarBatchSerializer(arrowPool, veloxPool, cSchema) {
+}
+
+// Deserializes `data` with the base (host) serializer, converts the resulting
+// RowVector into a cudf table on a stream taken from the global cudf stream
+// pool, and returns it wrapped in a CudfVector-backed VeloxColumnarBatch.
+//
+// `data`/`size` describe the serialized input buffer; `size` is its length in
+// bytes and is only meaningful to the base deserializer.
+std::shared_ptr<ColumnarBatch> VeloxGpuColumnarBatchSerializer::deserialize(uint8_t* data, int32_t size) {
+  auto vb = VeloxColumnarBatchSerializer::deserialize(data, size);
+  auto stream = cudf_velox::cudfGlobalStreamPool().get_stream();
+  auto table = cudf_velox::with_arrow::toCudfTable(
+      dynamic_pointer_cast<VeloxColumnarBatch>(vb)->getRowVector(), veloxPool_.get(), stream);
+  // Wait for the conversion on `stream` to finish before wrapping the table.
+  stream.synchronize();
+  // The vector length must be the batch's row count. The previous code passed
+  // `size` (the serialized byte length), which gave the CudfVector a wrong size.
+  auto vector = std::make_shared<cudf_velox::CudfVector>(
+      veloxPool_.get(), rowType_, vb->numRows(), std::move(table), stream);
+  return std::make_shared<VeloxColumnarBatch>(vector, vb->numColumns());
+}
+
+} // namespace gluten
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
b/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.h
similarity index 64%
copy from cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
copy to cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.h
index 73fec890cf..5830b4776f 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
+++ b/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.h
@@ -19,34 +19,24 @@
#include <arrow/c/abi.h>
+#include "VeloxColumnarBatchSerializer.h"
+
#include "memory/ColumnarBatch.h"
#include "operators/serializer/ColumnarBatchSerializer.h"
#include "velox/serializers/PrestoSerializer.h"
namespace gluten {
-class VeloxColumnarBatchSerializer final : public ColumnarBatchSerializer {
+class VeloxGpuColumnarBatchSerializer final : public
VeloxColumnarBatchSerializer {
public:
- VeloxColumnarBatchSerializer(
+ VeloxGpuColumnarBatchSerializer(
arrow::MemoryPool* arrowPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
struct ArrowSchema* cSchema);
- void append(const std::shared_ptr<ColumnarBatch>& batch) override;
-
- int64_t maxSerializedSize() override;
-
- void serializeTo(uint8_t* address, int64_t size) override;
-
+ // Deserializes to a cudf table. Because the cudf pipeline accepts CudfVector
directly, the CudfFromVelox operator can be removed from the
+ // Velox pipeline input.
std::shared_ptr<ColumnarBatch> deserialize(uint8_t* data, int32_t size)
override;
-
- private:
- std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
- std::unique_ptr<facebook::velox::StreamArena> arena_;
- std::unique_ptr<facebook::velox::IterativeVectorSerializer> serializer_;
- facebook::velox::RowTypePtr rowType_;
- std::unique_ptr<facebook::velox::serializer::presto::PrestoVectorSerde>
serde_;
- facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions
options_;
};
} // namespace gluten
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]