This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 55a2332c5b [GLUTEN-10933][VL] BroadcastExchange outputs cudf::table (#11441)
55a2332c5b is described below

commit 55a2332c5b18db3944c5cede196046819a7f9d9f
Author: Jin Chengcheng <[email protected]>
AuthorDate: Wed Jan 28 17:55:41 2026 +0800

    [GLUTEN-10933][VL] BroadcastExchange outputs cudf::table (#11441)
---
 cpp/core/config/GlutenConfig.h                     |  2 +-
 cpp/velox/CMakeLists.txt                           |  1 +
 cpp/velox/compute/VeloxRuntime.cc                  |  9 ++++
 cpp/velox/memory/GpuBufferColumnarBatch.cc         | 27 ++++++++---
 cpp/velox/operators/plannodes/CudfVectorStream.h   | 11 ++---
 .../serializer/VeloxColumnarBatchSerializer.h      |  4 +-
 .../serializer/VeloxGpuColumnarBatchSerializer.cc  | 54 ++++++++++++++++++++++
 ...ializer.h => VeloxGpuColumnarBatchSerializer.h} | 22 +++------
 8 files changed, 98 insertions(+), 32 deletions(-)

diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 110c741a4b..a082a720ea 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -95,7 +95,7 @@ const std::string kSparkJsonIgnoreNullFields = "spark.sql.jsonGenerator.ignoreNu
 
 // cudf
 const std::string kCudfEnabled = "spark.gluten.sql.columnar.cudf";
-constexpr bool kCudfEnabledDefault = true;
+constexpr bool kCudfEnabledDefault = false;
 const std::string kDebugCudf = "spark.gluten.sql.debug.cudf";
 const std::string kDebugCudfDefault = "false";
 
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 806b65f881..56fab701ee 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -207,6 +207,7 @@ if(ENABLE_GPU)
     cudf/GpuLock.cc
     shuffle/VeloxGpuShuffleReader.cc
     shuffle/VeloxGpuShuffleWriter.cc
+    operators/serializer/VeloxGpuColumnarBatchSerializer.cc
     utils/GpuBufferBatchResizer.cc
     memory/GpuBufferColumnarBatch.cc)
 endif()
diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc
index 966f2af6af..5aaf83d7f4 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -52,6 +52,10 @@ DECLARE_bool(velox_memory_pool_capacity_transfer_across_tasks);
 #include "operators/writer/VeloxParquetDataSourceABFS.h"
 #endif
 
