jinchengchenghh commented on code in PR #11090:
URL: 
https://github.com/apache/incubator-gluten/pull/11090#discussion_r2534153237


##########
cpp/velox/utils/GpuBufferBatchResizer.cc:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#include "GpuBufferBatchResizer.h"
#include "cudf/GpuLock.h"
#include "memory/GpuBufferColumnarBatch.h"
#include "utils/Timer.h"
#include "velox/experimental/cudf/exec/Utilities.h"
#include "velox/experimental/cudf/exec/VeloxCudfInterop.h"
#include "velox/experimental/cudf/vector/CudfVector.h"
#include "velox/vector/FlatVector.h"

#include <arrow/buffer.h>

#include <cuda_runtime.h>
#include <cudf/column/column.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/null_mask.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/type_dispatcher.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>

#include <algorithm>
+
+using namespace facebook::velox;
+
+namespace gluten {
+namespace {
+
+struct BufferViewReleaser {
+  BufferViewReleaser() : BufferViewReleaser(nullptr) {}
+
+  BufferViewReleaser(std::shared_ptr<arrow::Buffer> arrowBuffer) : 
bufferReleaser_(std::move(arrowBuffer)) {}
+
+  void addRef() const {}
+
+  void release() const {}
+
+ private:
+  const std::shared_ptr<arrow::Buffer> bufferReleaser_;
+};
+
+BufferPtr wrapInBufferViewAsOwner(const void* buffer, size_t length, 
std::shared_ptr<arrow::Buffer> bufferReleaser) {
+  return BufferView<BufferViewReleaser>::create(
+      static_cast<const uint8_t*>(buffer), length, 
{std::move(bufferReleaser)});
+}
+
+BufferPtr convertToVeloxBuffer(std::shared_ptr<arrow::Buffer> buffer) {
+  if (buffer == nullptr) {
+    return nullptr;
+  }
+  return wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer);
+}
+
+template <TypeKind Kind, typename T = typename TypeTraits<Kind>::NativeType>
+VectorPtr readFlatVector(
+    std::vector<BufferPtr>& buffers,
+    int32_t& bufferIdx,
+    uint32_t length,
+    std::shared_ptr<const Type> type,
+    memory::MemoryPool* pool) {
+  auto nulls = buffers[bufferIdx++];
+  auto valuesOrIndices = buffers[bufferIdx++];
+
+  nulls = nulls == nullptr || nulls->size() == 0 ? BufferPtr(nullptr) : nulls;
+
+  return std::make_shared<FlatVector<T>>(
+      pool, type, nulls, length, std::move(valuesOrIndices), 
std::vector<BufferPtr>{});
+}
+
+VectorPtr readFlatVectorStringView(
+    std::vector<BufferPtr>& buffers,
+    int32_t& bufferIdx,
+    uint32_t length,
+    std::shared_ptr<const Type> type,
+    memory::MemoryPool* pool) {
+  auto nulls = buffers[bufferIdx++];
+  auto offsets = buffers[bufferIdx++];
+
+  nulls = nulls == nullptr || nulls->size() == 0 ? BufferPtr(nullptr) : nulls;
+
+  auto valueBuffer = buffers[bufferIdx++];
+
+  const auto* rawOffset = offsets->as<int32_t>();
+  const auto* valueBufferPtr = valueBuffer->as<char>();
+
+  auto values = AlignedBuffer::allocate<char>(sizeof(StringView) * length, 
pool);
+  auto* rawValues = values->asMutable<StringView>();
+  if (length > 0) {
+    rawValues[0] = StringView(valueBufferPtr, rawOffset[0]);
+  }
+  for (int32_t i = 1; i < length; ++i) {
+    rawValues[i] = StringView(valueBufferPtr + rawOffset[i - 1], rawOffset[i] 
- rawOffset[i - 1]);
+  }
+
+  std::vector<BufferPtr> stringBuffers;
+  stringBuffers.emplace_back(valueBuffer);
+
+  return std::make_shared<FlatVector<StringView>>(
+      pool, type, nulls, length, std::move(values), std::move(stringBuffers));
+}
+
// Specialization for VARCHAR: strings are materialized through the
// StringView path (nulls + offsets + character data buffers).
template <>
VectorPtr readFlatVector<TypeKind::VARCHAR>(
    std::vector<BufferPtr>& buffers,
    int32_t& bufferIdx,
    uint32_t length,
    std::shared_ptr<const Type> type,
    memory::MemoryPool* pool) {
  return readFlatVectorStringView(buffers, bufferIdx, length, type, pool);
}
+
// Specialization for VARBINARY: shares the VARCHAR/StringView path, since
// the buffer layout (nulls + offsets + bytes) is identical.
template <>
VectorPtr readFlatVector<TypeKind::VARBINARY>(
    std::vector<BufferPtr>& buffers,
    int32_t& bufferIdx,
    uint32_t length,
    std::shared_ptr<const Type> type,
    memory::MemoryPool* pool) {
  return readFlatVectorStringView(buffers, bufferIdx, length, type, pool);
}
+
+RowVectorPtr deserialize(
+    RowTypePtr type,
+    uint32_t numRows,
+    std::vector<BufferPtr>& buffers,
+    memory::MemoryPool* pool) {
+  std::vector<VectorPtr> children;
+  auto types = type->as<TypeKind::ROW>().children();
+  int32_t bufferIdx = 0;
+  for (size_t i = 0; i < types.size(); ++i) {
+    const auto kind = types[i]->kind();
+    auto res = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
+            readFlatVector, kind, buffers, bufferIdx, numRows, types[i], pool);
+    children.emplace_back(std::move(res));
+  }
+
+  return std::make_shared<RowVector>(pool, type, BufferPtr(nullptr), numRows, 
children);
+}
+
+std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
+    RowTypePtr type,
+    uint32_t numRows,
+    const std::vector<std::shared_ptr<arrow::Buffer>>& arrowBuffers,
+    memory::MemoryPool* pool) {
+  std::vector<BufferPtr> veloxBuffers;
+  veloxBuffers.reserve(arrowBuffers.size());
+  for (auto& buffer : arrowBuffers) {
+    veloxBuffers.push_back(convertToVeloxBuffer(std::move(buffer)));
+  }
+  auto rowVector = deserialize(type, numRows, veloxBuffers, pool);
+  return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
+}
+} // namespace
+
+namespace {
+
+struct DispatchColumn {
+  rmm::cuda_stream_view stream;
+  rmm::device_async_resource_ref mr;
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  int32_t bufferIdx;
+  // numRows.
+  const int32_t numRows;
+
+  std::unique_ptr<rmm::device_buffer> getMaskBuffer(const 
std::shared_ptr<arrow::Buffer>& buffer) {
+    if (buffer == nullptr || buffer->size() == 0) {
+      return std::make_unique<rmm::device_buffer>(0, stream, mr);
+    }
+
+    auto const allocationSize = 
cudf::bitmask_allocation_size_bytes(static_cast<cudf::size_type>(buffer->size()));
+    auto mask = std::make_unique<rmm::device_buffer>(allocationSize, stream, 
mr);
+    CUDF_CUDA_TRY(
+        cudaMemcpyAsync(mask->data(), buffer->data(), allocationSize, 
cudaMemcpyHostToDevice, stream.value()));
+    return mask;
+  }
+
+  // For timestamp, it is cudf cudf::type_id::TIMESTAMP_NANOSECONDS, Velox 
uses int128_t while cudf uses int64_t to
+  // represent it.
+  template <TypeKind Kind, typename T = typename TypeTraits<Kind>::NativeType>
+  std::unique_ptr<cudf::column> readFlatColumn(cudf::type_id typeId) {
+    // === Step 1: get CPU buffers ===
+    auto nulls = buffers[bufferIdx++];
+    auto values = buffers[bufferIdx++];
+
+    // === Step 2: allocate GPU device buffers and copy ===
+    rmm::device_buffer dataBuf(values->size(), stream);
+    CUDF_CUDA_TRY(
+        cudaMemcpyAsync(dataBuf.data(), values->data(), values->size(), 
cudaMemcpyHostToDevice, stream.value()));
+
+    auto nullBuf = getMaskBuffer(nulls);
+
+    // === Step 3: create cudf::column ===
+    cudf::data_type cudfType{typeId};
+    size_t nullCount = nulls == nullptr || nulls->size() == 0
+        ? 0
+        : cudf::null_count(reinterpret_cast<const 
cudf::bitmask_type*>(nulls->data()), 0, numRows, stream);
+    return std::make_unique<cudf::column>(cudfType, numRows, 
std::move(dataBuf), std::move(*nullBuf), nullCount);
+  }
+
+  /// We can optimize it in shuffle writer side, returns the offset buffer 
instead of length buffer.
+  /// Then we don't need to recover the offsetBuf by rawLengths, also change 
the merge strategic, update the merge
+  /// buffer offset from last offset.
+  std::unique_ptr<cudf::column> getOffsetsColumn(const 
std::shared_ptr<arrow::Buffer>& offsets) {
+    VELOX_CHECK_GT(numRows, 0);
+    // --- 2. Copy offsets to GPU ---
+    rmm::device_buffer offsetBuf(offsets->size(), stream, mr);
+    CUDF_CUDA_TRY(
+        cudaMemcpyAsync(offsetBuf.data(), offsets->data(), offsets->size(), 
cudaMemcpyHostToDevice, stream.value()));
+
+    // --- 3. Empty null mask (no nulls in offset column) ---
+    rmm::device_buffer nullBuf(0, stream, mr);
+
+    // --- 4. Create cudf::column ---
+    return std::make_unique<cudf::column>(
+        cudf::data_type{cudf::type_id::INT32},
+        static_cast<cudf::size_type>(numRows + 1),
+        std::move(offsetBuf),
+        std::move(nullBuf),
+        0); // null_count = 0
+  }
+
+  std::unique_ptr<cudf::column> readFlatColumnStringView(cudf::type_id 
/*typeId*/) {
+    auto nulls = buffers[bufferIdx++];
+    auto offsets = buffers[bufferIdx++];
+    auto valueBuffer = buffers[bufferIdx++];
+
+    if (numRows == 0) {
+      return make_empty_column(cudf::type_id::STRING);
+    }
+
+    auto mask = getMaskBuffer(nulls);
+
+    // === Step 3: create cudf::column ===
+    size_t nullCount = nulls == nullptr || nulls->size() == 0
+        ? 0
+        : cudf::null_count(reinterpret_cast<const 
cudf::bitmask_type*>(nulls->data()), 0, numRows, stream);
+
+    auto offsetColumn = getOffsetsColumn(offsets);
+
+    rmm::device_buffer chars(valueBuffer->size(), stream, mr);
+    CUDF_CUDA_TRY(cudaMemcpyAsync(
+        chars.data(), valueBuffer->data_as<uint8_t>(), chars.size(), 
cudaMemcpyDefault, stream.value()));
+    return cudf::make_strings_column(
+        numRows, std::move(offsetColumn), std::move(chars), nullCount, 
std::move(*mask.release()));
+  }
+};
+
// VARCHAR columns go through the string path (nulls + offsets + chars).
template <>
std::unique_ptr<cudf::column> DispatchColumn::readFlatColumn<TypeKind::VARCHAR>(cudf::type_id typeId) {
  return readFlatColumnStringView(typeId);
}
+
// VARBINARY columns share the string path: the buffer layout
// (nulls + offsets + bytes) is identical to VARCHAR.
template <>
std::unique_ptr<cudf::column> DispatchColumn::readFlatColumn<TypeKind::VARBINARY>(cudf::type_id typeId) {
  return readFlatColumnStringView(typeId);
}
+
+std::shared_ptr<VeloxColumnarBatch> makeCudfTable(
+    RowTypePtr type,
+    int32_t numRows,
+    const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
+    memory::MemoryPool* pool) {
+  std::vector<std::unique_ptr<cudf::column>> cudfColumns;
+  cudfColumns.reserve(type->size());
+
+  int32_t bufferIdx = 0;
+  auto stream = cudf_velox::cudfGlobalStreamPool().get_stream();
+  DispatchColumn dispatch{stream, cudf::get_current_device_resource_ref(), 
std::move(buffers), bufferIdx, numRows};
+  for (const auto& colType : type->children()) {
+    auto res = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
+        dispatch.readFlatColumn, colType->kind(), 
cudf_velox::veloxToCudfTypeId(colType));
+    cudfColumns.emplace_back(std::move(res));
+  }
+  auto cudfTable = std::make_unique<cudf::table>(std::move(cudfColumns));
+  stream.synchronize();
+  return std::make_shared<VeloxColumnarBatch>(
+      std::make_shared<cudf_velox::CudfVector>(pool, type, numRows, 
std::move(cudfTable), stream));
+}
+
+} // namespace
+
// Accumulates upstream GpuBufferColumnarBatches until at least
// `minOutputBatchSize` rows have been cached, then emits them as one
// merged batch.
//
// @param arrowPool Arrow pool used when composing the cached host buffers.
// @param pool Velox pool that owns the output vectors.
// @param minOutputBatchSize Minimum number of rows per output batch; must
//        be greater than zero (enforced below).
// @param in Upstream iterator producing GpuBufferColumnarBatch instances.
GpuBufferBatchResizer::GpuBufferBatchResizer(
    arrow::MemoryPool* arrowPool,
    facebook::velox::memory::MemoryPool* pool,
    int32_t minOutputBatchSize,
    std::unique_ptr<ColumnarBatchIterator> in)
    : arrowPool_(arrowPool),
      pool_(pool),
      minOutputBatchSize_(minOutputBatchSize),
      in_(std::move(in)) {
  VELOX_CHECK_GT(minOutputBatchSize_, 0, "minOutputBatchSize should be larger than 0");
}
+
+std::shared_ptr<ColumnarBatch> GpuBufferBatchResizer::next() {
+  std::vector<std::shared_ptr<GpuBufferColumnarBatch>> cachedBatches;
+  int32_t cachedRows = 0;
+  while (cachedRows < minOutputBatchSize_) {
+    auto nextCb = in_->next();
+    if (!nextCb) {
+      // No more input.
+      break;
+    }
+
+    auto nextBatch = std::dynamic_pointer_cast<GpuBufferColumnarBatch>(nextCb);
+    VELOX_CHECK_NOT_NULL(nextBatch);
+    if (nextBatch->numRows() == 0) {
+        continue;
+    }
+
+    cachedRows += nextBatch->numRows();
+    cachedBatches.push_back(std::move(nextBatch));
+  }
+  if (cachedRows == 0) {
+    return nullptr;
+  }
+
+  // Compose all cached batches into one
+  auto batch = GpuBufferColumnarBatch::compose(arrowPool_, cachedBatches, 
cachedRows);
+
+  auto finalBatch = makeColumnarBatch(batch->getRowType(), cachedRows, 
batch->buffers(), pool_);

Review Comment:
   Hi, please ignore this comment — I have found the error and fixed it. Thank 
you all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to