jinchengchenghh commented on code in PR #6869:
URL: https://github.com/apache/incubator-gluten/pull/6869#discussion_r1741362173


##########
cpp/velox/shuffle/VeloxShuffleReader.cc:
##########
@@ -314,7 +314,7 @@ std::shared_ptr<ColumnarBatch> 
VeloxHashShuffleReaderDeserializer::next() {
     uint32_t numRows;
     GLUTEN_ASSIGN_OR_THROW(
         auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, 
memoryPool_, numRows, decompressTime_));
-    if (numRows == 0) {
+    if (arrowBuffers.empty()) {

Review Comment:
   Why do we have this change?



##########
cpp/core/shuffle/LocalPartitionWriter.cc:
##########
@@ -548,42 +543,14 @@ arrow::Status LocalPartitionWriter::finishSpill(bool 
close) {
   return arrow::Status::OK();
 }
 
-arrow::Status LocalPartitionWriter::evict(
+arrow::Status LocalPartitionWriter::hashEvict(
     uint32_t partitionId,
     std::unique_ptr<InMemoryPayload> inMemoryPayload,
     Evict::type evictType,

Review Comment:
   Looks like we don't need `evictType`.



##########
cpp/velox/tests/VeloxShuffleWriterTest.cc:
##########
@@ -70,24 +70,28 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
   std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096};
 
   for (const auto& compression : compressions) {
-    for (auto useRadixSort : {true, false}) {
-      params.push_back(ShuffleTestParams{
-          ShuffleWriterType::kSortShuffle, PartitionWriterType::kLocal, 
compression, 0, 0, useRadixSort});
+    for (const auto compressionBufferSize : {4, 56, 32 * 1024}) {

Review Comment:
   Do we have the test for split large row?



##########
cpp/velox/shuffle/VeloxShuffleReader.cc:
##########
@@ -403,16 +403,43 @@ std::shared_ptr<ColumnarBatch> 
VeloxSortShuffleReaderDeserializer::next() {
     GLUTEN_ASSIGN_OR_THROW(
         auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, 
arrowPool_, numRows, decompressTime_));
 
-    if (numRows == 0) {
+    if (arrowBuffers.empty()) {
       reachEos_ = true;
       if (cachedRows_ > 0) {
         return deserializeToBatch();
       }
       return nullptr;
     }
-    auto buffer = std::move(arrowBuffers[0]);
-    cachedInputs_.emplace_back(numRows, 
wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer));
-    cachedRows_ += numRows;
+
+    if (numRows > 0) {
+      auto buffer = std::move(arrowBuffers[0]);
+      cachedInputs_.emplace_back(numRows, 
wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer));
+      cachedRows_ += numRows;
+    } else {
+      // For a large row, read all segments.

Review Comment:
   Can you explain a bit more? I don't catch the context here.



##########
cpp/core/shuffle/Payload.cc:
##########
@@ -329,6 +329,21 @@ int64_t BlockPayload::rawSize() {
   return getBufferSize(buffers_);
 }
 
+int64_t BlockPayload::maxCompressedLength(

Review Comment:
   Can we move it to anonymous namespace?



##########
cpp/velox/shuffle/VeloxSortShuffleWriter.cc:
##########
@@ -266,6 +273,7 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
 }
 
 arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, 
size_t begin, size_t end) {
+  VELOX_CHECK(begin < end);

Review Comment:
   VELOX_DCHECK



##########
gluten-core/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala:
##########
@@ -91,6 +91,20 @@ object GlutenShuffleUtils {
     }
   }
 
+  def getCompressionBufferSize(conf: SparkConf, codec: String): Int = {
+    if ("lz4" == codec) {
+      Math.max(
+        conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt,
+        GlutenConfig.GLUTEN_SHUFFLE_COMPRESSION_BUFFER_MIN_SIZE)

Review Comment:
   Can we support set the config? `GLUTEN_SHUFFLE_COMPRESSION_BUFFER_MIN_SIZE`



##########
cpp/core/shuffle/Payload.cc:
##########
@@ -186,45 +186,45 @@ arrow::Result<std::unique_ptr<BlockPayload>> 
BlockPayload::fromBuffers(
     std::vector<std::shared_ptr<arrow::Buffer>> buffers,
     const std::vector<bool>* isValidityBuffer,
     arrow::MemoryPool* pool,
-    arrow::util::Codec* codec) {
+    arrow::util::Codec* codec,
+    std::shared_ptr<arrow::Buffer> compressed) {
   if (payloadType == Payload::Type::kCompressed) {
     Timer compressionTime;
     compressionTime.start();
     // Compress.
-    // Compressed buffer layout: | buffer1 compressedLength | buffer1 
uncompressedLength | buffer1 | ...
-    const auto metadataLength = sizeof(int64_t) * 2 * buffers.size();
-    int64_t totalCompressedLength =
-        std::accumulate(buffers.begin(), buffers.end(), 0LL, [&](auto sum, 
const auto& buffer) {
-          if (!buffer) {
-            return sum;
-          }
-          return sum + codec->MaxCompressedLen(buffer->size(), buffer->data());
-        });
-    const auto maxCompressedLength = metadataLength + totalCompressedLength;
-    ARROW_ASSIGN_OR_RAISE(
-        std::shared_ptr<arrow::ResizableBuffer> compressed, 
arrow::AllocateResizableBuffer(maxCompressedLength, pool));
-
-    auto output = compressed->mutable_data();
+    auto maxLength = maxCompressedLength(buffers, codec);
+    std::shared_ptr<arrow::Buffer> compressedBuffer;
+    uint8_t* output;
+    if (compressed) {
+      ARROW_RETURN_IF(
+          compressed->size() < maxLength,
+          arrow::Status::Invalid(
+              "Compressed buffer length < maxCompressedLength. (", 
compressed->size(), " vs ", maxLength, ")"));
+      output = const_cast<uint8_t*>(compressed->data());
+    } else {
+      ARROW_ASSIGN_OR_RAISE(compressedBuffer, 
arrow::AllocateResizableBuffer(maxLength, pool));

Review Comment:
   Can we reuse the buffer for uncompressed payload type? 



##########
gluten-core/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala:
##########
@@ -91,6 +91,20 @@ object GlutenShuffleUtils {
     }
   }
 
+  def getCompressionBufferSize(conf: SparkConf, codec: String): Int = {
+    if ("lz4" == codec) {
+      Math.max(
+        conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt,
+        GlutenConfig.GLUTEN_SHUFFLE_COMPRESSION_BUFFER_MIN_SIZE)

Review Comment:
   Looks like the default value 64 is much less than other compression kind 
default value 32 * 1024



##########
cpp/velox/shuffle/VeloxSortShuffleWriter.cc:
##########
@@ -106,8 +104,17 @@ arrow::Status VeloxSortShuffleWriter::init() {
       options_.partitioning == Partitioning::kSingle,
       arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single 
partition."));
   allocateMinimalArray();
-  sortedBuffer_ = 
facebook::velox::AlignedBuffer::allocate<char>(kSortedBufferSize, 
veloxPool_.get());
-  rawBuffer_ = sortedBuffer_->asMutable<uint8_t>();
+  // In Spark, sortedBuffer_ memory and compressionBuffer_ memory are 
pre-allocated and counted into executor
+  // memory overhead. To align with Spark, we use arrow::default_memory_pool() 
to avoid counting these memory in Gluten.

Review Comment:
   @zhztheplayer Can you help look at here? Thanks!



##########
cpp/velox/shuffle/VeloxShuffleReader.cc:
##########
@@ -403,16 +403,43 @@ std::shared_ptr<ColumnarBatch> 
VeloxSortShuffleReaderDeserializer::next() {
     GLUTEN_ASSIGN_OR_THROW(
         auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, 
arrowPool_, numRows, decompressTime_));
 
-    if (numRows == 0) {
+    if (arrowBuffers.empty()) {
       reachEos_ = true;
       if (cachedRows_ > 0) {
         return deserializeToBatch();
       }
       return nullptr;
     }
-    auto buffer = std::move(arrowBuffers[0]);
-    cachedInputs_.emplace_back(numRows, 
wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer));
-    cachedRows_ += numRows;
+
+    if (numRows > 0) {
+      auto buffer = std::move(arrowBuffers[0]);
+      cachedInputs_.emplace_back(numRows, 
wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer));
+      cachedRows_ += numRows;
+    } else {
+      // For a large row, read all segments.

Review Comment:
   Add some comments here to indicate this cases only occurs in sort buffer 
writer, and the numRows is 0. Do we have a more friendly way to specify the 
large row that is splited?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to