This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 061efb1bf [VL] Fix shuffle spill not reported to spark metric (#6740)
061efb1bf is described below

commit 061efb1bf577493e6a733353e4bbd21e5a5420a1
Author: Rong Ma <[email protected]>
AuthorDate: Fri Aug 9 10:08:51 2024 +0800

    [VL] Fix shuffle spill not reported to spark metric (#6740)
---
 cpp/core/jni/JniWrapper.cc                           |  3 ++-
 cpp/core/shuffle/LocalPartitionWriter.cc             | 18 +++++++++++++-----
 cpp/core/shuffle/LocalPartitionWriter.h              |  1 +
 cpp/core/shuffle/Options.h                           |  1 +
 cpp/core/shuffle/Payload.cc                          | 20 +++++++-------------
 cpp/core/shuffle/Payload.h                           | 18 ++++++++----------
 cpp/core/shuffle/ShuffleWriter.cc                    |  4 ++++
 cpp/core/shuffle/ShuffleWriter.h                     |  2 ++
 cpp/core/shuffle/Spill.cc                            |  2 +-
 cpp/core/shuffle/Spill.h                             |  2 +-
 cpp/core/shuffle/rss/RssPartitionWriter.cc           |  2 +-
 cpp/velox/shuffle/VeloxHashShuffleWriter.cc          |  1 +
 cpp/velox/shuffle/VeloxSortShuffleWriter.cc          |  8 ++++++++
 cpp/velox/shuffle/VeloxSortShuffleWriter.h           |  2 ++
 .../apache/gluten/vectorized/GlutenSplitResult.java  |  7 +++++++
 .../apache/spark/shuffle/ColumnarShuffleWriter.scala |  2 ++
 16 files changed, 61 insertions(+), 32 deletions(-)

diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 3e583f20b..5c2752f18 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -165,7 +165,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
   jniByteInputStreamClose = getMethodIdOrError(env, jniByteInputStreamClass, 
"close", "()V");
 
   splitResultClass = createGlobalClassReferenceOrError(env, 
"Lorg/apache/gluten/vectorized/GlutenSplitResult;");
-  splitResultConstructor = getMethodIdOrError(env, splitResultClass, "<init>", 
"(JJJJJJJJJ[J[J)V");
+  splitResultConstructor = getMethodIdOrError(env, splitResultClass, "<init>", 
"(JJJJJJJJJJ[J[J)V");
 
   columnarBatchSerializeResultClass =
       createGlobalClassReferenceOrError(env, 
"Lorg/apache/gluten/vectorized/ColumnarBatchSerializeResult;");
@@ -975,6 +975,7 @@ JNIEXPORT jobject JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap
       shuffleWriter->totalC2RTime(),
       shuffleWriter->totalBytesWritten(),
       shuffleWriter->totalBytesEvicted(),
+      shuffleWriter->totalBytesToEvict(),
       shuffleWriter->peakBytesAllocated(),
       partitionLengthArr,
       rawPartitionLengthArr);
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc
index 031be791b..fe206b488 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -183,12 +183,15 @@ class LocalPartitionWriter::PayloadMerger {
     return merged;
   }
 
-  arrow::Result<std::optional<std::unique_ptr<BlockPayload>>> 
finishForSpill(uint32_t partitionId) {
+  arrow::Result<std::optional<std::unique_ptr<BlockPayload>>> finishForSpill(
+      uint32_t partitionId,
+      int64_t& totalBytesToEvict) {
     // We need to check whether the spill source is from compressing/copying 
the merged buffers.
     if ((partitionInMerge_.has_value() && *partitionInMerge_ == partitionId) 
|| !hasMerged(partitionId)) {
       return std::nullopt;
     }
     auto payload = std::move(partitionMergePayload_[partitionId]);
+    totalBytesToEvict += payload->rawSize();
     return payload->toBlockPayload(Payload::kUncompressed, pool_, codec_);
   }
 
@@ -312,7 +315,8 @@ class LocalPartitionWriter::PayloadCache {
       std::shared_ptr<arrow::io::OutputStream> os,
       const std::string& spillFile,
       arrow::MemoryPool* pool,
-      arrow::util::Codec* codec) {
+      arrow::util::Codec* codec,
+      int64_t& totalBytesToEvict) {
     std::shared_ptr<Spill> diskSpill = nullptr;
     ARROW_ASSIGN_OR_RAISE(auto start, os->Tell());
     for (uint32_t pid = 0; pid < numPartitions_; ++pid) {
@@ -321,6 +325,7 @@ class LocalPartitionWriter::PayloadCache {
         while (!payloads.empty()) {
           auto payload = std::move(payloads.front());
           payloads.pop_front();
+          totalBytesToEvict += payload->rawSize();
           // Spill the cached payload to disk.
           RETURN_NOT_OK(payload->serialize(os.get()));
           compressTime_ += payload->getCompressTime();
@@ -550,7 +555,7 @@ arrow::Status LocalPartitionWriter::evict(
     bool reuseBuffers,
     bool hasComplexType,
     bool isFinal) {
-  rawPartitionLengths_[partitionId] += inMemoryPayload->getBufferSize();
+  rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
 
   if (evictType == Evict::kSortSpill) {
     if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && 
!dataFileOs_))) {
@@ -604,6 +609,7 @@ arrow::Status LocalPartitionWriter::evict(
   return arrow::Status::OK();
 }
 
+// FIXME: Remove this code path for local partition writer.
 arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, 
std::unique_ptr<BlockPayload> blockPayload, bool stop) {
   rawPartitionLengths_[partitionId] += blockPayload->rawSize();
 
@@ -642,7 +648,8 @@ arrow::Status 
LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu
     ARROW_ASSIGN_OR_RAISE(auto os, 
arrow::io::BufferedOutputStream::Create(16384, pool_, raw));
     spills_.emplace_back();
     ARROW_ASSIGN_OR_RAISE(
-        spills_.back(), payloadCache_->spillAndClose(os, spillFile, 
payloadPool_.get(), codec_.get()));
+        spills_.back(),
+        payloadCache_->spillAndClose(os, spillFile, payloadPool_.get(), 
codec_.get(), totalBytesToEvict_));
     reclaimed += beforeSpill - payloadPool_->bytes_allocated();
     if (reclaimed >= size) {
       *actual = reclaimed;
@@ -653,7 +660,7 @@ arrow::Status 
LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu
   if (merger_) {
     auto beforeSpill = payloadPool_->bytes_allocated();
     for (auto pid = 0; pid < numPartitions_; ++pid) {
-      ARROW_ASSIGN_OR_RAISE(auto merged, merger_->finishForSpill(pid));
+      ARROW_ASSIGN_OR_RAISE(auto merged, merger_->finishForSpill(pid, 
totalBytesToEvict_));
       if (merged.has_value()) {
         RETURN_NOT_OK(requestSpill(false));
         RETURN_NOT_OK(spiller_->spill(pid, std::move(*merged)));
@@ -678,6 +685,7 @@ arrow::Status 
LocalPartitionWriter::populateMetrics(ShuffleWriterMetrics* metric
   metrics->totalCompressTime += compressTime_;
   metrics->totalEvictTime += spillTime_;
   metrics->totalWriteTime += writeTime_;
+  metrics->totalBytesToEvict += totalBytesToEvict_;
   metrics->totalBytesEvicted += totalBytesEvicted_;
   metrics->totalBytesWritten += std::filesystem::file_size(dataFile_);
   metrics->partitionLengths = std::move(partitionLengths_);
diff --git a/cpp/core/shuffle/LocalPartitionWriter.h b/cpp/core/shuffle/LocalPartitionWriter.h
index efd7b4df3..555632fed 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.h
+++ b/cpp/core/shuffle/LocalPartitionWriter.h
@@ -110,6 +110,7 @@ class LocalPartitionWriter : public PartitionWriter {
   std::vector<int32_t> subDirSelection_;
   std::shared_ptr<arrow::io::OutputStream> dataFileOs_;
 
+  int64_t totalBytesToEvict_{0};
   int64_t totalBytesEvicted_{0};
   std::vector<int64_t> partitionLengths_;
   std::vector<int64_t> rawPartitionLengths_;
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 11fa037eb..2424ec557 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -87,6 +87,7 @@ struct PartitionWriterOptions {
 struct ShuffleWriterMetrics {
   int64_t totalBytesWritten{0};
   int64_t totalBytesEvicted{0};
+  int64_t totalBytesToEvict{0};
   int64_t totalWriteTime{0};
   int64_t totalEvictTime{0};
   int64_t totalCompressTime{0};
diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc
index b8d8274cb..d0c24e4bc 100644
--- a/cpp/core/shuffle/Payload.cc
+++ b/cpp/core/shuffle/Payload.cc
@@ -325,9 +325,8 @@ void BlockPayload::setCompressionTime(int64_t 
compressionTime) {
   compressTime_ = compressionTime;
 }
 
-uint64_t BlockPayload::rawSize() {
-  return std::accumulate(
-      buffers_.begin(), buffers_.end(), 0UL, [](auto sum, const auto& buffer) 
{ return sum + buffer->size(); });
+int64_t BlockPayload::rawSize() {
+  return getBufferSize(buffers_);
 }
 
 arrow::Result<std::unique_ptr<InMemoryPayload>> InMemoryPayload::merge(
@@ -418,10 +417,6 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> 
InMemoryPayload::readBufferAt(uint
   return std::move(buffers_[index]);
 }
 
-int64_t InMemoryPayload::getBufferSize() const {
-  return gluten::getBufferSize(buffers_);
-}
-
 arrow::Status InMemoryPayload::copyBuffers(arrow::MemoryPool* pool) {
   for (auto& buffer : buffers_) {
     if (!buffer) {
@@ -438,9 +433,8 @@ arrow::Status 
InMemoryPayload::copyBuffers(arrow::MemoryPool* pool) {
   return arrow::Status::OK();
 }
 
-uint64_t InMemoryPayload::rawSize() {
-  return std::accumulate(
-      buffers_.begin(), buffers_.end(), 0UL, [](auto sum, const auto& buffer) 
{ return sum + buffer->size(); });
+int64_t InMemoryPayload::rawSize() {
+  return getBufferSize(buffers_);
 }
 
 UncompressedDiskBlockPayload::UncompressedDiskBlockPayload(
@@ -513,7 +507,7 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> 
UncompressedDiskBlockPayload::read
   return buffer;
 }
 
-uint64_t UncompressedDiskBlockPayload::rawSize() {
+int64_t UncompressedDiskBlockPayload::rawSize() {
   return rawSize_;
 }
 
@@ -521,7 +515,7 @@ CompressedDiskBlockPayload::CompressedDiskBlockPayload(
     uint32_t numRows,
     const std::vector<bool>* isValidityBuffer,
     arrow::io::InputStream*& inputStream,
-    uint64_t rawSize,
+    int64_t rawSize,
     arrow::MemoryPool* /* pool */)
     : Payload(Type::kCompressed, numRows, isValidityBuffer), 
inputStream_(inputStream), rawSize_(rawSize) {}
 
@@ -536,7 +530,7 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> 
CompressedDiskBlockPayload::readBu
   return arrow::Status::Invalid("Cannot read buffer from 
CompressedDiskBlockPayload.");
 }
 
-uint64_t CompressedDiskBlockPayload::rawSize() {
+int64_t CompressedDiskBlockPayload::rawSize() {
   return rawSize_;
 }
 } // namespace gluten
diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h
index 0a317d9c3..1bd8815a4 100644
--- a/cpp/core/shuffle/Payload.h
+++ b/cpp/core/shuffle/Payload.h
@@ -38,7 +38,7 @@ class Payload {
 
   virtual arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t 
index) = 0;
 
-  virtual uint64_t rawSize() = 0;
+  virtual int64_t rawSize() = 0;
 
   int64_t getCompressTime() const {
     return compressTime_;
@@ -97,7 +97,7 @@ class BlockPayload final : public Payload {
 
   arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t pos) 
override;
 
-  uint64_t rawSize() override;
+  int64_t rawSize() override;
 
  protected:
   BlockPayload(
@@ -134,11 +134,9 @@ class InMemoryPayload final : public Payload {
   arrow::Result<std::unique_ptr<BlockPayload>>
   toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, 
arrow::util::Codec* codec);
 
-  int64_t getBufferSize() const;
-
   arrow::Status copyBuffers(arrow::MemoryPool* pool);
 
-  uint64_t rawSize() override;
+  int64_t rawSize() override;
 
  private:
   std::vector<std::shared_ptr<arrow::Buffer>> buffers_;
@@ -159,11 +157,11 @@ class UncompressedDiskBlockPayload final : public Payload 
{
 
   arrow::Status serialize(arrow::io::OutputStream* outputStream) override;
 
-  uint64_t rawSize() override;
+  int64_t rawSize() override;
 
  private:
   arrow::io::InputStream*& inputStream_;
-  uint64_t rawSize_;
+  int64_t rawSize_;
   arrow::MemoryPool* pool_;
   arrow::util::Codec* codec_;
   uint32_t readPos_{0};
@@ -177,17 +175,17 @@ class CompressedDiskBlockPayload final : public Payload {
       uint32_t numRows,
       const std::vector<bool>* isValidityBuffer,
       arrow::io::InputStream*& inputStream,
-      uint64_t rawSize,
+      int64_t rawSize,
       arrow::MemoryPool* pool);
 
   arrow::Status serialize(arrow::io::OutputStream* outputStream) override;
 
   arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t index) 
override;
 
-  uint64_t rawSize() override;
+  int64_t rawSize() override;
 
  private:
   arrow::io::InputStream*& inputStream_;
-  uint64_t rawSize_;
+  int64_t rawSize_;
 };
 } // namespace gluten
diff --git a/cpp/core/shuffle/ShuffleWriter.cc b/cpp/core/shuffle/ShuffleWriter.cc
index e637d37ff..3eff9a2c8 100644
--- a/cpp/core/shuffle/ShuffleWriter.cc
+++ b/cpp/core/shuffle/ShuffleWriter.cc
@@ -55,6 +55,10 @@ int64_t ShuffleWriter::totalBytesEvicted() const {
   return metrics_.totalBytesEvicted;
 }
 
+int64_t ShuffleWriter::totalBytesToEvict() const {
+  return metrics_.totalBytesToEvict;
+}
+
 int64_t ShuffleWriter::totalWriteTime() const {
   return metrics_.totalWriteTime;
 }
diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h
index 661112150..8c79829e0 100644
--- a/cpp/core/shuffle/ShuffleWriter.h
+++ b/cpp/core/shuffle/ShuffleWriter.h
@@ -52,6 +52,8 @@ class ShuffleWriter : public Reclaimable {
 
   int64_t totalBytesEvicted() const;
 
+  int64_t totalBytesToEvict() const;
+
   int64_t totalWriteTime() const;
 
   int64_t totalEvictTime() const;
diff --git a/cpp/core/shuffle/Spill.cc b/cpp/core/shuffle/Spill.cc
index 0bbe667ab..d8b9bc7eb 100644
--- a/cpp/core/shuffle/Spill.cc
+++ b/cpp/core/shuffle/Spill.cc
@@ -48,7 +48,7 @@ void Spill::insertPayload(
     Payload::Type payloadType,
     uint32_t numRows,
     const std::vector<bool>* isValidityBuffer,
-    uint64_t rawSize,
+    int64_t rawSize,
     arrow::MemoryPool* pool,
     arrow::util::Codec* codec) {
   // TODO: Add compression threshold.
diff --git a/cpp/core/shuffle/Spill.h b/cpp/core/shuffle/Spill.h
index 7ee60ef29..9d8d24087 100644
--- a/cpp/core/shuffle/Spill.h
+++ b/cpp/core/shuffle/Spill.h
@@ -46,7 +46,7 @@ class Spill final {
       Payload::Type payloadType,
       uint32_t numRows,
       const std::vector<bool>* isValidityBuffer,
-      uint64_t rawSize,
+      int64_t rawSize,
       arrow::MemoryPool* pool,
       arrow::util::Codec* codec);
 
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 19f178a2c..8f75f9993 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -56,7 +56,7 @@ arrow::Status RssPartitionWriter::evict(
     bool reuseBuffers,
     bool hasComplexType,
     bool isFinal) {
-  rawPartitionLengths_[partitionId] += inMemoryPayload->getBufferSize();
+  rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
   auto payloadType = codec_ ? Payload::Type::kCompressed : 
Payload::Type::kUncompressed;
   ARROW_ASSIGN_OR_RAISE(
       auto payload, inMemoryPayload->toBlockPayload(payloadType, 
payloadPool_.get(), codec_ ? codec_.get() : nullptr));
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index e165d4a91..00d8be166 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -1364,6 +1364,7 @@ arrow::Result<int64_t> 
VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6
       auto pid = item.first;
       ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false));
       auto payload = std::make_unique<InMemoryPayload>(item.second, 
&isValidityBuffer_, std::move(buffers));
+      metrics_.totalBytesToEvict += payload->rawSize();
       RETURN_NOT_OK(partitionWriter_->evict(pid, std::move(payload), 
Evict::kSpill, false, hasComplexType_, false));
       evicted = beforeEvict - partitionBufferPool_->bytes_allocated();
       if (evicted >= size) {
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index d15280c0a..7f3e9201f 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -283,6 +283,7 @@ arrow::Status 
VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
           index - begin,
           nullptr,
           
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_,
 offset)});
+      updateSpillMetrics(payload);
       RETURN_NOT_OK(
           partitionWriter_->evict(partitionId, std::move(payload), 
Evict::type::kSortSpill, false, false, stopped_));
       begin = index;
@@ -296,6 +297,7 @@ arrow::Status 
VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
       end - begin,
       nullptr,
       
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_,
 offset)});
+  updateSpillMetrics(payload);
   RETURN_NOT_OK(
       partitionWriter_->evict(partitionId, std::move(payload), 
Evict::type::kSortSpill, false, false, stopped_));
   return arrow::Status::OK();
@@ -399,4 +401,10 @@ void VeloxSortShuffleWriter::allocateMinimalArray() {
       options_.sortBufferInitialSize * sizeof(uint64_t), veloxPool_.get());
   setUpArray(std::move(array));
 }
+
+void VeloxSortShuffleWriter::updateSpillMetrics(const 
std::unique_ptr<InMemoryPayload>& payload) {
+  if (!stopped_) {
+    metrics_.totalBytesToEvict += payload->rawSize();
+  }
+}
 } // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index 2925b85f8..34fbfd243 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -89,6 +89,8 @@ class VeloxSortShuffleWriter final : public 
VeloxShuffleWriter {
 
   void allocateMinimalArray();
 
+  void updateSpillMetrics(const std::unique_ptr<InMemoryPayload>& payload);
+
   // Stores compact row id -> row
   facebook::velox::BufferPtr array_;
   uint64_t* arrayPtr_;
diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
index dbc0d7db5..3bed6ac79 100644
--- a/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
+++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
@@ -17,6 +17,7 @@
 package org.apache.gluten.vectorized;
 
 public class GlutenSplitResult extends SplitResult {
+  private final long bytesToEvict;
   private final long peakBytes;
   private final long sortTime;
   private final long c2rTime;
@@ -30,6 +31,7 @@ public class GlutenSplitResult extends SplitResult {
       long totalC2RTime,
       long totalBytesWritten,
       long totalBytesEvicted,
+      long totalBytesToEvict, // In-memory bytes(uncompressed) before spill.
       long peakBytes,
       long[] partitionLengths,
       long[] rawPartitionLengths) {
@@ -42,11 +44,16 @@ public class GlutenSplitResult extends SplitResult {
         totalBytesEvicted,
         partitionLengths,
         rawPartitionLengths);
+    this.bytesToEvict = totalBytesToEvict;
     this.peakBytes = peakBytes;
     this.sortTime = totalSortTime;
     this.c2rTime = totalC2RTime;
   }
 
+  public long getBytesToEvict() {
+    return bytesToEvict;
+  }
+
   public long getPeakBytes() {
     return peakBytes;
   }
diff --git a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 08535a393..251bb977f 100644
--- a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++ b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -210,6 +210,8 @@ class ColumnarShuffleWriter[K, V](
     dep.metrics("peakBytes").add(splitResult.getPeakBytes)
     writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten)
     writeMetrics.incWriteTime(splitResult.getTotalWriteTime + 
splitResult.getTotalSpillTime)
+    
taskContext.taskMetrics().incMemoryBytesSpilled(splitResult.getBytesToEvict)
+    
taskContext.taskMetrics().incDiskBytesSpilled(splitResult.getTotalBytesSpilled)
 
     partitionLengths = splitResult.getPartitionLengths
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to