This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 061efb1bf [VL] Fix shuffle spill not reported to spark metric (#6740)
061efb1bf is described below
commit 061efb1bf577493e6a733353e4bbd21e5a5420a1
Author: Rong Ma <[email protected]>
AuthorDate: Fri Aug 9 10:08:51 2024 +0800
[VL] Fix shuffle spill not reported to spark metric (#6740)
---
cpp/core/jni/JniWrapper.cc | 3 ++-
cpp/core/shuffle/LocalPartitionWriter.cc | 18 +++++++++++++-----
cpp/core/shuffle/LocalPartitionWriter.h | 1 +
cpp/core/shuffle/Options.h | 1 +
cpp/core/shuffle/Payload.cc | 20 +++++++-------------
cpp/core/shuffle/Payload.h | 18 ++++++++----------
cpp/core/shuffle/ShuffleWriter.cc | 4 ++++
cpp/core/shuffle/ShuffleWriter.h | 2 ++
cpp/core/shuffle/Spill.cc | 2 +-
cpp/core/shuffle/Spill.h | 2 +-
cpp/core/shuffle/rss/RssPartitionWriter.cc | 2 +-
cpp/velox/shuffle/VeloxHashShuffleWriter.cc | 1 +
cpp/velox/shuffle/VeloxSortShuffleWriter.cc | 8 ++++++++
cpp/velox/shuffle/VeloxSortShuffleWriter.h | 2 ++
.../apache/gluten/vectorized/GlutenSplitResult.java | 7 +++++++
.../apache/spark/shuffle/ColumnarShuffleWriter.scala | 2 ++
16 files changed, 61 insertions(+), 32 deletions(-)
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 3e583f20b..5c2752f18 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -165,7 +165,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
jniByteInputStreamClose = getMethodIdOrError(env, jniByteInputStreamClass,
"close", "()V");
splitResultClass = createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/vectorized/GlutenSplitResult;");
- splitResultConstructor = getMethodIdOrError(env, splitResultClass, "<init>",
"(JJJJJJJJJ[J[J)V");
+ splitResultConstructor = getMethodIdOrError(env, splitResultClass, "<init>",
"(JJJJJJJJJJ[J[J)V");
columnarBatchSerializeResultClass =
createGlobalClassReferenceOrError(env,
"Lorg/apache/gluten/vectorized/ColumnarBatchSerializeResult;");
@@ -975,6 +975,7 @@ JNIEXPORT jobject JNICALL
Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap
shuffleWriter->totalC2RTime(),
shuffleWriter->totalBytesWritten(),
shuffleWriter->totalBytesEvicted(),
+ shuffleWriter->totalBytesToEvict(),
shuffleWriter->peakBytesAllocated(),
partitionLengthArr,
rawPartitionLengthArr);
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc
b/cpp/core/shuffle/LocalPartitionWriter.cc
index 031be791b..fe206b488 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -183,12 +183,15 @@ class LocalPartitionWriter::PayloadMerger {
return merged;
}
- arrow::Result<std::optional<std::unique_ptr<BlockPayload>>>
finishForSpill(uint32_t partitionId) {
+ arrow::Result<std::optional<std::unique_ptr<BlockPayload>>> finishForSpill(
+ uint32_t partitionId,
+ int64_t& totalBytesToEvict) {
// We need to check whether the spill source is from compressing/copying
the merged buffers.
if ((partitionInMerge_.has_value() && *partitionInMerge_ == partitionId)
|| !hasMerged(partitionId)) {
return std::nullopt;
}
auto payload = std::move(partitionMergePayload_[partitionId]);
+ totalBytesToEvict += payload->rawSize();
return payload->toBlockPayload(Payload::kUncompressed, pool_, codec_);
}
@@ -312,7 +315,8 @@ class LocalPartitionWriter::PayloadCache {
std::shared_ptr<arrow::io::OutputStream> os,
const std::string& spillFile,
arrow::MemoryPool* pool,
- arrow::util::Codec* codec) {
+ arrow::util::Codec* codec,
+ int64_t& totalBytesToEvict) {
std::shared_ptr<Spill> diskSpill = nullptr;
ARROW_ASSIGN_OR_RAISE(auto start, os->Tell());
for (uint32_t pid = 0; pid < numPartitions_; ++pid) {
@@ -321,6 +325,7 @@ class LocalPartitionWriter::PayloadCache {
while (!payloads.empty()) {
auto payload = std::move(payloads.front());
payloads.pop_front();
+ totalBytesToEvict += payload->rawSize();
// Spill the cached payload to disk.
RETURN_NOT_OK(payload->serialize(os.get()));
compressTime_ += payload->getCompressTime();
@@ -550,7 +555,7 @@ arrow::Status LocalPartitionWriter::evict(
bool reuseBuffers,
bool hasComplexType,
bool isFinal) {
- rawPartitionLengths_[partitionId] += inMemoryPayload->getBufferSize();
+ rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
if (evictType == Evict::kSortSpill) {
if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal &&
!dataFileOs_))) {
@@ -604,6 +609,7 @@ arrow::Status LocalPartitionWriter::evict(
return arrow::Status::OK();
}
+// FIXME: Remove this code path for local partition writer.
arrow::Status LocalPartitionWriter::evict(uint32_t partitionId,
std::unique_ptr<BlockPayload> blockPayload, bool stop) {
rawPartitionLengths_[partitionId] += blockPayload->rawSize();
@@ -642,7 +648,8 @@ arrow::Status
LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu
ARROW_ASSIGN_OR_RAISE(auto os,
arrow::io::BufferedOutputStream::Create(16384, pool_, raw));
spills_.emplace_back();
ARROW_ASSIGN_OR_RAISE(
- spills_.back(), payloadCache_->spillAndClose(os, spillFile,
payloadPool_.get(), codec_.get()));
+ spills_.back(),
+ payloadCache_->spillAndClose(os, spillFile, payloadPool_.get(),
codec_.get(), totalBytesToEvict_));
reclaimed += beforeSpill - payloadPool_->bytes_allocated();
if (reclaimed >= size) {
*actual = reclaimed;
@@ -653,7 +660,7 @@ arrow::Status
LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu
if (merger_) {
auto beforeSpill = payloadPool_->bytes_allocated();
for (auto pid = 0; pid < numPartitions_; ++pid) {
- ARROW_ASSIGN_OR_RAISE(auto merged, merger_->finishForSpill(pid));
+ ARROW_ASSIGN_OR_RAISE(auto merged, merger_->finishForSpill(pid,
totalBytesToEvict_));
if (merged.has_value()) {
RETURN_NOT_OK(requestSpill(false));
RETURN_NOT_OK(spiller_->spill(pid, std::move(*merged)));
@@ -678,6 +685,7 @@ arrow::Status
LocalPartitionWriter::populateMetrics(ShuffleWriterMetrics* metric
metrics->totalCompressTime += compressTime_;
metrics->totalEvictTime += spillTime_;
metrics->totalWriteTime += writeTime_;
+ metrics->totalBytesToEvict += totalBytesToEvict_;
metrics->totalBytesEvicted += totalBytesEvicted_;
metrics->totalBytesWritten += std::filesystem::file_size(dataFile_);
metrics->partitionLengths = std::move(partitionLengths_);
diff --git a/cpp/core/shuffle/LocalPartitionWriter.h
b/cpp/core/shuffle/LocalPartitionWriter.h
index efd7b4df3..555632fed 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.h
+++ b/cpp/core/shuffle/LocalPartitionWriter.h
@@ -110,6 +110,7 @@ class LocalPartitionWriter : public PartitionWriter {
std::vector<int32_t> subDirSelection_;
std::shared_ptr<arrow::io::OutputStream> dataFileOs_;
+ int64_t totalBytesToEvict_{0};
int64_t totalBytesEvicted_{0};
std::vector<int64_t> partitionLengths_;
std::vector<int64_t> rawPartitionLengths_;
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 11fa037eb..2424ec557 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -87,6 +87,7 @@ struct PartitionWriterOptions {
struct ShuffleWriterMetrics {
int64_t totalBytesWritten{0};
int64_t totalBytesEvicted{0};
+ int64_t totalBytesToEvict{0};
int64_t totalWriteTime{0};
int64_t totalEvictTime{0};
int64_t totalCompressTime{0};
diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc
index b8d8274cb..d0c24e4bc 100644
--- a/cpp/core/shuffle/Payload.cc
+++ b/cpp/core/shuffle/Payload.cc
@@ -325,9 +325,8 @@ void BlockPayload::setCompressionTime(int64_t
compressionTime) {
compressTime_ = compressionTime;
}
-uint64_t BlockPayload::rawSize() {
- return std::accumulate(
- buffers_.begin(), buffers_.end(), 0UL, [](auto sum, const auto& buffer)
{ return sum + buffer->size(); });
+int64_t BlockPayload::rawSize() {
+ return getBufferSize(buffers_);
}
arrow::Result<std::unique_ptr<InMemoryPayload>> InMemoryPayload::merge(
@@ -418,10 +417,6 @@ arrow::Result<std::shared_ptr<arrow::Buffer>>
InMemoryPayload::readBufferAt(uint
return std::move(buffers_[index]);
}
-int64_t InMemoryPayload::getBufferSize() const {
- return gluten::getBufferSize(buffers_);
-}
-
arrow::Status InMemoryPayload::copyBuffers(arrow::MemoryPool* pool) {
for (auto& buffer : buffers_) {
if (!buffer) {
@@ -438,9 +433,8 @@ arrow::Status
InMemoryPayload::copyBuffers(arrow::MemoryPool* pool) {
return arrow::Status::OK();
}
-uint64_t InMemoryPayload::rawSize() {
- return std::accumulate(
- buffers_.begin(), buffers_.end(), 0UL, [](auto sum, const auto& buffer)
{ return sum + buffer->size(); });
+int64_t InMemoryPayload::rawSize() {
+ return getBufferSize(buffers_);
}
UncompressedDiskBlockPayload::UncompressedDiskBlockPayload(
@@ -513,7 +507,7 @@ arrow::Result<std::shared_ptr<arrow::Buffer>>
UncompressedDiskBlockPayload::read
return buffer;
}
-uint64_t UncompressedDiskBlockPayload::rawSize() {
+int64_t UncompressedDiskBlockPayload::rawSize() {
return rawSize_;
}
@@ -521,7 +515,7 @@ CompressedDiskBlockPayload::CompressedDiskBlockPayload(
uint32_t numRows,
const std::vector<bool>* isValidityBuffer,
arrow::io::InputStream*& inputStream,
- uint64_t rawSize,
+ int64_t rawSize,
arrow::MemoryPool* /* pool */)
: Payload(Type::kCompressed, numRows, isValidityBuffer),
inputStream_(inputStream), rawSize_(rawSize) {}
@@ -536,7 +530,7 @@ arrow::Result<std::shared_ptr<arrow::Buffer>>
CompressedDiskBlockPayload::readBu
return arrow::Status::Invalid("Cannot read buffer from
CompressedDiskBlockPayload.");
}
-uint64_t CompressedDiskBlockPayload::rawSize() {
+int64_t CompressedDiskBlockPayload::rawSize() {
return rawSize_;
}
} // namespace gluten
diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h
index 0a317d9c3..1bd8815a4 100644
--- a/cpp/core/shuffle/Payload.h
+++ b/cpp/core/shuffle/Payload.h
@@ -38,7 +38,7 @@ class Payload {
virtual arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t
index) = 0;
- virtual uint64_t rawSize() = 0;
+ virtual int64_t rawSize() = 0;
int64_t getCompressTime() const {
return compressTime_;
@@ -97,7 +97,7 @@ class BlockPayload final : public Payload {
arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t pos)
override;
- uint64_t rawSize() override;
+ int64_t rawSize() override;
protected:
BlockPayload(
@@ -134,11 +134,9 @@ class InMemoryPayload final : public Payload {
arrow::Result<std::unique_ptr<BlockPayload>>
toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool,
arrow::util::Codec* codec);
- int64_t getBufferSize() const;
-
arrow::Status copyBuffers(arrow::MemoryPool* pool);
- uint64_t rawSize() override;
+ int64_t rawSize() override;
private:
std::vector<std::shared_ptr<arrow::Buffer>> buffers_;
@@ -159,11 +157,11 @@ class UncompressedDiskBlockPayload final : public Payload
{
arrow::Status serialize(arrow::io::OutputStream* outputStream) override;
- uint64_t rawSize() override;
+ int64_t rawSize() override;
private:
arrow::io::InputStream*& inputStream_;
- uint64_t rawSize_;
+ int64_t rawSize_;
arrow::MemoryPool* pool_;
arrow::util::Codec* codec_;
uint32_t readPos_{0};
@@ -177,17 +175,17 @@ class CompressedDiskBlockPayload final : public Payload {
uint32_t numRows,
const std::vector<bool>* isValidityBuffer,
arrow::io::InputStream*& inputStream,
- uint64_t rawSize,
+ int64_t rawSize,
arrow::MemoryPool* pool);
arrow::Status serialize(arrow::io::OutputStream* outputStream) override;
arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t index)
override;
- uint64_t rawSize() override;
+ int64_t rawSize() override;
private:
arrow::io::InputStream*& inputStream_;
- uint64_t rawSize_;
+ int64_t rawSize_;
};
} // namespace gluten
diff --git a/cpp/core/shuffle/ShuffleWriter.cc
b/cpp/core/shuffle/ShuffleWriter.cc
index e637d37ff..3eff9a2c8 100644
--- a/cpp/core/shuffle/ShuffleWriter.cc
+++ b/cpp/core/shuffle/ShuffleWriter.cc
@@ -55,6 +55,10 @@ int64_t ShuffleWriter::totalBytesEvicted() const {
return metrics_.totalBytesEvicted;
}
+int64_t ShuffleWriter::totalBytesToEvict() const {
+ return metrics_.totalBytesToEvict;
+}
+
int64_t ShuffleWriter::totalWriteTime() const {
return metrics_.totalWriteTime;
}
diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h
index 661112150..8c79829e0 100644
--- a/cpp/core/shuffle/ShuffleWriter.h
+++ b/cpp/core/shuffle/ShuffleWriter.h
@@ -52,6 +52,8 @@ class ShuffleWriter : public Reclaimable {
int64_t totalBytesEvicted() const;
+ int64_t totalBytesToEvict() const;
+
int64_t totalWriteTime() const;
int64_t totalEvictTime() const;
diff --git a/cpp/core/shuffle/Spill.cc b/cpp/core/shuffle/Spill.cc
index 0bbe667ab..d8b9bc7eb 100644
--- a/cpp/core/shuffle/Spill.cc
+++ b/cpp/core/shuffle/Spill.cc
@@ -48,7 +48,7 @@ void Spill::insertPayload(
Payload::Type payloadType,
uint32_t numRows,
const std::vector<bool>* isValidityBuffer,
- uint64_t rawSize,
+ int64_t rawSize,
arrow::MemoryPool* pool,
arrow::util::Codec* codec) {
// TODO: Add compression threshold.
diff --git a/cpp/core/shuffle/Spill.h b/cpp/core/shuffle/Spill.h
index 7ee60ef29..9d8d24087 100644
--- a/cpp/core/shuffle/Spill.h
+++ b/cpp/core/shuffle/Spill.h
@@ -46,7 +46,7 @@ class Spill final {
Payload::Type payloadType,
uint32_t numRows,
const std::vector<bool>* isValidityBuffer,
- uint64_t rawSize,
+ int64_t rawSize,
arrow::MemoryPool* pool,
arrow::util::Codec* codec);
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc
b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 19f178a2c..8f75f9993 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -56,7 +56,7 @@ arrow::Status RssPartitionWriter::evict(
bool reuseBuffers,
bool hasComplexType,
bool isFinal) {
- rawPartitionLengths_[partitionId] += inMemoryPayload->getBufferSize();
+ rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
auto payloadType = codec_ ? Payload::Type::kCompressed :
Payload::Type::kUncompressed;
ARROW_ASSIGN_OR_RAISE(
auto payload, inMemoryPayload->toBlockPayload(payloadType,
payloadPool_.get(), codec_ ? codec_.get() : nullptr));
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index e165d4a91..00d8be166 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -1364,6 +1364,7 @@ arrow::Result<int64_t>
VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6
auto pid = item.first;
ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false));
auto payload = std::make_unique<InMemoryPayload>(item.second,
&isValidityBuffer_, std::move(buffers));
+ metrics_.totalBytesToEvict += payload->rawSize();
RETURN_NOT_OK(partitionWriter_->evict(pid, std::move(payload),
Evict::kSpill, false, hasComplexType_, false));
evicted = beforeEvict - partitionBufferPool_->bytes_allocated();
if (evicted >= size) {
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index d15280c0a..7f3e9201f 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -283,6 +283,7 @@ arrow::Status
VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
index - begin,
nullptr,
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_,
offset)});
+ updateSpillMetrics(payload);
RETURN_NOT_OK(
partitionWriter_->evict(partitionId, std::move(payload),
Evict::type::kSortSpill, false, false, stopped_));
begin = index;
@@ -296,6 +297,7 @@ arrow::Status
VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
end - begin,
nullptr,
std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_,
offset)});
+ updateSpillMetrics(payload);
RETURN_NOT_OK(
partitionWriter_->evict(partitionId, std::move(payload),
Evict::type::kSortSpill, false, false, stopped_));
return arrow::Status::OK();
@@ -399,4 +401,10 @@ void VeloxSortShuffleWriter::allocateMinimalArray() {
options_.sortBufferInitialSize * sizeof(uint64_t), veloxPool_.get());
setUpArray(std::move(array));
}
+
+void VeloxSortShuffleWriter::updateSpillMetrics(const
std::unique_ptr<InMemoryPayload>& payload) {
+ if (!stopped_) {
+ metrics_.totalBytesToEvict += payload->rawSize();
+ }
+}
} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index 2925b85f8..34fbfd243 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -89,6 +89,8 @@ class VeloxSortShuffleWriter final : public
VeloxShuffleWriter {
void allocateMinimalArray();
+ void updateSpillMetrics(const std::unique_ptr<InMemoryPayload>& payload);
+
// Stores compact row id -> row
facebook::velox::BufferPtr array_;
uint64_t* arrayPtr_;
diff --git
a/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
b/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
index dbc0d7db5..3bed6ac79 100644
---
a/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
+++
b/gluten-data/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
@@ -17,6 +17,7 @@
package org.apache.gluten.vectorized;
public class GlutenSplitResult extends SplitResult {
+ private final long bytesToEvict;
private final long peakBytes;
private final long sortTime;
private final long c2rTime;
@@ -30,6 +31,7 @@ public class GlutenSplitResult extends SplitResult {
long totalC2RTime,
long totalBytesWritten,
long totalBytesEvicted,
+ long totalBytesToEvict, // In-memory bytes(uncompressed) before spill.
long peakBytes,
long[] partitionLengths,
long[] rawPartitionLengths) {
@@ -42,11 +44,16 @@ public class GlutenSplitResult extends SplitResult {
totalBytesEvicted,
partitionLengths,
rawPartitionLengths);
+ this.bytesToEvict = totalBytesToEvict;
this.peakBytes = peakBytes;
this.sortTime = totalSortTime;
this.c2rTime = totalC2RTime;
}
+ public long getBytesToEvict() {
+ return bytesToEvict;
+ }
+
public long getPeakBytes() {
return peakBytes;
}
diff --git
a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 08535a393..251bb977f 100644
---
a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++
b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -210,6 +210,8 @@ class ColumnarShuffleWriter[K, V](
dep.metrics("peakBytes").add(splitResult.getPeakBytes)
writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten)
writeMetrics.incWriteTime(splitResult.getTotalWriteTime +
splitResult.getTotalSpillTime)
+
taskContext.taskMetrics().incMemoryBytesSpilled(splitResult.getBytesToEvict)
+
taskContext.taskMetrics().incDiskBytesSpilled(splitResult.getTotalBytesSpilled)
partitionLengths = splitResult.getPartitionLengths
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]