This is an automated email from the ASF dual-hosted git repository.

marin-ma pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d2c6f382f6 [VL] Add per-batch input-encoding counter to VeloxHashShuffleWriter (#12107)
d2c6f382f6 is described below

commit d2c6f382f6213f0e0ef429bdeeb63eba9d904438
Author: Luis Peñaranda <[email protected]>
AuthorDate: Mon May 18 21:24:37 2026 +0200

    [VL] Add per-batch input-encoding counter to VeloxHashShuffleWriter (#12107)
---
 cpp/velox/shuffle/VeloxHashShuffleWriter.cc        |  81 +++++++++
 cpp/velox/shuffle/VeloxHashShuffleWriter.h         |  76 ++++++++
 cpp/velox/tests/CMakeLists.txt                     |   3 +
 .../VeloxHashShuffleWriterInputEncodingTest.cc     | 201 +++++++++++++++++++++
 4 files changed, 361 insertions(+)

diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index f25d4ac887..e071e8a1c3 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -239,6 +239,7 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> VeloxHashShuffleWriter::generateCo
 
 arrow::Status VeloxHashShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, 
int64_t memLimit) {
   writtenBytes_ = 0;
+  accumulateInputEncodingCounts(*cb);
   if (partitioning_ == Partitioning::kSingle) {
     auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
     VELOX_CHECK_NOT_NULL(veloxColumnBatch);
@@ -1315,6 +1316,66 @@ uint64_t VeloxHashShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t fixe
   return valueBufferSize;
 }
 
+// Tallies the encoding of every child of `cb`'s row vector into
+// `inputEncodingCounts_`. A batch that cannot be inspected leaves the buckets
+// untouched and increments `inputEncodingSkippedBatches_` instead.
+void VeloxHashShuffleWriter::accumulateInputEncodingCounts(const ColumnarBatch& cb) {
+  // Only velox-typed batches expose per-child encoding; foreign batches
+  // (e.g. arrow round-trips coming from non-velox sources) will be flattened
+  // by `VeloxColumnarBatch::from` later and we'd undercount, so just skip
+  // them here rather than reporting a misleading "all flat" mix. The skip
+  // counter is exposed via `inputEncodingSkippedBatches()` and printed in
+  // `stat()` so a reader can tell whether a low encoding-bucket total means
+  // "writer saw few children" or "writer saw many but most were not velox".
+  if (cb.getType() != "velox") {
+    ++inputEncodingSkippedBatches_;
+    return;
+  }
+  // The type tag matched but the object is not a `VeloxColumnarBatch`: there
+  // is nothing to inspect, so it is accounted for as skipped as well.
+  const auto* veloxBatch = dynamic_cast<const VeloxColumnarBatch*>(&cb);
+  if (veloxBatch == nullptr) {
+    ++inputEncodingSkippedBatches_;
+    return;
+  }
+  // A velox batch without a row vector has no children to count; skipped too.
+  const auto& rowVector = veloxBatch->getRowVector();
+  if (rowVector == nullptr) {
+    ++inputEncodingSkippedBatches_;
+    return;
+  }
+  // One increment per child: each child lands in exactly one bucket.
+  for (const auto& child : rowVector->children()) {
+    if (child == nullptr) {
+      // A null child has no encoding to query; it is counted under `Other`.
+      ++inputEncodingCounts_[kInputEncodingOther];
+      continue;
+    }
+    switch (child->encoding()) {
+      case facebook::velox::VectorEncoding::Simple::FLAT:
+        ++inputEncodingCounts_[kInputEncodingFlat];
+        break;
+      case facebook::velox::VectorEncoding::Simple::DICTIONARY:
+        ++inputEncodingCounts_[kInputEncodingDictionary];
+        break;
+      case facebook::velox::VectorEncoding::Simple::CONSTANT:
+        ++inputEncodingCounts_[kInputEncodingConstant];
+        break;
+      case facebook::velox::VectorEncoding::Simple::LAZY:
+        ++inputEncodingCounts_[kInputEncodingLazy];
+        break;
+      case facebook::velox::VectorEncoding::Simple::ROW:
+      case facebook::velox::VectorEncoding::Simple::MAP:
+      case facebook::velox::VectorEncoding::Simple::FLAT_MAP:
+      case facebook::velox::VectorEncoding::Simple::ARRAY:
+        // Struct / map / array column types — first-class in Spark workloads
+        // and exercised by the sibling shuffle writer tests
+        // (`makeArrayVector` / `makeMapVector`). Kept distinct from `Other`
+        // so a reader interpreting the log doesn't conflate a struct column
+        // with a rare encoding.
+        ++inputEncodingCounts_[kInputEncodingComplex];
+        break;
+      default:
+        // BIASED, SEQUENCE, FUNCTION, and any future additions to
+        // `VectorEncoding::Simple`.
+        ++inputEncodingCounts_[kInputEncodingOther];
+        break;
+    }
+  }
+}
+
 void VeloxHashShuffleWriter::stat() const {
 #if VELOX_SHUFFLE_WRITER_LOG_FLAG
   for (int i = CpuWallTimingBegin; i != CpuWallTimingEnd; ++i) {
@@ -1328,6 +1389,26 @@ void VeloxHashShuffleWriter::stat() const {
     }
     LOG(INFO) << oss.str();
   }
+  {
+    std::ostringstream oss;
+    oss << "Velox shuffle writer stat:InputEncoding";
+    int64_t total = 0;
+    for (auto v : inputEncodingCounts_) {
+      total += v;
+    }
+    for (int b = 0; b < kInputEncodingNum; ++b) {
+      auto v = inputEncodingCounts_[b];
+      oss << " " << inputEncodingName(static_cast<InputEncodingBucket>(b)) << 
"=" << v;
+      if (total > 0) {
+        oss << "(" << (100.0 * static_cast<double>(v) / 
static_cast<double>(total)) << "%)";
+      }
+    }
+    // Non-velox `write()` calls contribute 0 to the buckets above; expose the
+    // skip count so the denominator (total = sum of buckets) is comparable to
+    // the `count` field in the `cpuWallTimingList_` lines emitted above.
+    oss << " SkippedNonVeloxBatches=" << inputEncodingSkippedBatches_;
+    LOG(INFO) << oss.str();
+  }
 #endif
 }
 
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.h b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
index 9203c70594..899c2be269 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <algorithm>
+#include <array>
 #include <memory>
 #include <string>
 #include <vector>
@@ -141,6 +142,69 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
   // For test only.
   void setPartitionBufferSize(uint32_t newSize) override;
 
+  // Read-only counters of incoming column-vector encodings observed at the
+  // entry of every `write(...)` call, BEFORE flattening. Mirrors the layout
+  // of `cpuWallTimingList_` (always-on counter, logged via `stat()` when
+  // `VELOX_SHUFFLE_WRITER_LOG_FLAG=1`). Useful as the "what shape did the
+  // writer actually see" companion to `cpuWallTimingList_`'s "where did the
+  // time go" view (post-#12083).
+  //
+  // The 6 buckets map to `facebook::velox::VectorEncoding::Simple` values:
+  //   Flat       — FLAT
+  //   Dictionary — DICTIONARY
+  //   Constant   — CONSTANT
+  //   Lazy       — LAZY
+  //   Complex    — ROW, MAP, FLAT_MAP, ARRAY  (struct/map/array column types,
+  //                expected in normal Spark workloads — kept distinct from
+  //                "Other" so the log doesn't conflate a struct column with
+  //                a rare/unknown encoding)
+  //   Other      — BIASED, SEQUENCE, FUNCTION  (rarely seen at the shuffle
+  //                writer entry; this bucket is the catch-all for future
+  //                additions to VectorEncoding::Simple)
+  // Bucket identifiers; the values are used directly as indices into the
+  // per-bucket counter array, so they must stay dense and start at 0.
+  enum InputEncodingBucket {
+    kInputEncodingFlat = 0, // FLAT
+    kInputEncodingDictionary, // DICTIONARY
+    kInputEncodingConstant, // CONSTANT
+    kInputEncodingLazy, // LAZY
+    kInputEncodingComplex, // ROW, MAP, FLAT_MAP, ARRAY
+    kInputEncodingOther, // every other encoding, and null children
+    kInputEncodingNum, // number of buckets (array size); not a bucket itself
+  };
+
+  // Returns the human-readable label for bucket `b` (used when `stat()`
+  // prints the per-bucket counts). Any value that is not one of the six
+  // buckets, including `kInputEncodingNum`, yields "Unknown". The returned
+  // pointer refers to a string literal.
+  static const char* inputEncodingName(InputEncodingBucket b) {
+    switch (b) {
+      case kInputEncodingFlat:
+        return "Flat";
+      case kInputEncodingDictionary:
+        return "Dictionary";
+      case kInputEncodingConstant:
+        return "Constant";
+      case kInputEncodingLazy:
+        return "Lazy";
+      case kInputEncodingComplex:
+        return "Complex";
+      case kInputEncodingOther:
+        return "Other";
+      default:
+        return "Unknown";
+    }
+  }
+
+  // Per-bucket child counts accumulated so far, indexed by
+  // `InputEncodingBucket`. Returns a reference to the live member, not a copy.
+  const std::array<int64_t, kInputEncodingNum>& inputEncodingCounts() const {
+    return inputEncodingCounts_;
+  }
+
+  // Count of `write(cb, ...)` calls skipped by `accumulateInputEncodingCounts`:
+  // `cb` was not a velox-typed `ColumnarBatch` (its per-child encodings cannot
+  // be inspected without forcing an arrow-bridge round-trip), or it could not
+  // be viewed as a `VeloxColumnarBatch`, or its row vector was null. Such calls
+  // contribute 0 to `inputEncodingCounts_`. Surfaced separately in the `stat()`
+  // log so a reader can tell whether a low total in the encoding buckets is
+  // "writer saw few children" or "writer saw many children but most were in
+  // skipped batches". Note the unit is batches, whereas the buckets count
+  // children.
+  int64_t inputEncodingSkippedBatches() const {
+    return inputEncodingSkippedBatches_;
+  }
+
   // for debugging
   void printColumnsInfo() const {
     VS_PRINT_FUNCTION_SPLIT_LINE();
@@ -323,6 +387,12 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
 
   arrow::Status partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, 
int64_t memLimit);
 
+  // Read child-vector encodings of `cb` (no mutation, no flatten) and
+  // accumulate into `inputEncodingCounts_`. Called once per `write()` call,
+  // BEFORE any `getFlattenedRowVector()` invocation, so the counts reflect
+  // the encoding the writer actually receives — not the post-flatten shape.
+  void accumulateInputEncodingCounts(const ColumnarBatch& cb);
+
  protected:
   int32_t splitBufferSize_;
   double splitBufferReallocThreshold_;
@@ -436,6 +506,12 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
   std::optional<uint32_t> partitionBufferInUse_{std::nullopt};
 
   std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_;
+
+  // See InputEncodingBucket / inputEncodingCounts() above.
+  std::array<int64_t, kInputEncodingNum> inputEncodingCounts_{};
+
+  // See inputEncodingSkippedBatches() above.
+  int64_t inputEncodingSkippedBatches_{0};
 }; // class VeloxHashBasedShuffleWriter
 
 } // namespace gluten
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index 0cd9e38bc6..00c0c2df69 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -100,6 +100,9 @@ set(VELOX_TEST_COMMON_SRCS JsonToProtoConverter.cc FilePathGenerator.cc)
 
 add_velox_test(velox_shuffle_writer_test SOURCES VeloxShuffleWriterTest.cc)
 
