This is an automated email from the ASF dual-hosted git repository.
marin-ma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d2c6f382f6 [VL] Add per-batch input-encoding counter to
VeloxHashShuffleWriter (#12107)
d2c6f382f6 is described below
commit d2c6f382f6213f0e0ef429bdeeb63eba9d904438
Author: Luis Peñaranda <[email protected]>
AuthorDate: Mon May 18 21:24:37 2026 +0200
[VL] Add per-batch input-encoding counter to VeloxHashShuffleWriter (#12107)
---
cpp/velox/shuffle/VeloxHashShuffleWriter.cc | 81 +++++++++
cpp/velox/shuffle/VeloxHashShuffleWriter.h | 76 ++++++++
cpp/velox/tests/CMakeLists.txt | 3 +
.../VeloxHashShuffleWriterInputEncodingTest.cc | 201 +++++++++++++++++++++
4 files changed, 361 insertions(+)
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index f25d4ac887..e071e8a1c3 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -239,6 +239,7 @@ arrow::Result<std::shared_ptr<arrow::Buffer>>
VeloxHashShuffleWriter::generateCo
arrow::Status VeloxHashShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb,
int64_t memLimit) {
writtenBytes_ = 0;
+ accumulateInputEncodingCounts(*cb);
if (partitioning_ == Partitioning::kSingle) {
auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
VELOX_CHECK_NOT_NULL(veloxColumnBatch);
@@ -1315,6 +1316,66 @@ uint64_t
VeloxHashShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t fixe
return valueBufferSize;
}
+void VeloxHashShuffleWriter::accumulateInputEncodingCounts(const ColumnarBatch& cb) {
+  // Only velox batches expose per-child encodings. Anything else is flattened
+  // later by `VeloxColumnarBatch::from`, so counting it here would report a
+  // misleading "all flat" mix. Such a batch -- or a velox batch with no row
+  // vector to inspect -- bumps `inputEncodingSkippedBatches_` instead; `stat()`
+  // prints that counter next to the buckets so a low bucket total can be told
+  // apart from "most input was not velox".
+  const VeloxColumnarBatch* veloxBatch =
+      cb.getType() == "velox" ? dynamic_cast<const VeloxColumnarBatch*>(&cb) : nullptr;
+  if (veloxBatch == nullptr) {
+    ++inputEncodingSkippedBatches_;
+    return;
+  }
+  const auto& rowVector = veloxBatch->getRowVector();
+  if (rowVector == nullptr) {
+    ++inputEncodingSkippedBatches_;
+    return;
+  }
+
+  using Encoding = facebook::velox::VectorEncoding::Simple;
+  // Maps one top-level child to its bucket; a null child is filed under Other.
+  const auto bucketOf = [](const auto& child) -> InputEncodingBucket {
+    if (child == nullptr) {
+      return kInputEncodingOther;
+    }
+    switch (child->encoding()) {
+      case Encoding::FLAT:
+        return kInputEncodingFlat;
+      case Encoding::DICTIONARY:
+        return kInputEncodingDictionary;
+      case Encoding::CONSTANT:
+        return kInputEncodingConstant;
+      case Encoding::LAZY:
+        return kInputEncodingLazy;
+      case Encoding::ROW:
+      case Encoding::MAP:
+      case Encoding::FLAT_MAP:
+      case Encoding::ARRAY:
+        // Struct/map/array columns are routine in Spark workloads; giving them
+        // their own bucket keeps them from being mistaken for a rare encoding.
+        return kInputEncodingComplex;
+      default:
+        // BIASED, SEQUENCE, FUNCTION and anything added to
+        // `VectorEncoding::Simple` in the future.
+        return kInputEncodingOther;
+    }
+  };
+
+  // Encodings are read as received: nothing is loaded or flattened here.
+  for (const auto& child : rowVector->children()) {
+    ++inputEncodingCounts_[bucketOf(child)];
+  }
+}
+
void VeloxHashShuffleWriter::stat() const {
#if VELOX_SHUFFLE_WRITER_LOG_FLAG
for (int i = CpuWallTimingBegin; i != CpuWallTimingEnd; ++i) {
@@ -1328,6 +1389,26 @@ void VeloxHashShuffleWriter::stat() const {
}
LOG(INFO) << oss.str();
}
+  {
+    // One log line: the per-bucket child counts, each followed by its share of
+    // all inspected children when at least one child was seen.
+    std::ostringstream oss;
+    oss << "Velox shuffle writer stat:InputEncoding";
+    int64_t totalChildren = 0;
+    for (const auto count : inputEncodingCounts_) {
+      totalChildren += count;
+    }
+    for (int b = 0; b < kInputEncodingNum; ++b) {
+      const auto count = inputEncodingCounts_[b];
+      oss << " " << inputEncodingName(static_cast<InputEncodingBucket>(b)) << "=" << count;
+      if (totalChildren > 0) {
+        oss << "(" << (100.0 * static_cast<double>(count) / static_cast<double>(totalChildren)) << "%)";
+      }
+    }
+    // The buckets count child vectors (one per top-level column of every
+    // inspected batch), so `totalChildren` is a column count, NOT a batch or
+    // call count, and is not directly comparable to per-call counters such as
+    // the `count` field in the `cpuWallTimingList_` lines above. The skip
+    // counter below is in batches: each skipped `write()` call contributed no
+    // children to the buckets at all.
+    oss << " SkippedNonVeloxBatches=" << inputEncodingSkippedBatches_;
+    LOG(INFO) << oss.str();
+  }
#endif
}
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.h
b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
index 9203c70594..899c2be269 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
@@ -18,6 +18,7 @@
#pragma once
#include <algorithm>
+#include <array>
#include <memory>
#include <string>
#include <vector>
@@ -141,6 +142,69 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
// For test only.
void setPartitionBufferSize(uint32_t newSize) override;
+  // Buckets for the encodings of the child vectors that each `write(...)` call
+  // receives, recorded on entry and BEFORE any flattening. The counting is
+  // always on; the totals are printed by `stat()` only when
+  // `VELOX_SHUFFLE_WRITER_LOG_FLAG` is enabled, alongside the
+  // `cpuWallTimingList_` lines (the "where did the time go" view, post-#12083)
+  // as a "what shape did the writer actually see" companion.
+  //
+  // Each bucket's `facebook::velox::VectorEncoding::Simple` members are listed
+  // next to it. Complex is split from Other on purpose: struct/map/array
+  // columns are expected in normal Spark workloads and should not be mistaken
+  // for a rare or unknown encoding. Other is also the catch-all for null
+  // children and for encodings added to Velox later.
+  enum InputEncodingBucket {
+    kInputEncodingFlat = 0, // FLAT
+    kInputEncodingDictionary = 1, // DICTIONARY
+    kInputEncodingConstant = 2, // CONSTANT
+    kInputEncodingLazy = 3, // LAZY
+    kInputEncodingComplex = 4, // ROW, MAP, FLAT_MAP, ARRAY
+    kInputEncodingOther = 5, // BIASED, SEQUENCE, FUNCTION, null child, future values
+    kInputEncodingNum = 6, // number of buckets; not itself a bucket
+  };
+
+  // Label printed by `stat()` for bucket `b`; "Unknown" for any value outside
+  // the real buckets (including the `kInputEncodingNum` sentinel).
+  static const char* inputEncodingName(InputEncodingBucket b) {
+    // Indexed by InputEncodingBucket; order must follow the enum.
+    static constexpr const char* kBucketNames[] = {"Flat", "Dictionary", "Constant", "Lazy", "Complex", "Other"};
+    static_assert(
+        sizeof(kBucketNames) / sizeof(kBucketNames[0]) == static_cast<size_t>(kInputEncodingNum),
+        "kBucketNames must have one label per InputEncodingBucket");
+    const int index = static_cast<int>(b);
+    if (index < 0 || index >= kInputEncodingNum) {
+      return "Unknown";
+    }
+    return kBucketNames[index];
+  }
+
+  // Child vectors observed so far per encoding bucket; entry `b` belongs to
+  // InputEncodingBucket `b`.
+  const std::array<int64_t, kInputEncodingNum>& inputEncodingCounts() const { return inputEncodingCounts_; }
+
+  // Number of `write(cb, ...)` calls whose batch the encoding counter could
+  // not inspect: `cb` was not of type "velox", was not a VeloxColumnarBatch, or
+  // carried no row vector. Such batches add nothing to `inputEncodingCounts()`.
+  // `stat()` prints this value so a small bucket total can be told apart
+  // ("writer saw few children" vs. "most batches were not velox-typed").
+  int64_t inputEncodingSkippedBatches() const { return inputEncodingSkippedBatches_; }
+
// for debugging
void printColumnsInfo() const {
VS_PRINT_FUNCTION_SPLIT_LINE();
@@ -323,6 +387,12 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv,
int64_t memLimit);
+  // Tallies the encoding of each top-level child of `cb` into
+  // `inputEncodingCounts_`, or bumps `inputEncodingSkippedBatches_` when `cb`
+  // cannot be inspected. Read-only with respect to `cb` (no load, no flatten).
+  // Invoked at the top of `write()`, so the counts describe the batch as the
+  // writer received it rather than its post-flatten shape.
+  void accumulateInputEncodingCounts(const ColumnarBatch& cb);
+
protected:
int32_t splitBufferSize_;
double splitBufferReallocThreshold_;
@@ -436,6 +506,12 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
std::optional<uint32_t> partitionBufferInUse_{std::nullopt};
std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_;
+
+  // Per-bucket child counters returned by inputEncodingCounts(); zero-initialized.
+  std::array<int64_t, kInputEncodingNum> inputEncodingCounts_ = {};
+
+  // Uninspectable-batch counter returned by inputEncodingSkippedBatches().
+  int64_t inputEncodingSkippedBatches_ = 0;
}; // class VeloxHashBasedShuffleWriter
} // namespace gluten
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index 0cd9e38bc6..00c0c2df69 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -100,6 +100,9 @@ set(VELOX_TEST_COMMON_SRCS JsonToProtoConverter.cc
FilePathGenerator.cc)
add_velox_test(velox_shuffle_writer_test SOURCES VeloxShuffleWriterTest.cc)
+add_velox_test(velox_hash_shuffle_writer_input_encoding_test
+               SOURCES VeloxHashShuffleWriterInputEncodingTest.cc)
+
add_velox_test(velox_shuffle_writer_spill_test SOURCES
VeloxShuffleWriterSpillTest.cc)
diff --git a/cpp/velox/tests/VeloxHashShuffleWriterInputEncodingTest.cc
b/cpp/velox/tests/VeloxHashShuffleWriterInputEncodingTest.cc
new file mode 100644
index 0000000000..8501fc7ecf
--- /dev/null
+++ b/cpp/velox/tests/VeloxHashShuffleWriterInputEncodingTest.cc
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "shuffle/VeloxHashShuffleWriter.h"
+
+#include "VeloxShuffleWriterTestBase.h"
+#include "utils/Macros.h"
+#include "utils/TestUtils.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+namespace gluten {
+
+namespace {
+
+// Local partition writer with LZ4_FRAME compression and default
+// LocalPartitionWriterOptions, writing to `dataFile` / `localDirs`.
+std::shared_ptr<PartitionWriter> makeLocalPartitionWriter(
+    uint32_t numPartitions,
+    const std::string& dataFile,
+    const std::vector<std::string>& localDirs) {
+  GLUTEN_ASSIGN_OR_THROW(auto lz4Codec, arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME));
+  auto writerOptions = std::make_shared<LocalPartitionWriterOptions>();
+  return std::make_shared<LocalPartitionWriter>(
+      numPartitions, std::move(lz4Codec), getDefaultMemoryManager(), writerOptions, dataFile, localDirs);
+}
+
+} // namespace
+
+class HashShuffleWriterInputEncodingTest : public ::testing::Test, public VeloxShuffleWriterTestBase {
+ protected:
+  // The Velox backend is brought up once for the whole suite.
+  static void SetUpTestSuite() {
+    setUpVeloxBackend();
+  }
+
+  static void TearDownTestSuite() {
+    tearDownVeloxBackend();
+  }
+
+  // Per-test data set-up provided by VeloxShuffleWriterTestBase.
+  void SetUp() override {
+    VeloxShuffleWriterTestBase::setUpTestData();
+  }
+
+  // Creates a kHash-partitioned shuffle writer with `numPartitions` partitions
+  // and splitBufferSize 4096, backed by `makeLocalPartitionWriter`. Returns
+  // nullptr when the factory yields something other than a
+  // VeloxHashShuffleWriter.
+  std::shared_ptr<VeloxHashShuffleWriter> createWriter(uint32_t numPartitions) {
+    auto writerOptions = std::make_shared<HashShuffleWriterOptions>();
+    writerOptions->partitioning = Partitioning::kHash;
+    writerOptions->splitBufferSize = 4096;
+
+    auto localWriter = makeLocalPartitionWriter(numPartitions, dataFile_, localDirs_);
+
+    GLUTEN_ASSIGN_OR_THROW(
+        auto shuffleWriter,
+        VeloxShuffleWriter::create(
+            ShuffleWriterType::kHashShuffle, numPartitions, localWriter, writerOptions, getDefaultMemoryManager()));
+    return std::dynamic_pointer_cast<VeloxHashShuffleWriter>(shuffleWriter);
+  }
+
+  // Hands `rv` to `writer` as a VeloxColumnarBatch without flattening it first,
+  // so the encoding counters see the vectors exactly as the test built them.
+  arrow::Status writeBatch(VeloxHashShuffleWriter& writer, facebook::velox::RowVectorPtr rv) {
+    const std::shared_ptr<ColumnarBatch> batch = std::make_shared<VeloxColumnarBatch>(rv);
+    return writer.write(batch, ShuffleWriter::kMinMemLimit);
+  }
+};
+
+// All-flat input: each of the six children (2 batches x 3 columns) increments
+// the FLAT bucket and no other bucket moves.
+TEST_F(HashShuffleWriterInputEncodingTest, allFlat) {
+  using Writer = VeloxHashShuffleWriter;
+  auto writer = createWriter(2);
+  ASSERT_NE(writer, nullptr);
+
+  // Column 0 of each batch is the partition-key column hash partitioning needs.
+  for (int batchIndex = 0; batchIndex < 2; ++batchIndex) {
+    auto flatBatch = makeRowVector({
+        makeFlatVector<int32_t>({0, 1, 0, 1}),
+        makeFlatVector<int32_t>({10, 20, 30, 40}),
+        makeFlatVector<int64_t>({100, 200, 300, 400}),
+    });
+    ASSERT_NOT_OK(writeBatch(*writer, flatBatch));
+  }
+
+  const auto& counts = writer->inputEncodingCounts();
+  EXPECT_EQ(counts[Writer::kInputEncodingFlat], 6);
+  EXPECT_EQ(counts[Writer::kInputEncodingDictionary], 0);
+  EXPECT_EQ(counts[Writer::kInputEncodingConstant], 0);
+  EXPECT_EQ(counts[Writer::kInputEncodingLazy], 0);
+  EXPECT_EQ(counts[Writer::kInputEncodingComplex], 0);
+  EXPECT_EQ(counts[Writer::kInputEncodingOther], 0);
+  EXPECT_EQ(writer->inputEncodingSkippedBatches(), 0);
+}
+
+// Mixed FLAT + DICTIONARY + CONSTANT children in a single batch: every child
+// increments its own bucket exactly once, so FLAT reaches 2 (partition key +
+// data column) while DICTIONARY and CONSTANT reach 1 each.
+TEST_F(HashShuffleWriterInputEncodingTest, mixedFlatDictConst) {
+  auto writer = createWriter(2);
+  ASSERT_NE(writer, nullptr);
+
+  // Dictionary-encoded VARCHAR child over a 4-entry flat base.
+  auto dictBase = makeFlatVector<facebook::velox::StringView>({"a", "b", "c", "d"});
+  auto indices = makeIndices({0, 1, 2, 3});
+  auto dictChild = facebook::velox::BaseVector::wrapInDictionary(nullptr, indices, 4, dictBase);
+
+  // Constant INTEGER child (value 42, 4 rows).
+  auto constChild = facebook::velox::BaseVector::createConstant(
+      facebook::velox::INTEGER(), facebook::velox::variant(int32_t{42}), 4, pool());
+
+  auto rv = makeRowVector({
+      makeFlatVector<int32_t>({0, 1, 0, 1}), // partition key (FLAT)
+      makeFlatVector<int32_t>({10, 20, 30, 40}), // FLAT data
+      dictChild, // DICTIONARY
+      constChild, // CONSTANT
+  });
+  ASSERT_NOT_OK(writeBatch(*writer, rv));
+
+  const auto& counts = writer->inputEncodingCounts();
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingFlat], 2);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingDictionary], 1);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingConstant], 1);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingLazy], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingComplex], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingOther], 0);
+  EXPECT_EQ(writer->inputEncodingSkippedBatches(), 0);
+}
+
+// A LazyVector child is counted in the LAZY bucket: encodings are sampled on
+// entry to write(), before anything calls loadedVector() on the input.
+TEST_F(HashShuffleWriterInputEncodingTest, lazy) {
+  auto writer = createWriter(2);
+  ASSERT_NE(writer, nullptr);
+
+  auto loader = std::make_unique<facebook::velox::test::SimpleVectorLoader>(
+      [this](facebook::velox::RowSet /*rows*/) { return makeFlatVector<int64_t>({100, 200, 300, 400}); });
+  auto lazyChild =
+      std::make_shared<facebook::velox::LazyVector>(pool(), facebook::velox::BIGINT(), 4, std::move(loader));
+
+  auto rv = makeRowVector({
+      makeFlatVector<int32_t>({0, 1, 0, 1}), // partition key (FLAT)
+      lazyChild, // LAZY
+  });
+  ASSERT_NOT_OK(writeBatch(*writer, rv));
+
+  const auto& counts = writer->inputEncodingCounts();
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingFlat], 1);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingDictionary], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingConstant], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingLazy], 1);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingComplex], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingOther], 0);
+  EXPECT_EQ(writer->inputEncodingSkippedBatches(), 0);
+}
+
+// ARRAY / MAP / ROW children all land in the COMPLEX bucket, not OTHER. These
+// column types are routine in Spark workloads (the sibling
+// `VeloxShuffleWriterTest` exercises `makeArrayVector` / `makeMapVector`), so
+// this is the typical case rather than an edge case. Only top-level children
+// are counted: the fields nested inside the ROW child touch no bucket.
+TEST_F(HashShuffleWriterInputEncodingTest, complex) {
+  auto writer = createWriter(2);
+  ASSERT_NE(writer, nullptr);
+
+  auto arrayChild = makeArrayVector<int64_t>({{1, 2}, {3, 4}, {5}, {6, 7, 8}});
+  auto mapChild = makeMapVector<int32_t, facebook::velox::StringView>(
+      {{{1, "a"}, {2, "b"}}, {{3, "c"}}, {{4, "d"}, {5, "e"}}, {{6, "f"}}});
+  // Struct column (ROW encoding) so all three complex kinds named above are covered.
+  auto rowChild = makeRowVector({
+      makeFlatVector<int64_t>({1, 2, 3, 4}),
+      makeFlatVector<int32_t>({5, 6, 7, 8}),
+  });
+
+  auto rv = makeRowVector({
+      makeFlatVector<int32_t>({0, 1, 0, 1}), // partition key (FLAT)
+      arrayChild, // ARRAY
+      mapChild, // MAP
+      rowChild, // ROW
+  });
+  ASSERT_NOT_OK(writeBatch(*writer, rv));
+
+  const auto& counts = writer->inputEncodingCounts();
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingFlat], 1);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingComplex], 3);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingDictionary], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingConstant], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingLazy], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingOther], 0);
+  EXPECT_EQ(writer->inputEncodingSkippedBatches(), 0);
+}
+
+} // namespace gluten
+
+int main(int argc, char** argv) {
+  // Let gtest consume its command-line flags, then run every registered test.
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]