This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f76034867 [VL] Update shuffle write metrics per batch (#11083)
0f76034867 is described below

commit 0f76034867aa7c422c6b336e56a2072041c4c3e3
Author: Yang Zhang <[email protected]>
AuthorDate: Wed Dec 3 12:54:34 2025 +0800

    [VL] Update shuffle write metrics per batch (#11083)
---
 .../VeloxCelebornColumnarShuffleWriter.scala       |  5 +++--
 .../spark/shuffle/ColumnarShuffleWriter.scala      |  5 +++--
 cpp/core/jni/JniWrapper.cc                         |  5 ++---
 cpp/core/shuffle/LocalPartitionWriter.cc           |  8 ++++---
 cpp/core/shuffle/LocalPartitionWriter.h            | 15 ++++++++-----
 cpp/core/shuffle/PartitionWriter.h                 | 15 ++++++++-----
 cpp/core/shuffle/ShuffleWriter.cc                  |  4 ++--
 cpp/core/shuffle/ShuffleWriter.h                   |  3 ++-
 cpp/core/shuffle/rss/RssPartitionWriter.cc         | 25 ++++++++++++++--------
 cpp/core/shuffle/rss/RssPartitionWriter.h          | 17 +++++++++------
 cpp/velox/benchmarks/GenericBenchmark.cc           |  2 +-
 cpp/velox/shuffle/VeloxHashShuffleWriter.cc        |  8 ++++---
 cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc     |  6 ++++--
 cpp/velox/shuffle/VeloxSortShuffleWriter.cc        |  6 ++++--
 .../gluten/vectorized/GlutenSplitResult.java       | 10 ++++-----
 15 files changed, 83 insertions(+), 51 deletions(-)

diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
index d2c27e960c..783d68a00c 100644
--- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
+++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
@@ -77,7 +77,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
         val columnarBatchHandle =
           ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, 
cb)
         val startTime = System.nanoTime()
-        shuffleWriterJniWrapper.write(
+        val bytesWritten = shuffleWriterJniWrapper.write(
           nativeShuffleWriter,
           cb.numRows,
           columnarBatchHandle,
@@ -85,6 +85,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
         dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
         dep.metrics("numInputRows").add(cb.numRows)
         dep.metrics("inputBatches").add(1)
+        writeMetrics.incBytesWritten(bytesWritten)
         // This metric is important, AQE use it to decide if EliminateLimit
         writeMetrics.incRecordsWritten(cb.numRows())
       }
@@ -120,7 +121,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
         dep.metrics("c2rTime").add(splitResult.getC2RTime)
     }
     dep.metrics("dataSize").add(splitResult.getRawPartitionLengths.sum)
-    writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten)
+    writeMetrics.incBytesWritten(splitResult.getBytesWritten)
     writeMetrics.incWriteTime(splitResult.getTotalWriteTime + 
splitResult.getTotalPushTime)
 
     partitionLengths = splitResult.getPartitionLengths
diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index f78a04e69f..ef9877011b 100644
--- a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -214,7 +214,7 @@ class ColumnarShuffleWriter[K, V](
         val columnarBatchHandle =
           ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, 
cb)
         val startTime = System.nanoTime()