+add_velox_test(velox_hash_shuffle_writer_input_encoding_test SOURCES
+               VeloxHashShuffleWriterInputEncodingTest.cc)
+
 add_velox_test(velox_shuffle_writer_spill_test SOURCES
                VeloxShuffleWriterSpillTest.cc)
 
diff --git a/cpp/velox/tests/VeloxHashShuffleWriterInputEncodingTest.cc b/cpp/velox/tests/VeloxHashShuffleWriterInputEncodingTest.cc
new file mode 100644
index 0000000000..8501fc7ecf
--- /dev/null
+++ b/cpp/velox/tests/VeloxHashShuffleWriterInputEncodingTest.cc
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include "shuffle/VeloxHashShuffleWriter.h"
+
+#include "VeloxShuffleWriterTestBase.h"
+#include "utils/Macros.h"
+#include "utils/TestUtils.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+namespace gluten {
+
+namespace {
+
+// Builds a `LocalPartitionWriter` for `numPartitions` partitions that writes
+// to `dataFile` / `localDirs`, using an LZ4_FRAME codec, default-constructed
+// `LocalPartitionWriterOptions` and the default memory manager.
+// NOTE(review): `GLUTEN_ASSIGN_OR_THROW` presumably throws when the codec
+// cannot be created — confirm against utils/Macros.h.
+std::shared_ptr<PartitionWriter> makeLocalPartitionWriter(
+    uint32_t numPartitions,
+    const std::string& dataFile,
+    const std::vector<std::string>& localDirs) {
+  GLUTEN_ASSIGN_OR_THROW(auto codec, arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME));
+  auto options = std::make_shared<LocalPartitionWriterOptions>();
+  return std::make_shared<LocalPartitionWriter>(
+      numPartitions, std::move(codec), getDefaultMemoryManager(), options, dataFile, localDirs);
+}
+
+} // namespace
+
+// Fixture for the input-encoding counter tests: sets the Velox backend up
+// once per suite and provides helpers to create a hash shuffle writer and to
+// feed it a `RowVector` without flattening it first.
+class HashShuffleWriterInputEncodingTest : public ::testing::Test, public VeloxShuffleWriterTestBase {
+ protected:
+  // Suite-wide backend setup / teardown (once for all tests in this fixture).
+  static void SetUpTestSuite() {
+    setUpVeloxBackend();
+  }
+
+  static void TearDownTestSuite() {
+    tearDownVeloxBackend();
+  }
+
+  // Per-test preparation delegated to the shared base class.
+  // NOTE(review): presumably this is what initialises `dataFile_` and
+  // `localDirs_` used by `createWriter` — confirm in VeloxShuffleWriterTestBase.
+  void SetUp() override {
+    VeloxShuffleWriterTestBase::setUpTestData();
+  }
+
+  // Creates a hash-partitioned shuffle writer (split buffer size 4096) backed
+  // by a local partition writer. Returns nullptr if the created writer is not
+  // a `VeloxHashShuffleWriter`, which is why callers assert on the result.
+  std::shared_ptr<VeloxHashShuffleWriter> createWriter(uint32_t numPartitions) {
+    auto options = std::make_shared<HashShuffleWriterOptions>();
+    options->partitioning = Partitioning::kHash;
+    options->splitBufferSize = 4096;
+
+    auto partitionWriter = makeLocalPartitionWriter(numPartitions, dataFile_, localDirs_);
+
+    GLUTEN_ASSIGN_OR_THROW(
+        auto base,
+        VeloxShuffleWriter::create(
+            ShuffleWriterType::kHashShuffle, numPartitions, partitionWriter, options, getDefaultMemoryManager()));
+    return std::dynamic_pointer_cast<VeloxHashShuffleWriter>(base);
+  }
+
+  // Wrap a RowVector into a VeloxColumnarBatch and feed it to the writer
+  // (no flatten before the call — the encoding counts are captured on the
+  // batch as-is). Returns the status of `write()` unchanged.
+  arrow::Status writeBatch(VeloxHashShuffleWriter& writer, facebook::velox::RowVectorPtr rv) {
+    std::shared_ptr<ColumnarBatch> cb = std::make_shared<VeloxColumnarBatch>(rv);
+    return writer.write(cb, ShuffleWriter::kMinMemLimit);
+  }
+};
+
+// All-flat input: every child increments the FLAT bucket, other buckets stay 0.
+TEST_F(HashShuffleWriterInputEncodingTest, allFlat) {
+  auto writer = createWriter(2);
+  ASSERT_NE(writer, nullptr);
+
+  // Two batches with 3 flat children each (first is the partition-key column
+  // required by hash partitioning).
+  for (int i = 0; i < 2; ++i) {
+    auto rv = makeRowVector({
+        makeFlatVector<int32_t>({0, 1, 0, 1}),
+        makeFlatVector<int32_t>({10, 20, 30, 40}),
+        makeFlatVector<int64_t>({100, 200, 300, 400}),
+    });
+    // NOTE(review): despite its name, `ASSERT_NOT_OK` is presumably the
+    // project macro that fails when the status is NOT ok — confirm.
+    ASSERT_NOT_OK(writeBatch(*writer, rv));
+  }
+
+  const auto& counts = writer->inputEncodingCounts();
+  // Counts accumulate across write() calls: 2 batches x 3 flat children = 6.
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingFlat], 6);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingDictionary], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingConstant], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingLazy], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingComplex], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingOther], 0);
+  // Both batches were velox-typed, so none was skipped.
+  EXPECT_EQ(writer->inputEncodingSkippedBatches(), 0);
+}
+
+// Mixed flat + dictionary + constant in a single batch: each child increments the bucket of its own encoding.
+TEST_F(HashShuffleWriterInputEncodingTest, mixedFlatDictConst) {
+  auto writer = createWriter(2);
+  ASSERT_NE(writer, nullptr);
+
+  // dict-encoded VARCHAR child (identity indices over a 4-row flat base)
+  auto dictBase = makeFlatVector<facebook::velox::StringView>({"a", "b", "c", "d"});
+  auto indices = makeIndices({0, 1, 2, 3});
+  auto dictChild = facebook::velox::BaseVector::wrapInDictionary(nullptr, indices, 4, dictBase);
+
+  // constant int32 child (value 42 repeated for 4 rows)
+  auto constChild = facebook::velox::BaseVector::createConstant(
+      facebook::velox::INTEGER(), facebook::velox::variant(int32_t{42}), 4, pool());
+
+  auto rv = makeRowVector({
+      makeFlatVector<int32_t>({0, 1, 0, 1}), // partition key (FLAT)
+      makeFlatVector<int32_t>({10, 20, 30, 40}), // FLAT data
+      dictChild, // DICTIONARY
+      constChild, // CONSTANT
+  });
+  ASSERT_NOT_OK(writeBatch(*writer, rv));
+
+  const auto& counts = writer->inputEncodingCounts();
+  // Flat is 2 because the partition-key column is counted like any other child.
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingFlat], 2);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingDictionary], 1);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingConstant], 1);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingLazy], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingComplex], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingOther], 0);
+  EXPECT_EQ(writer->inputEncodingSkippedBatches(), 0);
+}
+
+// Lazy child should land in the LAZY bucket (encoding is reported before
+// any loadedVector() call).
+TEST_F(HashShuffleWriterInputEncodingTest, lazy) {
+  auto writer = createWriter(2);
+  ASSERT_NE(writer, nullptr);
+
+  // 4-row BIGINT lazy vector; the loader ignores the requested rows and
+  // always produces the same flat vector when invoked.
+  auto lazyChild = std::make_shared<facebook::velox::LazyVector>(
+      pool(),
+      facebook::velox::BIGINT(),
+      4,
+      std::make_unique<facebook::velox::test::SimpleVectorLoader>([&](facebook::velox::RowSet /*rows*/) {
+        return makeFlatVector<int64_t>({100, 200, 300, 400});
+      }));
+
+  auto rv = makeRowVector({
+      makeFlatVector<int32_t>({0, 1, 0, 1}), // partition key
+      lazyChild,
+  });
+  ASSERT_NOT_OK(writeBatch(*writer, rv));
+
+  const auto& counts = writer->inputEncodingCounts();
+  // The lazy child must be reported as LAZY, not as the FLAT vector it loads.
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingFlat], 1);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingLazy], 1);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingDictionary], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingConstant], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingComplex], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingOther], 0);
+  EXPECT_EQ(writer->inputEncodingSkippedBatches(), 0);
+}
+
+// ARRAY / MAP children land in the COMPLEX bucket, not OTHER (ROW and FLAT_MAP
+// map to the same bucket in the writer but are not exercised here). The sibling
+// `VeloxShuffleWriterTest` exercises these via `makeArrayVector` / `makeMapVector`,
+// so this is the typical-Spark-workload case, not an edge case.
+TEST_F(HashShuffleWriterInputEncodingTest, complex) {
+  auto writer = createWriter(2);
+  ASSERT_NE(writer, nullptr);
+
+  // Four rows each, matching the row count of the partition-key column.
+  auto arrayChild = makeArrayVector<int64_t>({{1, 2}, {3, 4}, {5}, {6, 7, 8}});
+  auto mapChild = makeMapVector<int32_t, facebook::velox::StringView>(
+      {{{1, "a"}, {2, "b"}}, {{3, "c"}}, {{4, "d"}, {5, "e"}}, {{6, "f"}}});
+
+  auto rv = makeRowVector({
+      makeFlatVector<int32_t>({0, 1, 0, 1}), // partition key (FLAT)
+      arrayChild, // ARRAY
+      mapChild, // MAP
+  });
+  ASSERT_NOT_OK(writeBatch(*writer, rv));
+
+  const auto& counts = writer->inputEncodingCounts();
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingFlat], 1);
+  // ARRAY and MAP share the Complex bucket, hence 2.
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingComplex], 2);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingDictionary], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingConstant], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingLazy], 0);
+  EXPECT_EQ(counts[VeloxHashShuffleWriter::kInputEncodingOther], 0);
+  EXPECT_EQ(writer->inputEncodingSkippedBatches(), 0);
+}
+
+} // namespace gluten
+
+// Test binary entry point: initialises GoogleTest from the command-line
+// arguments and returns the result of running all registered tests.
+int main(int argc, char** argv) {
+  testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to