+#ifdef GLUTEN_ENABLE_GPU
+#include "operators/serializer/VeloxGpuColumnarBatchSerializer.h"
+#endif
+
 using namespace facebook;
 
 namespace gluten {
@@ -307,6 +311,11 @@ std::shared_ptr<ShuffleReader> VeloxRuntime::createShuffleReader(
 std::unique_ptr<ColumnarBatchSerializer> VeloxRuntime::createColumnarBatchSerializer(struct ArrowSchema* cSchema) {
   auto arrowPool = memoryManager()->defaultArrowMemoryPool();
   auto veloxPool = memoryManager()->getLeafMemoryPool();
+#ifdef GLUTEN_ENABLE_GPU
+  if (veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
+    return std::make_unique<VeloxGpuColumnarBatchSerializer>(arrowPool, veloxPool, cSchema);
+  }
+#endif
   return std::make_unique<VeloxColumnarBatchSerializer>(arrowPool, veloxPool, cSchema);
 }
 
diff --git a/cpp/velox/memory/GpuBufferColumnarBatch.cc b/cpp/velox/memory/GpuBufferColumnarBatch.cc
index 649aadfb79..f73873bab3 100644
--- a/cpp/velox/memory/GpuBufferColumnarBatch.cc
+++ b/cpp/velox/memory/GpuBufferColumnarBatch.cc
@@ -105,10 +105,15 @@ std::shared_ptr<GpuBufferColumnarBatch> GpuBufferColumnarBatch::compose(
 
   std::vector<std::shared_ptr<arrow::Buffer>> returnBuffers;
   returnBuffers.reserve(bufferSize);
-  for (auto size : bufferSizes) {
+  for (auto i = 0; i < bufferSize; ++i) {
+    // Defer allocating the null buffer until a batch really contains nulls.
+    if (bufferTypes[i] == BufferType::kNull) {
+      returnBuffers.emplace_back(nullptr);
+      continue;
+    }
     std::shared_ptr<arrow::Buffer> buffer;
     // May optimize to reuse the first batch buffer.
-    GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(size, pool));
+    GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(bufferSizes[i], pool));
     returnBuffers.emplace_back(std::move(buffer));
   }
 
@@ -123,11 +128,21 @@ std::shared_ptr<GpuBufferColumnarBatch> GpuBufferColumnarBatch::compose(
         continue;
       }
       // Combine the null buffer
-      // The last byte may still have space to write when nullBitsRemainder != 0.
-      auto* dst = returnBuffers[bufferIdx]->mutable_data();
-      if (batch->bufferAt(bufferIdx) == nullptr) {
-        arrow::bit_util::SetBitsTo(dst, rowNumber, batch->numRows(), true);
+      if (batch->bufferAt(bufferIdx) == nullptr || batch->bufferAt(bufferIdx)->size() == 0) {
+        if (returnBuffers[bufferIdx] != nullptr) {
+          auto* dst = returnBuffers[bufferIdx]->mutable_data();
+          arrow::bit_util::SetBitsTo(dst, rowNumber, batch->numRows(), true);
+        }
       } else {
+        // Need to allocate null buffer.
+        if (returnBuffers[bufferIdx] == nullptr) {
+          std::shared_ptr<arrow::Buffer> buffer;
+          GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(bufferSizes[bufferIdx], pool));
+          returnBuffers[bufferIdx] = buffer;
+          // Set all the previous rows to not null.
+          arrow::bit_util::SetBitsTo(buffer->mutable_data(), 0, rowNumber, true);
+        }
+        auto* dst = returnBuffers[bufferIdx]->mutable_data();
         arrow::internal::CopyBitmap(batch->bufferAt(bufferIdx)->data(), 0, batch->numRows(), dst, rowNumber);
       }
 
diff --git a/cpp/velox/operators/plannodes/CudfVectorStream.h b/cpp/velox/operators/plannodes/CudfVectorStream.h
index 4707b8789d..9758d1c35e 100644
--- a/cpp/velox/operators/plannodes/CudfVectorStream.h
+++ b/cpp/velox/operators/plannodes/CudfVectorStream.h
@@ -18,7 +18,7 @@
 #pragma once
 
 #include "CudfVectorStream.h"
-#include "velox/experimental/cudf/exec/NvtxHelper.h"
+#include "velox/experimental/cudf/exec/CudfOperator.h"
 #include "velox/experimental/cudf/exec/Utilities.h"
 #include "velox/experimental/cudf/exec/VeloxCudfInterop.h"
 #include "velox/experimental/cudf/vector/CudfVector.h"
@@ -93,8 +93,8 @@ class CudfValueStreamNode final : public facebook::velox::core::PlanNode {
   const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources_;
 };
 
-// Extends NvtxHelper to identify it as GPU node, so not add CudfFormVelox operator.
-class CudfValueStream : public facebook::velox::exec::SourceOperator, public facebook::velox::cudf_velox::NvtxHelper {
+// Extends CudfOperator to identify it as a GPU node, so no CudfFromVelox operator is added.
+class CudfValueStream : public facebook::velox::exec::SourceOperator, public facebook::velox::cudf_velox::CudfOperator {
  public:
   CudfValueStream(
       int32_t operatorId,
@@ -106,10 +106,7 @@ class CudfValueStream : public facebook::velox::exec::SourceOperator, public fac
             operatorId,
             valueStreamNode->id(),
             valueStreamNode->name().data()),
-        facebook::velox::cudf_velox::NvtxHelper(
-            nvtx3::rgb{160, 82, 45}, // Sienna
-            operatorId,
-            fmt::format("[{}]", valueStreamNode->id())) {
+        facebook::velox::cudf_velox::CudfOperator(operatorId, valueStreamNode->id()) {
     ResultIterator* itr = valueStreamNode->iterator();
     rvStream_ = std::make_unique<CudfVectorStream>(driverCtx, pool(), itr, outputType_);
   }
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
index 73fec890cf..f58da73281 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
+++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
@@ -25,7 +25,7 @@
 
 namespace gluten {
 
-class VeloxColumnarBatchSerializer final : public ColumnarBatchSerializer {
+class VeloxColumnarBatchSerializer : public ColumnarBatchSerializer {
  public:
   VeloxColumnarBatchSerializer(
       arrow::MemoryPool* arrowPool,
@@ -40,7 +40,7 @@ class VeloxColumnarBatchSerializer final : public ColumnarBatchSerializer {
 
   std::shared_ptr<ColumnarBatch> deserialize(uint8_t* data, int32_t size) override;
 
- private:
+ protected:
   std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
   std::unique_ptr<facebook::velox::StreamArena> arena_;
   std::unique_ptr<facebook::velox::IterativeVectorSerializer> serializer_;
diff --git a/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.cc b/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.cc
new file mode 100644
index 0000000000..b9993df0e7
--- /dev/null
+++ b/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.cc
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "VeloxGpuColumnarBatchSerializer.h"
+
+#include <arrow/buffer.h>
+
+#include "memory/ArrowMemory.h"
+#include "memory/VeloxColumnarBatch.h"
+#include "velox/common/memory/Memory.h"
+#include "velox/vector/FlatVector.h"
+#include "velox/vector/arrow/Bridge.h"
+#include "velox/experimental/cudf/exec/VeloxCudfInterop.h"
+#include "velox/experimental/cudf/exec/Utilities.h"
+#include "velox/experimental/cudf/vector/CudfVector.h"
+
+#include <iostream>
+
+using namespace facebook::velox;
+
+namespace gluten {
+
+VeloxGpuColumnarBatchSerializer::VeloxGpuColumnarBatchSerializer(
+    arrow::MemoryPool* arrowPool,
+    std::shared_ptr<memory::MemoryPool> veloxPool,
+    struct ArrowSchema* cSchema)
+    : VeloxColumnarBatchSerializer(arrowPool, veloxPool, cSchema) {
+}
+
+std::shared_ptr<ColumnarBatch> VeloxGpuColumnarBatchSerializer::deserialize(uint8_t* data, int32_t size) {
+  auto vb = VeloxColumnarBatchSerializer::deserialize(data, size);
+  auto stream = cudf_velox::cudfGlobalStreamPool().get_stream();
+  auto table = cudf_velox::with_arrow::toCudfTable(dynamic_pointer_cast<VeloxColumnarBatch>(vb)->getRowVector(), veloxPool_.get(), stream);
+  stream.synchronize();
+  auto vector = std::make_shared<cudf_velox::CudfVector>(
+      veloxPool_.get(), rowType_, size, std::move(table), stream);
+  return std::make_shared<VeloxColumnarBatch>(vector, vb->numColumns());
+}
+
+} // namespace gluten
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h b/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.h
similarity index 64%
copy from cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
copy to cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.h
index 73fec890cf..5830b4776f 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.h
+++ b/cpp/velox/operators/serializer/VeloxGpuColumnarBatchSerializer.h
@@ -19,34 +19,24 @@
 
 #include <arrow/c/abi.h>
 
+#include "VeloxColumnarBatchSerializer.h"
+
 #include "memory/ColumnarBatch.h"
 #include "operators/serializer/ColumnarBatchSerializer.h"
 #include "velox/serializers/PrestoSerializer.h"
 
 namespace gluten {
 
-class VeloxColumnarBatchSerializer final : public ColumnarBatchSerializer {
+class VeloxGpuColumnarBatchSerializer final : public VeloxColumnarBatchSerializer {
  public:
-  VeloxColumnarBatchSerializer(
+  VeloxGpuColumnarBatchSerializer(
       arrow::MemoryPool* arrowPool,
       std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
       struct ArrowSchema* cSchema);
 
-  void append(const std::shared_ptr<ColumnarBatch>& batch) override;
-
-  int64_t maxSerializedSize() override;
-
-  void serializeTo(uint8_t* address, int64_t size) override;
-
+  // Deserialize to a cudf table so the cudf pipeline receives CudfVector directly and the CudfFromVelox operator can be
+  // removed from the Velox pipeline input.
   std::shared_ptr<ColumnarBatch> deserialize(uint8_t* data, int32_t size) override;
-
- private:
-  std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
-  std::unique_ptr<facebook::velox::StreamArena> arena_;
-  std::unique_ptr<facebook::velox::IterativeVectorSerializer> serializer_;
-  facebook::velox::RowTypePtr rowType_;
-  std::unique_ptr<facebook::velox::serializer::presto::PrestoVectorSerde> serde_;
-  facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions options_;
 };
 
 } // namespace gluten


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to