marin-ma commented on code in PR #5675:
URL: https://github.com/apache/incubator-gluten/pull/5675#discussion_r1602744985


##########
cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "VeloxSortBasedShuffleWriter.h"
+#include "memory/ArrowMemory.h"
+#include "memory/VeloxColumnarBatch.h"
+#include "memory/VeloxMemoryManager.h"
+#include "shuffle/ShuffleSchema.h"
+#include "shuffle/Utils.h"
+#include "utils/Common.h"
+#include "utils/VeloxArrowUtils.h"
+#include "utils/macros.h"
+#include "velox/buffer/Buffer.h"
+#include "velox/common/base/Nulls.h"
+#include "velox/type/HugeInt.h"
+#include "velox/type/Timestamp.h"
+#include "velox/type/Type.h"
+#include "velox/vector/BaseVector.h"
+#include "velox/vector/ComplexVector.h"
+
+#if defined(__x86_64__)
+#include <immintrin.h>
+#include <x86intrin.h>
+#elif defined(__aarch64__)
+#include <arm_neon.h>
+#endif
+
+namespace gluten {
+
+#define VELOX_SHUFFLE_WRITER_LOG_FLAG 0
+
+// macro to rotate left an 8-bit value 'x' given the shift 's' is a 32-bit 
integer
+// (x is left shifted by 's' modulo 8) OR (x right shifted by (8 - 's' modulo 
8))
+#if !defined(__x86_64__)
+#define rotateLeft(x, s) (x << (s - ((s >> 3) << 3)) | x >> (8 - (s - ((s >> 
3) << 3))))
+#endif
+
+// on x86 machines, _MM_HINT_T0,T1,T2 are defined as 1, 2, 3
+// equivalent mapping to __builtin_prefetch hints is 3, 2, 1
+#if defined(__x86_64__)
+#define PREFETCHT0(ptr) _mm_prefetch(ptr, _MM_HINT_T0)
+#define PREFETCHT1(ptr) _mm_prefetch(ptr, _MM_HINT_T1)
+#define PREFETCHT2(ptr) _mm_prefetch(ptr, _MM_HINT_T2)
+#else
+#define PREFETCHT0(ptr) __builtin_prefetch(ptr, 0, 3)
+#define PREFETCHT1(ptr) __builtin_prefetch(ptr, 0, 2)
+#define PREFETCHT2(ptr) __builtin_prefetch(ptr, 0, 1)
+#endif
+
+namespace {
+
+facebook::velox::RowVectorPtr getStrippedRowVector(const 
facebook::velox::RowVector& rv) {
+  // get new row type
+  auto rowType = rv.type()->asRow();
+  auto typeChildren = rowType.children();
+  typeChildren.erase(typeChildren.begin());
+  auto newRowType = facebook::velox::ROW(std::move(typeChildren));
+
+  // get length
+  auto length = rv.size();
+
+  // get children
+  auto children = rv.children();
+  children.erase(children.begin());
+
+  return std::make_shared<facebook::velox::RowVector>(
+      rv.pool(), newRowType, facebook::velox::BufferPtr(nullptr), length, 
std::move(children));
+}
+
+const int32_t* getFirstColumn(const facebook::velox::RowVector& rv) {
+  VELOX_CHECK(rv.childrenSize() > 0, "RowVector missing partition id column.");
+
+  auto& firstChild = rv.childAt(0);
+  VELOX_CHECK(firstChild->isFlatEncoding(), "Partition id (field 0) is not 
flat encoding.");
+  VELOX_CHECK(
+      firstChild->type()->isInteger(),
+      "Partition id (field 0) should be integer, but got {}",
+      firstChild->type()->toString());
+
+  // first column is partition key hash value or pid
+  return firstChild->asFlatVector<int32_t>()->rawValues();
+}
+
+class EvictGuard {
+ public:
+  explicit EvictGuard(EvictState& evictState) : evictState_(evictState) {
+    evictState_ = EvictState::kUnevictable;
+  }
+
+  ~EvictGuard() {
+    evictState_ = EvictState::kEvictable;
+  }
+
+  // For safety and clarity.
+  EvictGuard(const EvictGuard&) = delete;
+  EvictGuard& operator=(const EvictGuard&) = delete;
+  EvictGuard(EvictGuard&&) = delete;
+  EvictGuard& operator=(EvictGuard&&) = delete;
+
+ private:
+  EvictState& evictState_;
+};
+
+} // namespace
+
+arrow::Result<std::shared_ptr<VeloxSortBasedShuffleWriter>> 
VeloxSortBasedShuffleWriter::create(
+    uint32_t numPartitions,
+    std::unique_ptr<PartitionWriter> partitionWriter,
+    ShuffleWriterOptions options,
+    std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+    arrow::MemoryPool* arrowPool) {
+  std::shared_ptr<VeloxSortBasedShuffleWriter> res(new 
VeloxSortBasedShuffleWriter(
+      numPartitions, std::move(partitionWriter), std::move(options), 
veloxPool, arrowPool));
+  RETURN_NOT_OK(res->init());
+  return res;
+} // namespace gluten
+
+arrow::Status VeloxSortBasedShuffleWriter::init() {
+#if defined(__x86_64__)
+  supportAvx512_ = __builtin_cpu_supports("avx512bw");
+#else
+  supportAvx512_ = false;
+#endif
+
+  ARROW_ASSIGN_OR_RAISE(
+      partitioner_, Partitioner::make(options_.partitioning, numPartitions_, 
options_.startPartitionId));
+  DLOG(INFO) << "Create partitioning type: " << 
std::to_string(options_.partitioning);
+
+  partition2RowCount_.resize(numPartitions_);
+  rowVectorIndexMap_.reserve(numPartitions_);
+  for (auto pid = 0; pid < numPartitions_; ++pid) {
+    rowVectorIndexMap_[pid].reserve(options_.bufferSize);
+  }
+
+  return arrow::Status::OK();
+}
+
+arrow::Status 
VeloxSortBasedShuffleWriter::doSort(facebook::velox::RowVectorPtr rv, int64_t 
memLimit) {
+  currentInputColumnBytes_ += rv->estimateFlatSize();
+  batches_.push_back(rv);
+  if (currentInputColumnBytes_ > memLimit) {
+    for (auto pid = 0; pid < numPartitions(); ++pid) {
+      RETURN_NOT_OK(evictRowVector(pid));
+      partition2RowCount_[pid] = 0;
+    }
+    batches_.clear();
+    currentInputColumnBytes_ = 0;
+  }
+  setSplitState(SplitState::kInit);
+  return arrow::Status::OK();
+}
+
+arrow::Status 
VeloxSortBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, int64_t 
memLimit) {
+  if (options_.partitioning == Partitioning::kSingle) {
+    auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
+    VELOX_CHECK_NOT_NULL(veloxColumnBatch);
+    auto rv = veloxColumnBatch->getFlattenedRowVector();
+    ;
+    RETURN_NOT_OK(initFromRowVector(*rv.get()));
+    RETURN_NOT_OK(doSort(rv, 
partitionWriter_.get()->options().sortBufferMaxSize));
+  } else if (options_.partitioning == Partitioning::kRange) {
+    auto compositeBatch = 
std::dynamic_pointer_cast<CompositeColumnarBatch>(cb);
+    VELOX_CHECK_NOT_NULL(compositeBatch);
+    auto batches = compositeBatch->getBatches();
+    VELOX_CHECK_EQ(batches.size(), 2);
+    auto pidBatch = VeloxColumnarBatch::from(veloxPool_.get(), batches[0]);
+    auto pidArr = getFirstColumn(*(pidBatch->getRowVector()));
+    START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
+    RETURN_NOT_OK(partitioner_->compute(pidArr, pidBatch->numRows(), 
batches_.size(), rowVectorIndexMap_));
+    END_TIMING();
+    auto rvBatch = VeloxColumnarBatch::from(veloxPool_.get(), batches[1]);
+    auto rv = rvBatch->getFlattenedRowVector();
+    RETURN_NOT_OK(initFromRowVector(*rv.get()));
+    RETURN_NOT_OK(doSort(rv, 
partitionWriter_.get()->options().sortBufferMaxSize));
+  } else {
+    auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
+    VELOX_CHECK_NOT_NULL(veloxColumnBatch);
+    facebook::velox::RowVectorPtr rv;
+    START_TIMING(cpuWallTimingList_[CpuWallTimingFlattenRV]);
+    rv = veloxColumnBatch->getFlattenedRowVector();
+    ;
+    END_TIMING();
+    if (partitioner_->hasPid()) {
+      auto pidArr = getFirstColumn(*rv);
+      START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
+      RETURN_NOT_OK(partitioner_->compute(pidArr, rv->size(), batches_.size(), 
rowVectorIndexMap_));
+      END_TIMING();
+      auto strippedRv = getStrippedRowVector(*rv);
+      RETURN_NOT_OK(initFromRowVector(*strippedRv));
+      RETURN_NOT_OK(doSort(strippedRv, 
partitionWriter_.get()->options().sortBufferMaxSize));
+    } else {
+      RETURN_NOT_OK(initFromRowVector(*rv));
+      START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
+      RETURN_NOT_OK(partitioner_->compute(nullptr, rv->size(), 
batches_.size(), rowVectorIndexMap_));
+      END_TIMING();
+      RETURN_NOT_OK(doSort(rv, 
partitionWriter_.get()->options().sortBufferMaxSize));
+    }
+  }
+  return arrow::Status::OK();
+}
+
+arrow::Status VeloxSortBasedShuffleWriter::evictBatch(
+    uint32_t partitionId,
+    std::ostringstream* output,
+    facebook::velox::OStreamOutputStream* out,
+    facebook::velox::RowTypePtr* rowTypePtr) {
+  int64_t rawSize = batch_->size();
+  batch_->flush(out);

Review Comment:
   @kerwin-zk I just found that the data stored in `IOBufOutputStream` might 
not be contiguous. Let's keep the current implementation for now. I will find 
a better way to resolve it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to