-        shuffleWriterJniWrapper.write(
+        val bytesWritten = shuffleWriterJniWrapper.write(
           nativeShuffleWriter,
           rows,
           columnarBatchHandle,
@@ -222,6 +222,7 @@ class ColumnarShuffleWriter[K, V](
         dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
         dep.metrics("numInputRows").add(rows)
         dep.metrics("inputBatches").add(1)
+        writeMetrics.incBytesWritten(bytesWritten)
         // This metric is important, AQE use it to decide if EliminateLimit
         writeMetrics.incRecordsWritten(rows)
       }
@@ -256,7 +257,7 @@ class ColumnarShuffleWriter[K, V](
     dep.metrics("dataSize").add(splitResult.getRawPartitionLengths.sum)
     dep.metrics("compressTime").add(splitResult.getTotalCompressTime)
     dep.metrics("peakBytes").add(splitResult.getPeakBytes)
-    writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten)
+    writeMetrics.incBytesWritten(splitResult.getBytesWritten)
     writeMetrics.incWriteTime(splitResult.getTotalWriteTime + 
splitResult.getTotalSpillTime)
     
taskContext.taskMetrics().incMemoryBytesSpilled(splitResult.getBytesToEvict)
     
taskContext.taskMetrics().incDiskBytesSpilled(splitResult.getTotalBytesSpilled)
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 307fa3c129..bfab41e365 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -1038,9 +1038,8 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
 
   // The column batch maybe VeloxColumnBatch or 
ArrowCStructColumnarBatch(FallbackRangeShuffleWriter)
   auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
-  auto numBytes = batch->numBytes();
   arrowAssertOkOrThrow(shuffleWriter->write(batch, memLimit), "Native write: 
shuffle writer failed");
-  return numBytes;
+  return shuffleWriter->bytesWritten();
   JNI_METHOD_END(kInvalidObjectHandle)
 }
 
@@ -1076,7 +1075,7 @@ JNIEXPORT jobject JNICALL 
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap
       shuffleWriter->totalCompressTime(),
       shuffleWriter->totalSortTime(),
       shuffleWriter->totalC2RTime(),
-      shuffleWriter->totalBytesWritten(),
+      shuffleWriter->bytesWritten(),
       shuffleWriter->totalBytesEvicted(),
       shuffleWriter->totalBytesToEvict(),
       shuffleWriter->peakBytesAllocated(),
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc
index 77734e3287..948a2b0e05 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -600,7 +600,7 @@ arrow::Status 
LocalPartitionWriter::writeCachedPayloads(uint32_t partitionId, ar
   return arrow::Status::OK();
 }
 
-arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
+arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics, 
int64_t& evictBytes) {
   if (stopped_) {
     return arrow::Status::OK();
   }
@@ -645,6 +645,7 @@ arrow::Status 
LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
     }
   }
   ARROW_ASSIGN_OR_RAISE(totalBytesWritten_, dataFileOs_->Tell());
+  evictBytes += totalBytesWritten_;
 
   // Close Final file. Clear buffered resources.
   RETURN_NOT_OK(clearResource());
