This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 3b715c1195e branch-4.0: [refactor](rle) Refactor RLE for writing long 
runs of repeated values #59927 (#60178)
3b715c1195e is described below

commit 3b715c1195e6ac28458b9bc3eceaee76a7bd0d9b
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jan 30 16:18:56 2026 +0800

    branch-4.0: [refactor](rle) Refactor RLE for writing long runs of repeated 
values #59927 (#60178)
    
    Cherry-picked from #59927
    
    Co-authored-by: Sun Chenyang <[email protected]>
---
 be/src/util/rle_encoding.h         |  45 +++--
 be/test/util/rle_encoding_test.cpp | 357 +++++++++++++++++++++++++++++++++++++
 2 files changed, 389 insertions(+), 13 deletions(-)

diff --git a/be/src/util/rle_encoding.h b/be/src/util/rle_encoding.h
index eef5aee56a3..1685ab85f1e 100644
--- a/be/src/util/rle_encoding.h
+++ b/be/src/util/rle_encoding.h
@@ -435,17 +435,35 @@ template <typename T>
 void RleEncoder<T>::Put(T value, size_t run_length) {
     DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
 
-    // TODO(perf): remove the loop and use the repeat_count_
-    for (; run_length > 0; run_length--) {
+    // Fast path: if this is a continuation of the current repeated run and
+    // we've already buffered enough values, just increment repeat_count_
+    if (current_value_ == value && repeat_count_ >= 8 && run_length > 0) 
[[likely]] {
+        repeat_count_ += run_length;
+        return;
+    }
+
+    // Handle run_length > 1 more efficiently
+    while (run_length > 0) {
         if (current_value_ == value) [[likely]] {
-            ++repeat_count_;
-            if (repeat_count_ > 8) {
-                // This is just a continuation of the current run, no need to 
buffer the
-                // values.
-                // Note that this is the fast path for long repeated runs.
-                continue;
+            // Need to buffer values until we reach 8
+            size_t to_buffer = std::min(run_length, size_t(8 - 
num_buffered_values_));
+            for (size_t i = 0; i < to_buffer; ++i) {
+                buffered_values_[num_buffered_values_++] = value;
+                ++repeat_count_;
+            }
+            run_length -= to_buffer;
+            if (num_buffered_values_ == 8) {
+                DCHECK_EQ(literal_count_ % 8, 0);
+                FlushBufferedValues(false);
+                // After flushing, if we still have a repeated run and more 
values,
+                // we can add them directly to repeat_count_
+                if (repeat_count_ >= 8 && run_length > 0) {
+                    repeat_count_ += run_length;
+                    return;
+                }
             }
         } else {
+            // Value changed
             if (repeat_count_ >= 8) {
                 // We had a run that was long enough but it has ended.  Flush 
the
                 // current repeated run.
@@ -454,12 +472,13 @@ void RleEncoder<T>::Put(T value, size_t run_length) {
             }
             repeat_count_ = 1;
             current_value_ = value;
-        }
 
-        buffered_values_[num_buffered_values_] = value;
-        if (++num_buffered_values_ == 8) {
-            DCHECK_EQ(literal_count_ % 8, 0);
-            FlushBufferedValues(false);
+            buffered_values_[num_buffered_values_++] = value;
+            --run_length;
+            if (num_buffered_values_ == 8) {
+                DCHECK_EQ(literal_count_ % 8, 0);
+                FlushBufferedValues(false);
+            }
         }
     }
 }
diff --git a/be/test/util/rle_encoding_test.cpp 
b/be/test/util/rle_encoding_test.cpp
index 8f147b64fd1..5af34168869 100644
--- a/be/test/util/rle_encoding_test.cpp
+++ b/be/test/util/rle_encoding_test.cpp
@@ -25,6 +25,7 @@
 
 #include <algorithm>
 #include <boost/utility/binary.hpp>
+#include <chrono>
 #include <cstdint>
 #include <cstdlib>
 #include <cstring>
@@ -418,4 +419,360 @@ TEST_F(TestRle, TestSkip) {
     encoder.Flush();
 }
 
+// Helper to compare Put with run_length vs multiple Put(value) calls
+template <typename T>
+void ValidatePutRunLength(const std::vector<std::pair<T, size_t>>& runs, int 
bit_width) {
+    // Encode using Put(value, run_length)
+    faststring buffer1;
+    RleEncoder<T> encoder1(&buffer1, bit_width);
+    for (const auto& [value, length] : runs) {
+        encoder1.Put(value, length);
+    }
+    encoder1.Flush();
+
+    // Encode using multiple Put(value) calls
+    faststring buffer2;
+    RleEncoder<T> encoder2(&buffer2, bit_width);
+    for (const auto& [value, length] : runs) {
+        for (size_t i = 0; i < length; ++i) {
+            encoder2.Put(value);
+        }
+    }
+    encoder2.Flush();
+
+    // Both should produce identical output
+    EXPECT_EQ(buffer1.size(), buffer2.size());
+    EXPECT_EQ(memcmp(buffer1.data(), buffer2.data(), buffer1.size()), 0)
+            << "Put with run_length produced different output than individual 
Puts";
+
+    // Verify decoding produces correct values
+    RleDecoder<T> decoder(buffer1.data(), buffer1.size(), bit_width);
+    for (const auto& [value, length] : runs) {
+        for (size_t i = 0; i < length; ++i) {
+            T decoded_val;
+            EXPECT_TRUE(decoder.Get(&decoded_val));
+            EXPECT_EQ(value, decoded_val);
+        }
+    }
+}
+
+// Test Put(value, run_length) with various patterns
+TEST_F(TestRle, TestPutWithRunLength) {
+    // Test 1: Single long run (should use fast path after 8 values)
+    {
+        std::vector<std::pair<uint64_t, size_t>> runs = {{42, 100}};
+        ValidatePutRunLength(runs, 8);
+    }
+
+    // Test 2: Run length exactly 8 (boundary case)
+    {
+        std::vector<std::pair<uint64_t, size_t>> runs = {{1, 8}, {2, 8}, {3, 
8}};
+        ValidatePutRunLength(runs, 8);
+    }
+
+    // Test 3: Run length less than 8
+    {
+        std::vector<std::pair<uint64_t, size_t>> runs = {{1, 3}, {2, 5}, {3, 
7}};
+        ValidatePutRunLength(runs, 8);
+    }
+
+    // Test 4: Run length crossing buffer boundary (e.g., 5 + 10 with same 
value)
+    {
+        std::vector<std::pair<uint64_t, size_t>> runs = {{1, 5}, {1, 10}};
+        ValidatePutRunLength(runs, 8);
+    }
+
+    // Test 5: Alternating values with various run lengths
+    {
+        std::vector<std::pair<uint64_t, size_t>> runs = {{0, 7}, {1, 9}, {0, 
15}, {1, 3}, {0, 20}};
+        ValidatePutRunLength(runs, 8);
+    }
+
+    // Test 6: Very long runs (test fast path)
+    {
+        std::vector<std::pair<uint64_t, size_t>> runs = {{0, 1000}, {1, 1000}, 
{0, 1000}};
+        ValidatePutRunLength(runs, 8);
+    }
+
+    // Test 7: Run length of 1 (no batching benefit)
+    {
+        std::vector<std::pair<uint64_t, size_t>> runs;
+        for (int i = 0; i < 100; ++i) {
+            runs.push_back({static_cast<uint64_t>(i % 10), 1});
+        }
+        ValidatePutRunLength(runs, 8);
+    }
+
+    // Test 8: Boolean values
+    {
+        std::vector<std::pair<bool, size_t>> runs = {
+                {true, 10}, {false, 7}, {true, 5}, {true, 15}, {false, 20}};
+        ValidatePutRunLength(runs, 1);
+    }
+
+    // Test 9: Large bit width (64-bit)
+    {
+        std::vector<std::pair<uint64_t, size_t>> runs = {
+                {0xFFFFFFFFFFFFFFFFULL, 50}, {0x123456789ABCDEF0ULL, 30}, {0, 
100}};
+        ValidatePutRunLength(runs, 64);
+    }
+
+    // Test 10: Mixed short and long runs
+    {
+        std::vector<std::pair<uint64_t, size_t>> runs = {{1, 2},  {2, 100}, 
{3, 1},
+                                                         {4, 50}, {5, 3},   
{6, 200}};
+        ValidatePutRunLength(runs, 8);
+    }
+}
+
+// Test Put(value, run_length) specifically for the buffering logic
+TEST_F(TestRle, TestPutRunLengthBuffering) {
+    // Test case: value changes while buffer is partially filled
+    // This tests the "value changed" branch in the while loop
+    {
+        faststring buffer;
+        RleEncoder<uint64_t> encoder(&buffer, 8);
+
+        // Put 3 values of 'A', then 5 values of 'B' (total 8, triggers flush)
+        encoder.Put(10, 3);
+        encoder.Put(20, 5);
+        encoder.Flush();
+
+        // Decode and verify
+        RleDecoder<uint64_t> decoder(buffer.data(), buffer.size(), 8);
+        uint64_t val;
+        for (int i = 0; i < 3; ++i) {
+            EXPECT_TRUE(decoder.Get(&val));
+            EXPECT_EQ(10, val);
+        }
+        for (int i = 0; i < 5; ++i) {
+            EXPECT_TRUE(decoder.Get(&val));
+            EXPECT_EQ(20, val);
+        }
+    }
+
+    // Test case: run_length spans multiple buffer fills
+    {
+        faststring buffer;
+        RleEncoder<uint64_t> encoder(&buffer, 8);
+
+        // Put 25 values (spans 3 buffer fills: 8 + 8 + 8 + 1)
+        encoder.Put(42, 25);
+        encoder.Flush();
+
+        RleDecoder<uint64_t> decoder(buffer.data(), buffer.size(), 8);
+        uint64_t val;
+        for (int i = 0; i < 25; ++i) {
+            EXPECT_TRUE(decoder.Get(&val));
+            EXPECT_EQ(42, val);
+        }
+    }
+
+    // Test case: existing repeated run (repeat_count >= 8) then value changes
+    {
+        faststring buffer;
+        RleEncoder<uint64_t> encoder(&buffer, 8);
+
+        // Build up a repeated run
+        encoder.Put(1, 20);
+        // Now change value - should flush the repeated run
+        encoder.Put(2, 5);
+        encoder.Flush();
+
+        RleDecoder<uint64_t> decoder(buffer.data(), buffer.size(), 8);
+        uint64_t val;
+        for (int i = 0; i < 20; ++i) {
+            EXPECT_TRUE(decoder.Get(&val));
+            EXPECT_EQ(1, val);
+        }
+        for (int i = 0; i < 5; ++i) {
+            EXPECT_TRUE(decoder.Get(&val));
+            EXPECT_EQ(2, val);
+        }
+    }
+}
+
+// Test Put(value, run_length=0) edge case
+TEST_F(TestRle, TestPutRunLengthZero) {
+    faststring buffer;
+    RleEncoder<uint64_t> encoder(&buffer, 8);
+
+    encoder.Put(1, 10);
+    encoder.Put(2, 0); // Should be a no-op
+    encoder.Put(1, 5); // Should continue the run of 1s
+    encoder.Flush();
+
+    RleDecoder<uint64_t> decoder(buffer.data(), buffer.size(), 8);
+    uint64_t val;
+    for (int i = 0; i < 15; ++i) {
+        EXPECT_TRUE(decoder.Get(&val));
+        EXPECT_EQ(1, val);
+    }
+    EXPECT_FALSE(decoder.Get(&val)); // No more values
+}
+
+// Test to cover the branch at L460-463 in Put():
+// After FlushBufferedValues, if repeat_count_ >= 8 and run_length > 0,
+// add remaining run_length to repeat_count_ directly.
+//
+// To trigger this branch:
+// 1. Start fresh (repeat_count_=0, num_buffered_values_=0)
+// 2. Put same value with run_length > 8 (e.g., 20)
+// 3. First 8 values fill buffer, trigger FlushBufferedValues
+// 4. FlushBufferedValues sees all 8 same values -> repeat_count_ stays at 8
+// 5. After flush: repeat_count_=8, run_length=12 -> branch triggered!
+TEST_F(TestRle, TestPutFlushThenFastPath) {
+    // Test case 1: Single Put with run_length > 8, triggers the L460 branch
+    {
+        std::vector<std::pair<uint64_t, size_t>> runs = {{42, 20}};
+        ValidatePutRunLength(runs, 8);
+    }
+
+    // Test case 2: run_length exactly causes flush then has remaining
+    // Put 16 values: fills buffer twice
+    // First 8: flush, repeat_count_=8, run_length=8 -> L460 branch!
+    {
+        std::vector<std::pair<uint64_t, size_t>> runs = {{99, 16}};
+        ValidatePutRunLength(runs, 8);
+    }
+
+    // Test case 3: run_length = 9 (just over buffer size)
+    // First 8: flush, repeat_count_=8, run_length=1 -> L460 branch!
+    {
+        std::vector<std::pair<uint64_t, size_t>> runs = {{77, 9}};
+        ValidatePutRunLength(runs, 8);
+    }
+
+    // Test case 4: Verify the fast path after entering repeated run
+    // Put(1, 8) -> flush -> repeat_count_=8
+    // Put(1, 100) -> should hit fast path at L440
+    {
+        faststring buffer;
+        RleEncoder<uint64_t> encoder(&buffer, 8);
+
+        encoder.Put(1, 8);   // Fill buffer and flush, repeat_count_ = 8
+        encoder.Put(1, 100); // Should hit fast path (L440)
+        encoder.Flush();
+
+        RleDecoder<uint64_t> decoder(buffer.data(), buffer.size(), 8);
+        uint64_t val;
+        for (int i = 0; i < 108; ++i) {
+            EXPECT_TRUE(decoder.Get(&val));
+            EXPECT_EQ(1, val);
+        }
+        EXPECT_FALSE(decoder.Get(&val));
+    }
+
+    // Test case 5: Ensure L460 branch works with literal prefix
+    // Put different values first to create literal, then same values
+    // This ensures FlushBufferedValues might create literal, not repeat
+    {
+        faststring buffer;
+        RleEncoder<uint64_t> encoder(&buffer, 8);
+
+        // Create a literal run first (different values)
+        encoder.Put(1, 1);
+        encoder.Put(2, 1);
+        encoder.Put(3, 1);
+        encoder.Put(4, 1);
+        encoder.Put(5, 1);
+        encoder.Put(6, 1);
+        encoder.Put(7, 1);
+        encoder.Put(8, 1); // Buffer full, flush as literal
+
+        // Now put same value many times
+        encoder.Put(99, 20); // Should eventually hit L460 branch
+        encoder.Flush();
+
+        RleDecoder<uint64_t> decoder(buffer.data(), buffer.size(), 8);
+        uint64_t val;
+        for (int i = 1; i <= 8; ++i) {
+            EXPECT_TRUE(decoder.Get(&val));
+            EXPECT_EQ(i, val);
+        }
+        for (int i = 0; i < 20; ++i) {
+            EXPECT_TRUE(decoder.Get(&val));
+            EXPECT_EQ(99, val);
+        }
+        EXPECT_FALSE(decoder.Get(&val));
+    }
+}
+
+// Benchmark for Put function performance with consecutive equal values
+// This test compares:
+// 1. Put(value) called N times in a loop
+// 2. Put(value, N) called once with run_length=N
+TEST_F(TestRle, BenchmarkPutConsecutiveValues) {
+    const int kNumIterations = 10000000; // Number of benchmark iterations
+    const size_t kNumValues = 1000;      // 1M values per iteration
+    const int kBitWidth = 8;
+
+    // Warm up
+    {
+        faststring buffer;
+        RleEncoder<uint64_t> encoder(&buffer, kBitWidth);
+        for (size_t i = 0; i < 10000; ++i) {
+            encoder.Put(42);
+        }
+        encoder.Flush();
+    }
+
+    // Benchmark 2: Put(value, run_length) - tests optimized batch path
+    int64_t total_time_batch_ns = 0;
+    for (int iter = 0; iter < kNumIterations; ++iter) {
+        faststring buffer;
+        RleEncoder<uint64_t> encoder(&buffer, kBitWidth);
+
+        auto start = std::chrono::high_resolution_clock::now();
+        encoder.Put(42, kNumValues);
+        encoder.Flush();
+        auto end = std::chrono::high_resolution_clock::now();
+
+        total_time_batch_ns +=
+                std::chrono::duration_cast<std::chrono::nanoseconds>(end - 
start).count();
+    }
+
+    double avg_batch_ms = total_time_batch_ns / 1e6;
+
+    std::cout << "=== RLE Put Benchmark (consecutive equal values) ===" << 
std::endl;
+    std::cout << "Values per iteration: " << kNumValues << std::endl;
+    std::cout << "Number of iterations: " << kNumIterations << std::endl;
+    std::cout << "Put(value, N):        " << avg_batch_ms << " ms avg" << 
std::endl;
+}
+
+// Benchmark for mixed patterns: alternating runs of different lengths
+TEST_F(TestRle, BenchmarkPutMixedPattern) {
+    const int kNumIterations = 1000000;
+    const int kBitWidth = 8;
+
+    // Pattern: alternating values with varying run lengths
+    // Simulates real-world data like null bitmaps or repeated values in 
columns
+    std::vector<std::pair<uint64_t, size_t>> pattern = {{0, 100},  {1, 50},  
{0, 200},
+                                                        {1, 10},   {0, 500}, 
{1, 1000},
+                                                        {0, 5000}, {1, 100}, 
{0, 10000}};
+
+    // Benchmark batch puts
+    int64_t total_time_batch_ns = 0;
+    for (int iter = 0; iter < kNumIterations; ++iter) {
+        faststring buffer;
+        RleEncoder<uint64_t> encoder(&buffer, kBitWidth);
+
+        auto start = std::chrono::high_resolution_clock::now();
+        for (const auto& [value, length] : pattern) {
+            encoder.Put(value, length);
+        }
+        encoder.Flush();
+        auto end = std::chrono::high_resolution_clock::now();
+
+        total_time_batch_ns +=
+                std::chrono::duration_cast<std::chrono::nanoseconds>(end - 
start).count();
+    }
+
+    double avg_batch_ms = total_time_batch_ns / 1e6;
+
+    std::cout << "=== RLE Put Benchmark (mixed pattern) ===" << std::endl;
+    std::cout << "Number of iterations: " << kNumIterations << std::endl;
+    std::cout << "Put(value, N):        " << avg_batch_ms << " ms avg" << 
std::endl;
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to