This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 0f76034867 [VL] Update shuffle write metrics per batch (#11083)
0f76034867 is described below
commit 0f76034867aa7c422c6b336e56a2072041c4c3e3
Author: Yang Zhang <[email protected]>
AuthorDate: Wed Dec 3 12:54:34 2025 +0800
[VL] Update shuffle write metrics per batch (#11083)
---
.../VeloxCelebornColumnarShuffleWriter.scala | 5 +++--
.../spark/shuffle/ColumnarShuffleWriter.scala | 5 +++--
cpp/core/jni/JniWrapper.cc | 5 ++---
cpp/core/shuffle/LocalPartitionWriter.cc | 8 ++++---
cpp/core/shuffle/LocalPartitionWriter.h | 15 ++++++++-----
cpp/core/shuffle/PartitionWriter.h | 15 ++++++++-----
cpp/core/shuffle/ShuffleWriter.cc | 4 ++--
cpp/core/shuffle/ShuffleWriter.h | 3 ++-
cpp/core/shuffle/rss/RssPartitionWriter.cc | 25 ++++++++++++++--------
cpp/core/shuffle/rss/RssPartitionWriter.h | 17 +++++++++------
cpp/velox/benchmarks/GenericBenchmark.cc | 2 +-
cpp/velox/shuffle/VeloxHashShuffleWriter.cc | 8 ++++---
cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc | 6 ++++--
cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 6 ++++--
.../gluten/vectorized/GlutenSplitResult.java | 10 ++++-----
15 files changed, 83 insertions(+), 51 deletions(-)
diff --git
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
index d2c27e960c..783d68a00c 100644
---
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
+++
b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
@@ -77,7 +77,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
val columnarBatchHandle =
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName,
cb)
val startTime = System.nanoTime()
- shuffleWriterJniWrapper.write(
+ val bytesWritten = shuffleWriterJniWrapper.write(
nativeShuffleWriter,
cb.numRows,
columnarBatchHandle,
@@ -85,6 +85,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
dep.metrics("numInputRows").add(cb.numRows)
dep.metrics("inputBatches").add(1)
+ writeMetrics.incBytesWritten(bytesWritten)
// This metric is important, AQE use it to decide if EliminateLimit
writeMetrics.incRecordsWritten(cb.numRows())
}
@@ -120,7 +121,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
dep.metrics("c2rTime").add(splitResult.getC2RTime)
}
dep.metrics("dataSize").add(splitResult.getRawPartitionLengths.sum)
- writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten)
+ writeMetrics.incBytesWritten(splitResult.getBytesWritten)
writeMetrics.incWriteTime(splitResult.getTotalWriteTime +
splitResult.getTotalPushTime)
partitionLengths = splitResult.getPartitionLengths
diff --git
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index f78a04e69f..ef9877011b 100644
---
a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -214,7 +214,7 @@ class ColumnarShuffleWriter[K, V](
val columnarBatchHandle =
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName,
cb)
val startTime = System.nanoTime()
- shuffleWriterJniWrapper.write(
+ val bytesWritten = shuffleWriterJniWrapper.write(
nativeShuffleWriter,
rows,
columnarBatchHandle,
@@ -222,6 +222,7 @@ class ColumnarShuffleWriter[K, V](
dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
dep.metrics("numInputRows").add(rows)
dep.metrics("inputBatches").add(1)
+ writeMetrics.incBytesWritten(bytesWritten)
// This metric is important, AQE use it to decide if EliminateLimit
writeMetrics.incRecordsWritten(rows)
}
@@ -256,7 +257,7 @@ class ColumnarShuffleWriter[K, V](
dep.metrics("dataSize").add(splitResult.getRawPartitionLengths.sum)
dep.metrics("compressTime").add(splitResult.getTotalCompressTime)
dep.metrics("peakBytes").add(splitResult.getPeakBytes)
- writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten)
+ writeMetrics.incBytesWritten(splitResult.getBytesWritten)
writeMetrics.incWriteTime(splitResult.getTotalWriteTime +
splitResult.getTotalSpillTime)
taskContext.taskMetrics().incMemoryBytesSpilled(splitResult.getBytesToEvict)
taskContext.taskMetrics().incDiskBytesSpilled(splitResult.getTotalBytesSpilled)
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 307fa3c129..bfab41e365 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -1038,9 +1038,8 @@ JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
// The column batch maybe VeloxColumnBatch or
ArrowCStructColumnarBatch(FallbackRangeShuffleWriter)
auto batch = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
- auto numBytes = batch->numBytes();
arrowAssertOkOrThrow(shuffleWriter->write(batch, memLimit), "Native write:
shuffle writer failed");
- return numBytes;
+ return shuffleWriter->bytesWritten();
JNI_METHOD_END(kInvalidObjectHandle)
}
@@ -1076,7 +1075,7 @@ JNIEXPORT jobject JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap
shuffleWriter->totalCompressTime(),
shuffleWriter->totalSortTime(),
shuffleWriter->totalC2RTime(),
- shuffleWriter->totalBytesWritten(),
+ shuffleWriter->bytesWritten(),
shuffleWriter->totalBytesEvicted(),
shuffleWriter->totalBytesToEvict(),
shuffleWriter->peakBytesAllocated(),
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc
b/cpp/core/shuffle/LocalPartitionWriter.cc
index 77734e3287..948a2b0e05 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -600,7 +600,7 @@ arrow::Status
LocalPartitionWriter::writeCachedPayloads(uint32_t partitionId, ar
return arrow::Status::OK();
}
-arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
+arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics,
int64_t& evictBytes) {
if (stopped_) {
return arrow::Status::OK();
}
@@ -645,6 +645,7 @@ arrow::Status
LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
}
}
ARROW_ASSIGN_OR_RAISE(totalBytesWritten_, dataFileOs_->Tell());
+ evictBytes += totalBytesWritten_;
// Close Final file. Clear buffered resources.
RETURN_NOT_OK(clearResource());
@@ -710,7 +711,8 @@ arrow::Status LocalPartitionWriter::hashEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
- bool reuseBuffers) {
+ bool reuseBuffers,
+ int64_t& evictBytes) {
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
if (evictType == Evict::kSpill) {
@@ -754,7 +756,7 @@ arrow::Status LocalPartitionWriter::hashEvict(
}
arrow::Status
-LocalPartitionWriter::sortEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal) {
+LocalPartitionWriter::sortEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal, int64_t&
evictBytes) {
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal &&
!dataFileOs_))) {
diff --git a/cpp/core/shuffle/LocalPartitionWriter.h
b/cpp/core/shuffle/LocalPartitionWriter.h
index 3709b03c28..113e5a3cfd 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.h
+++ b/cpp/core/shuffle/LocalPartitionWriter.h
@@ -39,13 +39,18 @@ class LocalPartitionWriter : public PartitionWriter {
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
- bool reuseBuffers) override;
+ bool reuseBuffers,
+ int64_t& evictBytes) override;
- arrow::Status sortEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal)
- override;
+ arrow::Status sortEvict(
+ uint32_t partitionId,
+ std::unique_ptr<InMemoryPayload> inMemoryPayload,
+ bool isFinal,
+ int64_t& evictBytes) override;
// This code path is not used by LocalPartitionWriter, Not implement it by
default.
- arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload>
blockPayload, bool stop) override {
+ arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload>
blockPayload, bool stop, int64_t& evictBytes)
+ override {
return arrow::Status::NotImplemented("Invalid code path for local shuffle
writer.");
}
@@ -68,7 +73,7 @@ class LocalPartitionWriter : public PartitionWriter {
/// If spill is triggered by 2.c, cached payloads of the remaining unmerged
partitions will be spilled.
/// In both cases, if the cached payload size doesn't free enough memory,
/// it will shrink partition buffers to free more memory.
- arrow::Status stop(ShuffleWriterMetrics* metrics) override;
+ arrow::Status stop(ShuffleWriterMetrics* metrics, int64_t& evictBytes)
override;
// Spill source:
// 1. Other op.
diff --git a/cpp/core/shuffle/PartitionWriter.h
b/cpp/core/shuffle/PartitionWriter.h
index cedae6d2fd..ebb86004cb 100644
--- a/cpp/core/shuffle/PartitionWriter.h
+++ b/cpp/core/shuffle/PartitionWriter.h
@@ -48,19 +48,24 @@ class PartitionWriter : public Reclaimable {
~PartitionWriter() override = default;
- virtual arrow::Status stop(ShuffleWriterMetrics* metrics) = 0;
+ virtual arrow::Status stop(ShuffleWriterMetrics* metrics, int64_t&
evictBytes) = 0;
/// Evict buffers for `partitionId` partition.
virtual arrow::Status hashEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
- bool reuseBuffers) = 0;
+ bool reuseBuffers,
+ int64_t& evictBytes) = 0;
- virtual arrow::Status
- sortEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload>
inMemoryPayload, bool isFinal) = 0;
+ virtual arrow::Status sortEvict(
+ uint32_t partitionId,
+ std::unique_ptr<InMemoryPayload> inMemoryPayload,
+ bool isFinal,
+ int64_t& evictBytes) = 0;
- virtual arrow::Status evict(uint32_t partitionId,
std::unique_ptr<BlockPayload> blockPayload, bool stop) = 0;
+ virtual arrow::Status
+ evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool
stop, int64_t& evictBytes) = 0;
uint64_t cachedPayloadSize() {
return payloadPool_->bytes_allocated();
diff --git a/cpp/core/shuffle/ShuffleWriter.cc
b/cpp/core/shuffle/ShuffleWriter.cc
index 71df2b1a7b..3f0feadfb0 100644
--- a/cpp/core/shuffle/ShuffleWriter.cc
+++ b/cpp/core/shuffle/ShuffleWriter.cc
@@ -61,8 +61,8 @@ int32_t ShuffleWriter::numPartitions() const {
return numPartitions_;
}
-int64_t ShuffleWriter::totalBytesWritten() const {
- return metrics_.totalBytesWritten;
+int64_t ShuffleWriter::bytesWritten() const {
+ return writtenBytes_;
}
int64_t ShuffleWriter::totalBytesEvicted() const {
diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h
index 1028b0a318..934ad09076 100644
--- a/cpp/core/shuffle/ShuffleWriter.h
+++ b/cpp/core/shuffle/ShuffleWriter.h
@@ -41,7 +41,7 @@ class ShuffleWriter : public Reclaimable {
int32_t numPartitions() const;
- int64_t totalBytesWritten() const;
+ int64_t bytesWritten() const;
int64_t totalBytesEvicted() const;
@@ -76,6 +76,7 @@ class ShuffleWriter : public Reclaimable {
Partitioning partitioning_;
ShuffleWriterMetrics metrics_{};
+ int64_t writtenBytes_{0};
};
} // namespace gluten
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc
b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 05068b8035..a21060110f 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -29,7 +29,7 @@ void RssPartitionWriter::init() {
rawPartitionLengths_.resize(numPartitions_, 0);
}
-arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
+arrow::Status RssPartitionWriter::stop(ShuffleWriterMetrics* metrics, int64_t&
/* evictBytes */) {
spillTime_ -= compressTime_;
rssClient_->stop();
@@ -53,12 +53,13 @@ arrow::Status RssPartitionWriter::hashEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
- bool reuseBuffers) {
- return doEvict(partitionId, std::move(inMemoryPayload));
+ bool reuseBuffers,
+ int64_t& evictBytes) {
+ return doEvict(partitionId, std::move(inMemoryPayload), evictBytes);
}
arrow::Status
-RssPartitionWriter::sortEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal) {
+RssPartitionWriter::sortEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal, int64_t&
evictBytes) {
ScopedTimer timer(&spillTime_);
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
if (shouldInitializeOs_) {
@@ -85,22 +86,26 @@ RssPartitionWriter::sortEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayl
compressTime_ += compressedOs_->compressTime();
}
ARROW_ASSIGN_OR_RAISE(const auto buffer, rssOs_->Finish());
- bytesEvicted_[partitionId] += rssClient_->pushPartitionData(partitionId,
buffer->data_as<char>(), buffer->size());
+ auto delta = rssClient_->pushPartitionData(partitionId,
buffer->data_as<char>(), buffer->size());
+ bytesEvicted_[partitionId] += delta;
+ evictBytes += delta;
shouldInitializeOs_ = true;
}
return arrow::Status::OK();
}
-arrow::Status RssPartitionWriter::evict(uint32_t partitionId,
std::unique_ptr<BlockPayload> blockPayload, bool) {
+arrow::Status RssPartitionWriter::evict(uint32_t partitionId,
std::unique_ptr<BlockPayload> blockPayload, bool, int64_t& evictBytes) {
rawPartitionLengths_[partitionId] += blockPayload->rawSize();
ScopedTimer timer(&spillTime_);
ARROW_ASSIGN_OR_RAISE(auto buffer, blockPayload->readBufferAt(0));
- bytesEvicted_[partitionId] += rssClient_->pushPartitionData(partitionId,
buffer->data_as<char>(), buffer->size());
+ auto delta = rssClient_->pushPartitionData(partitionId,
buffer->data_as<char>(), buffer->size());
+ bytesEvicted_[partitionId] += delta;
+ evictBytes += delta;
return arrow::Status::OK();
}
-arrow::Status RssPartitionWriter::doEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload) {
+arrow::Status RssPartitionWriter::doEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload, int64_t& evictBytes) {
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
auto payloadType = codec_ ? Payload::Type::kCompressed :
Payload::Type::kUncompressed;
ARROW_ASSIGN_OR_RAISE(
@@ -117,8 +122,10 @@ arrow::Status RssPartitionWriter::doEvict(uint32_t
partitionId, std::unique_ptr<
// Push.
ScopedTimer timer(&spillTime_);
ARROW_ASSIGN_OR_RAISE(auto buffer, rssBufferOs->Finish());
- bytesEvicted_[partitionId] += rssClient_->pushPartitionData(
+ auto delta = rssClient_->pushPartitionData(
partitionId,
reinterpret_cast<char*>(const_cast<uint8_t*>(buffer->data())), buffer->size());
+ bytesEvicted_[partitionId] += delta;
+ evictBytes += delta;
return arrow::Status::OK();
}
} // namespace gluten
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h
b/cpp/core/shuffle/rss/RssPartitionWriter.h
index a60b22bbb0..1aff14e70f 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.h
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.h
@@ -42,21 +42,26 @@ class RssPartitionWriter final : public PartitionWriter {
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
- bool reuseBuffers) override;
+ bool reuseBuffers,
+ int64_t& evictBytes) override;
- arrow::Status sortEvict(uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal)
- override;
+ arrow::Status sortEvict(
+ uint32_t partitionId,
+ std::unique_ptr<InMemoryPayload> inMemoryPayload,
+ bool isFinal,
+ int64_t& evictBytes) override;
- arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload>
blockPayload, bool stop) override;
+ arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload>
blockPayload, bool stop, int64_t& evictBytes)
+ override;
arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;
- arrow::Status stop(ShuffleWriterMetrics* metrics) override;
+ arrow::Status stop(ShuffleWriterMetrics* metrics, int64_t& evictBytes)
override;
private:
void init();
- arrow::Status doEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload>
inMemoryPayload);
+ arrow::Status doEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload>
inMemoryPayload, int64_t& evictBytes);
std::shared_ptr<RssPartitionWriterOptions> options_;
std::shared_ptr<RssClient> rssClient_;
diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc
b/cpp/velox/benchmarks/GenericBenchmark.cc
index b3f2747a52..811ca82697 100644
--- a/cpp/velox/benchmarks/GenericBenchmark.cc
+++ b/cpp/velox/benchmarks/GenericBenchmark.cc
@@ -255,7 +255,7 @@ void populateWriterMetrics(
}
metrics.dataSize +=
std::accumulate(shuffleWriter->rawPartitionLengths().begin(),
shuffleWriter->rawPartitionLengths().end(), 0LL);
- metrics.bytesWritten += shuffleWriter->totalBytesWritten();
+ metrics.bytesWritten += shuffleWriter->bytesWritten();
metrics.bytesSpilled += shuffleWriter->totalBytesEvicted();
}
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index dadee31333..e83ce8a566 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -234,6 +234,7 @@ arrow::Result<std::shared_ptr<arrow::Buffer>>
VeloxHashShuffleWriter::generateCo
}
arrow::Status VeloxHashShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb,
int64_t memLimit) {
+ writtenBytes_ = 0;
if (partitioning_ == Partitioning::kSingle) {
auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
VELOX_CHECK_NOT_NULL(veloxColumnBatch);
@@ -334,6 +335,7 @@ arrow::Status
VeloxHashShuffleWriter::partitioningAndDoSplit(facebook::velox::Ro
}
arrow::Status VeloxHashShuffleWriter::stop() {
+ writtenBytes_ = 0;
setSplitState(SplitState::kStopEvict);
if (partitioning_ != Partitioning::kSingle) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
@@ -344,7 +346,7 @@ arrow::Status VeloxHashShuffleWriter::stop() {
{
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]);
setSplitState(SplitState::kStop);
- RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+ RETURN_NOT_OK(partitionWriter_->stop(&metrics_, writtenBytes_));
partitionBuffers_.clear();
}
@@ -966,7 +968,7 @@ arrow::Status VeloxHashShuffleWriter::evictBuffers(
if (!buffers.empty()) {
auto payload =
std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer_,
schema_, std::move(buffers), hasComplexType_);
- RETURN_NOT_OK(partitionWriter_->hashEvict(partitionId, std::move(payload),
Evict::kCache, reuseBuffers));
+ RETURN_NOT_OK(partitionWriter_->hashEvict(partitionId, std::move(payload),
Evict::kCache, reuseBuffers, writtenBytes_));
}
return arrow::Status::OK();
}
@@ -1387,7 +1389,7 @@ arrow::Result<int64_t>
VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6
auto payload = std::make_unique<InMemoryPayload>(
item.second, &isValidityBuffer_, schema_, std::move(buffers),
hasComplexType_);
metrics_.totalBytesToEvict += payload->rawSize();
- RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload),
Evict::kSpill, false));
+ RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload),
Evict::kSpill, false, writtenBytes_));
evicted = beforeEvict - partitionBufferPool_->bytes_allocated();
if (evicted >= size) {
break;
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index 61367d9015..c93045c1b5 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -65,6 +65,7 @@ arrow::Status
VeloxRssSortShuffleWriter::doSort(facebook::velox::RowVectorPtr rv
}
arrow::Status VeloxRssSortShuffleWriter::write(std::shared_ptr<ColumnarBatch>
cb, int64_t /* memLimit */) {
+ writtenBytes_ = 0;
if (partitioning_ == Partitioning::kSingle) {
auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
VELOX_CHECK_NOT_NULL(veloxColumnBatch);
@@ -125,7 +126,7 @@ arrow::Status
VeloxRssSortShuffleWriter::evictBatch(uint32_t partitionId) {
auto arrowBuffer = std::make_shared<arrow::Buffer>(buffer->as<uint8_t>(),
buffer->size());
ARROW_ASSIGN_OR_RAISE(
auto payload, BlockPayload::fromBuffers(Payload::kRaw, 0,
{std::move(arrowBuffer)}, nullptr, nullptr, nullptr));
- RETURN_NOT_OK(partitionWriter_->evict(partitionId, std::move(payload),
stopped_));
+ RETURN_NOT_OK(partitionWriter_->evict(partitionId, std::move(payload),
stopped_, writtenBytes_));
batch_ =
std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(),
serde_.get());
batch_->createStreamTree(rowType_, splitBufferSize_, &serdeOptions_);
return arrow::Status::OK();
@@ -190,6 +191,7 @@ arrow::Status
VeloxRssSortShuffleWriter::evictRowVector(uint32_t partitionId) {
}
arrow::Status VeloxRssSortShuffleWriter::stop() {
+ writtenBytes_ = 0;
stopped_ = true;
for (auto pid = 0; pid < numPartitions(); ++pid) {
RETURN_NOT_OK(evictRowVector(pid));
@@ -199,7 +201,7 @@ arrow::Status VeloxRssSortShuffleWriter::stop() {
{
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]);
setSortState(RssSortState::kSortStop);
- RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+ RETURN_NOT_OK(partitionWriter_->stop(&metrics_, writtenBytes_));
}
stat();
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 30eb357912..eb5bb89caf 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -73,6 +73,7 @@ VeloxSortShuffleWriter::VeloxSortShuffleWriter(
diskWriteBufferSize_(options->diskWriteBufferSize) {}
arrow::Status VeloxSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb,
int64_t memLimit) {
+ writtenBytes_ = 0;
ARROW_ASSIGN_OR_RAISE(auto rv, getPeeledRowVector(cb));
initRowType(rv);
RETURN_NOT_OK(insert(rv, memLimit));
@@ -81,6 +82,7 @@ arrow::Status
VeloxSortShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, i
arrow::Status VeloxSortShuffleWriter::stop() {
ARROW_RETURN_IF(evictState_ == EvictState::kUnevictable,
arrow::Status::Invalid("Unevictable state in stop."));
+ writtenBytes_ = 0;
stopped_ = true;
if (offset_ > 0) {
@@ -90,7 +92,7 @@ arrow::Status VeloxSortShuffleWriter::stop() {
sortedBuffer_.reset();
pages_.clear();
pageAddresses_.clear();
- RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
+ RETURN_NOT_OK(partitionWriter_->stop(&metrics_, writtenBytes_));
return arrow::Status::OK();
}
@@ -339,7 +341,7 @@ arrow::Status
VeloxSortShuffleWriter::evictPartitionInternal(
nullptr,
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(buffer,
rawLength)});
updateSpillMetrics(payload);
- RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload),
stopped_));
+ RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload),
stopped_, writtenBytes_));
return arrow::Status::OK();
}
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
index eabc975e02..96b2a3fc54 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
@@ -21,7 +21,7 @@ public class GlutenSplitResult {
private final long totalWriteTime;
private final long totalEvictTime;
private final long totalCompressTime; // overlaps with totalEvictTime and
totalWriteTime
- private final long totalBytesWritten;
+ private final long bytesWritten;
private final long totalBytesEvicted;
private final long[] partitionLengths;
private final long[] rawPartitionLengths;
@@ -39,7 +39,7 @@ public class GlutenSplitResult {
long totalCompressTime,
long totalSortTime,
long totalC2RTime,
- long totalBytesWritten,
+ long bytesWritten,
long totalBytesEvicted,
long totalBytesToEvict, // In-memory bytes(uncompressed) before spill.
long peakBytes,
@@ -51,7 +51,7 @@ public class GlutenSplitResult {
this.totalWriteTime = totalWriteTime;
this.totalEvictTime = totalEvictTime;
this.totalCompressTime = totalCompressTime;
- this.totalBytesWritten = totalBytesWritten;
+ this.bytesWritten = bytesWritten;
this.totalBytesEvicted = totalBytesEvicted;
this.partitionLengths = partitionLengths;
this.rawPartitionLengths = rawPartitionLengths;
@@ -79,8 +79,8 @@ public class GlutenSplitResult {
return totalCompressTime;
}
- public long getTotalBytesWritten() {
- return totalBytesWritten;
+ public long getBytesWritten() {
+ return bytesWritten;
}
public long getTotalBytesSpilled() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]