@@ -710,7 +711,8 @@ arrow::Status LocalPartitionWriter::hashEvict(
     uint32_t partitionId,
     std::unique_ptr<InMemoryPayload> inMemoryPayload,
     Evict::type evictType,
-    bool reuseBuffers) {
+    bool reuseBuffers,
+    int64_t& evictBytes) {
   rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
 
   if (evictType == Evict::kSpill) {
@@ -754,7 +756,7 @@ arrow::Status LocalPartitionWriter::hashEvict(
 }
 
 arrow::Status
-LocalPartitionWriter::sortEvict(uint32_t partitionId, 
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal) {
+LocalPartitionWriter::sortEvict(uint32_t partitionId, 
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal, int64_t& 
evictBytes) {
   rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
 
   if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && 
!dataFileOs_))) {
diff --git a/cpp/core/shuffle/LocalPartitionWriter.h b/cpp/core/shuffle/LocalPartitionWriter.h
index 3709b03c28..113e5a3cfd 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.h
+++ b/cpp/core/shuffle/LocalPartitionWriter.h
@@ -39,13 +39,18 @@ class LocalPartitionWriter : public PartitionWriter {
       uint32_t partitionId,
       std::unique_ptr<InMemoryPayload> inMemoryPayload,
       Evict::type evictType,
-      bool reuseBuffers) override;
+      bool reuseBuffers,
+      int64_t& evictBytes) override;
 
-  arrow::Status sortEvict(uint32_t partitionId, 
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal)
-      override;
+  arrow::Status sortEvict(
+      uint32_t partitionId,
+      std::unique_ptr<InMemoryPayload> inMemoryPayload,
+      bool isFinal,
+      int64_t& evictBytes) override;
 
   // This code path is not used by LocalPartitionWriter, Not implement it by 
default.
-  arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload> 
blockPayload, bool stop) override {
+  arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload> 
blockPayload, bool stop, int64_t& evictBytes)
+      override {
     return arrow::Status::NotImplemented("Invalid code path for local shuffle 
writer.");
   }
 
@@ -68,7 +73,7 @@ class LocalPartitionWriter : public PartitionWriter {
   /// If spill is triggered by 2.c, cached payloads of the remaining unmerged 
partitions will be spilled.
   /// In both cases, if the cached payload size doesn't free enough memory,
   /// it will shrink partition buffers to free more memory.
-  arrow::Status stop(ShuffleWriterMetrics* metrics) override;
+  arrow::Status stop(ShuffleWriterMetrics* metrics, int64_t& evictBytes) 
override;
 
   // Spill source:
   // 1. Other op.
diff --git a/cpp/core/shuffle/PartitionWriter.h b/cpp/core/shuffle/PartitionWriter.h
index cedae6d2fd..ebb86004cb 100644
--- a/cpp/core/shuffle/PartitionWriter.h
+++ b/cpp/core/shuffle/PartitionWriter.h
@@ -48,19 +48,24 @@ class PartitionWriter : public Reclaimable {
 
   ~PartitionWriter() override = default;
 
-  virtual arrow::Status stop(ShuffleWriterMetrics* metrics) = 0;
+  virtual arrow::Status stop(ShuffleWriterMetrics* metrics, int64_t& 
evictBytes) = 0;
 
   /// Evict buffers for `partitionId` partition.
   virtual arrow::Status hashEvict(
       uint32_t partitionId,
       std::unique_ptr<InMemoryPayload> inMemoryPayload,
       Evict::type evictType,
-      bool reuseBuffers) = 0;
+      bool reuseBuffers,
+      int64_t& evictBytes) = 0;
 
-  virtual arrow::Status
-  sortEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload> 
inMemoryPayload, bool isFinal) = 0;
+  virtual arrow::Status sortEvict(
+      uint32_t partitionId,
+      std::unique_ptr<InMemoryPayload> inMemoryPayload,
+      bool isFinal,
+      int64_t& evictBytes) = 0;
 
-  virtual arrow::Status evict(uint32_t partitionId, 
std::unique_ptr<BlockPayload> blockPayload, bool stop) = 0;
+  virtual arrow::Status
+  evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool 
stop, int64_t& evictBytes) = 0;
 
   uint64_t cachedPayloadSize() {
     return payloadPool_->bytes_allocated();
diff --git a/cpp/core/shuffle/ShuffleWriter.cc b/cpp/core/shuffle/ShuffleWriter.cc
index 71df2b1a7b..3f0feadfb0 100644
--- a/cpp/core/shuffle/ShuffleWriter.cc
+++ b/cpp/core/shuffle/ShuffleWriter.cc
@@ -61,8 +61,8 @@ int32_t ShuffleWriter::numPartitions() const {
   return numPartitions_;
 }
 
-int64_t ShuffleWriter::totalBytesWritten() const {
-  return metrics_.totalBytesWritten;
+int64_t ShuffleWriter::bytesWritten() const {
+  return writtenBytes_;
 }
 
 int64_t ShuffleWriter::totalBytesEvicted() const {
diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h
index 1028b0a318..934ad09076 100644
--- a/cpp/core/shuffle/ShuffleWriter.h
+++ b/cpp/core/shuffle/ShuffleWriter.h
@@ -41,7 +41,7 @@ class ShuffleWriter : public Reclaimable {
 
   int32_t numPartitions() const;
 
-  int64_t totalBytesWritten() const;
+  int64_t bytesWritten() const;
 
   int64_t totalBytesEvicted() const;
 
@@ -76,6 +76,7 @@ class ShuffleWriter : public Reclaimable {
   Partitioning partitioning_;
 
   ShuffleWriterMetrics metrics_{};
+  int64_t writtenBytes_{0};
 };
 
 } // namespace gluten
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 05068b8035..a21060110f 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -29,7 +29,7 @@ void RssPartitionWriter::init() {
   rawPartitionLengths_.resize(numPartitions_, 0);
 }
 
-arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
+arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics, int64_t& 
/* evictBytes */) {
   spillTime_ -= compressTime_;
   rssClient_->stop();
 
@@ -53,12 +53,13 @@ arrow::Status RssPartitionWriter::hashEvict(
     uint32_t partitionId,
     std::unique_ptr<InMemoryPayload> inMemoryPayload,
     Evict::type evictType,
-    bool reuseBuffers) {
-  return doEvict(partitionId, std::move(inMemoryPayload));
+    bool reuseBuffers,
+    int64_t& evictBytes) {
+  return doEvict(partitionId, std::move(inMemoryPayload), evictBytes);
 }
 
 arrow::Status
-RssPartitionWriter::sortEvict(uint32_t partitionId, 
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal) {
+RssPartitionWriter::sortEvict(uint32_t partitionId, 
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal, int64_t& 
evictBytes) {
   ScopedTimer timer(&spillTime_);
   rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
   if (shouldInitializeOs_) {
@@ -85,22 +86,26 @@ RssPartitionWriter::sortEvict(uint32_t partitionId, 
std::unique_ptr<InMemoryPayl
       compressTime_ += compressedOs_->compressTime();
     }
     ARROW_ASSIGN_OR_RAISE(const auto buffer, rssOs_->Finish());
-    bytesEvicted_[partitionId] += rssClient_->pushPartitionData(partitionId, 
buffer->data_as<char>(), buffer->size());
+    auto delta = rssClient_->pushPartitionData(partitionId, 
buffer->data_as<char>(), buffer->size());
+    bytesEvicted_[partitionId] += delta;
+    evictBytes += delta;
     shouldInitializeOs_ = true;
   }
 
   return arrow::Status::OK();
 }
 
-arrow::Status RssPartitionWriter::evict(uint32_t partitionId, 
std::unique_ptr<BlockPayload> blockPayload, bool) {
+arrow::Status RssPartitionWriter::evict(uint32_t partitionId, 
std::unique_ptr<BlockPayload> blockPayload, bool, int64_t& evictBytes) {
   rawPartitionLengths_[partitionId] += blockPayload->rawSize();
   ScopedTimer timer(&spillTime_);
   ARROW_ASSIGN_OR_RAISE(auto buffer, blockPayload->readBufferAt(0));
-  bytesEvicted_[partitionId] += rssClient_->pushPartitionData(partitionId, 
buffer->data_as<char>(), buffer->size());
+  auto delta = rssClient_->pushPartitionData(partitionId, 
buffer->data_as<char>(), buffer->size());
+  bytesEvicted_[partitionId] += delta;
+  evictBytes += delta;
   return arrow::Status::OK();
 }
 
-arrow::Status RssPartitionWriter::doEvict(uint32_t partitionId, 
std::unique_ptr<InMemoryPayload> inMemoryPayload) {
+arrow::Status RssPartitionWriter::doEvict(uint32_t partitionId, 
std::unique_ptr<InMemoryPayload> inMemoryPayload, int64_t& evictBytes) {
   rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
   auto payloadType = codec_ ? Payload::Type::kCompressed : 
Payload::Type::kUncompressed;
   ARROW_ASSIGN_OR_RAISE(
@@ -117,8 +122,10 @@ arrow::Status RssPartitionWriter::doEvict(uint32_t 
partitionId, std::unique_ptr<
   // Push.
   ScopedTimer timer(&spillTime_);
   ARROW_ASSIGN_OR_RAISE(auto buffer, rssBufferOs->Finish());
-  bytesEvicted_[partitionId] += rssClient_->pushPartitionData(
+  auto delta = rssClient_->pushPartitionData(
       partitionId, 
reinterpret_cast<char*>(const_cast<uint8_t*>(buffer->data())), buffer->size());
+  bytesEvicted_[partitionId] += delta;
+  evictBytes += delta;
   return arrow::Status::OK();
 }
 } // namespace gluten
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h b/cpp/core/shuffle/rss/RssPartitionWriter.h
index a60b22bbb0..1aff14e70f 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.h
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.h
@@ -42,21 +42,26 @@ class RssPartitionWriter final : public PartitionWriter {
       uint32_t partitionId,
       std::unique_ptr<InMemoryPayload> inMemoryPayload,
       Evict::type evictType,
-      bool reuseBuffers) override;
+      bool reuseBuffers,
+      int64_t& evictBytes) override;
 
-  arrow::Status sortEvict(uint32_t partitionId, 
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal)
-      override;
+  arrow::Status sortEvict(
+      uint32_t partitionId,
+      std::unique_ptr<InMemoryPayload> inMemoryPayload,
+      bool isFinal,
+      int64_t& evictBytes) override;
 
-  arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload> 
blockPayload, bool stop) override;
+  arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload> 
blockPayload, bool stop, int64_t& evictBytes)
+      override;
 
   arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;
 
-  arrow::Status stop(ShuffleWriterMetrics* metrics) override;
+  arrow::Status stop(ShuffleWriterMetrics* metrics, int64_t& evictBytes) 
override;
 
  private:
   void init();
 
-  arrow::Status doEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload> 
inMemoryPayload);
+  arrow::Status doEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload> 
inMemoryPayload, int64_t& evictBytes);
 
   std::shared_ptr<RssPartitionWriterOptions> options_;
   std::shared_ptr<RssClient> rssClient_;
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc
index b3f2747a52..811ca82697 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -255,7 +255,7 @@ void populateWriterMetrics(
   }
   metrics.dataSize +=
       std::accumulate(shuffleWriter->rawPartitionLengths().begin(), 
shuffleWriter->rawPartitionLengths().end(), 0LL);
-  metrics.bytesWritten += shuffleWriter->totalBytesWritten();
+  metrics.bytesWritten += shuffleWriter->bytesWritten();
   metrics.bytesSpilled += shuffleWriter->totalBytesEvicted();
 }
 
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index dadee31333..e83ce8a566 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -234,6 +234,7 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> 
VeloxHashShuffleWriter::generateCo
 }
 
 arrow::Status VeloxHashShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, 
int64_t memLimit) {
+  writtenBytes_ = 0;
   if (partitioning_ == Partitioning::kSingle) {
     auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
     VELOX_CHECK_NOT_NULL(veloxColumnBatch);
@@ -334,6 +335,7 @@ arrow::Status 
VeloxHashShuffleWriter::partitioningAndDoSplit(facebook::velox::Ro
 }
 
 arrow::Status VeloxHashShuffleWriter::stop() {
+  writtenBytes_ = 0;
   setSplitState(SplitState::kStopEvict);
   if (partitioning_ != Partitioning::kSingle) {
     for (auto pid = 0; pid < numPartitions_; ++pid) {
@@ -344,7 +346,7 @@ arrow::Status VeloxHashShuffleWriter::stop() {
   {
     SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]);
     setSplitState(SplitState::kStop);
-    RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+    RETURN_NOT_OK(partitionWriter_->stop(&metrics_, writtenBytes_));
     partitionBuffers_.clear();
   }
 
@@ -966,7 +968,7 @@ arrow::Status VeloxHashShuffleWriter::evictBuffers(
   if (!buffers.empty()) {
     auto payload =
         std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer_, 
schema_, std::move(buffers), hasComplexType_);
-    RETURN_NOT_OK(partitionWriter_->hashEvict(partitionId, std::move(payload), 
Evict::kCache, reuseBuffers));
+    RETURN_NOT_OK(partitionWriter_->hashEvict(partitionId, std::move(payload), 
Evict::kCache, reuseBuffers, writtenBytes_));
   }
   return arrow::Status::OK();
 }
@@ -1387,7 +1389,7 @@ arrow::Result<int64_t> 
VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6
       auto payload = std::make_unique<InMemoryPayload>(
           item.second, &isValidityBuffer_, schema_, std::move(buffers), 
hasComplexType_);
       metrics_.totalBytesToEvict += payload->rawSize();
-      RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), 
Evict::kSpill, false));
+      RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), 
Evict::kSpill, false, writtenBytes_));
       evicted = beforeEvict - partitionBufferPool_->bytes_allocated();
       if (evicted >= size) {
         break;
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index 61367d9015..c93045c1b5 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -65,6 +65,7 @@ arrow::Status 
VeloxRssSortShuffleWriter::doSort(facebook::velox::RowVectorPtr rv
 }
 
 arrow::Status VeloxRssSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> 
cb, int64_t /* memLimit */) {
+  writtenBytes_ = 0;
   if (partitioning_ == Partitioning::kSingle) {
     auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
     VELOX_CHECK_NOT_NULL(veloxColumnBatch);
@@ -125,7 +126,7 @@ arrow::Status 
VeloxRssSortShuffleWriter::evictBatch(uint32_t partitionId) {
   auto arrowBuffer = std::make_shared<arrow::Buffer>(buffer->as<uint8_t>(), 
buffer->size());
   ARROW_ASSIGN_OR_RAISE(
       auto payload, BlockPayload::fromBuffers(Payload::kRaw, 0, 
{std::move(arrowBuffer)}, nullptr, nullptr, nullptr));
-  RETURN_NOT_OK(partitionWriter_->evict(partitionId, std::move(payload), 
stopped_));
+  RETURN_NOT_OK(partitionWriter_->evict(partitionId, std::move(payload), 
stopped_, writtenBytes_));
   batch_ = 
std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), 
serde_.get());
   batch_->createStreamTree(rowType_, splitBufferSize_, &serdeOptions_);
   return arrow::Status::OK();
@@ -190,6 +191,7 @@ arrow::Status 
VeloxRssSortShuffleWriter::evictRowVector(uint32_t partitionId) {
 }
 
 arrow::Status VeloxRssSortShuffleWriter::stop() {
+  writtenBytes_ = 0;
   stopped_ = true;
   for (auto pid = 0; pid < numPartitions(); ++pid) {
     RETURN_NOT_OK(evictRowVector(pid));
@@ -199,7 +201,7 @@ arrow::Status VeloxRssSortShuffleWriter::stop() {
   {
     SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]);
     setSortState(RssSortState::kSortStop);
-    RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+    RETURN_NOT_OK(partitionWriter_->stop(&metrics_, writtenBytes_));
   }
 
   stat();
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 30eb357912..eb5bb89caf 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -73,6 +73,7 @@ VeloxSortShuffleWriter::VeloxSortShuffleWriter(
       diskWriteBufferSize_(options->diskWriteBufferSize) {}
 
 arrow::Status VeloxSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, 
int64_t memLimit) {
+  writtenBytes_ = 0;
   ARROW_ASSIGN_OR_RAISE(auto rv, getPeeledRowVector(cb));
   initRowType(rv);
   RETURN_NOT_OK(insert(rv, memLimit));
@@ -81,6 +82,7 @@ arrow::Status 
VeloxSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, i
 
 arrow::Status VeloxSortShuffleWriter::stop() {
   ARROW_RETURN_IF(evictState_ == EvictState::kUnevictable, 
arrow::Status::Invalid("Unevictable state in stop."));
+  writtenBytes_ = 0;
 
   stopped_ = true;
   if (offset_ > 0) {
@@ -90,7 +92,7 @@ arrow::Status VeloxSortShuffleWriter::stop() {
   sortedBuffer_.reset();
   pages_.clear();
   pageAddresses_.clear();
-  RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+  RETURN_NOT_OK(partitionWriter_->stop(&metrics_, writtenBytes_));
   return arrow::Status::OK();
 }
 
@@ -339,7 +341,7 @@ arrow::Status 
VeloxSortShuffleWriter::evictPartitionInternal(
       nullptr,
       
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(buffer,
 rawLength)});
   updateSpillMetrics(payload);
-  RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload), 
stopped_));
+  RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload), 
stopped_, writtenBytes_));
   return arrow::Status::OK();
 }
 
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
index eabc975e02..96b2a3fc54 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
@@ -21,7 +21,7 @@ public class GlutenSplitResult {
   private final long totalWriteTime;
   private final long totalEvictTime;
   private final long totalCompressTime; // overlaps with totalEvictTime and 
totalWriteTime
-  private final long totalBytesWritten;
+  private final long bytesWritten;
   private final long totalBytesEvicted;
   private final long[] partitionLengths;
   private final long[] rawPartitionLengths;
@@ -39,7 +39,7 @@ public class GlutenSplitResult {
       long totalCompressTime,
       long totalSortTime,
       long totalC2RTime,
-      long totalBytesWritten,
+      long bytesWritten,
       long totalBytesEvicted,
       long totalBytesToEvict, // In-memory bytes(uncompressed) before spill.
       long peakBytes,
@@ -51,7 +51,7 @@ public class GlutenSplitResult {
     this.totalWriteTime = totalWriteTime;
     this.totalEvictTime = totalEvictTime;
     this.totalCompressTime = totalCompressTime;
-    this.totalBytesWritten = totalBytesWritten;
+    this.bytesWritten = bytesWritten;
     this.totalBytesEvicted = totalBytesEvicted;
     this.partitionLengths = partitionLengths;
     this.rawPartitionLengths = rawPartitionLengths;
@@ -79,8 +79,8 @@ public class GlutenSplitResult {
     return totalCompressTime;
   }
 
-  public long getTotalBytesWritten() {
-    return totalBytesWritten;
+  public long getBytesWritten() {
+    return bytesWritten;
   }
 
   public long getTotalBytesSpilled() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to