This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 3b715c1195e branch-4.0: [refactor](rle) Refactor RLE for writing long
runs of repeated values #59927 (#60178)
3b715c1195e is described below
commit 3b715c1195e6ac28458b9bc3eceaee76a7bd0d9b
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jan 30 16:18:56 2026 +0800
branch-4.0: [refactor](rle) Refactor RLE for writing long runs of repeated
values #59927 (#60178)
Cherry-picked from #59927
Co-authored-by: Sun Chenyang <[email protected]>
---
be/src/util/rle_encoding.h | 45 +++--
be/test/util/rle_encoding_test.cpp | 357 +++++++++++++++++++++++++++++++++++++
2 files changed, 389 insertions(+), 13 deletions(-)
diff --git a/be/src/util/rle_encoding.h b/be/src/util/rle_encoding.h
index eef5aee56a3..1685ab85f1e 100644
--- a/be/src/util/rle_encoding.h
+++ b/be/src/util/rle_encoding.h
@@ -435,17 +435,35 @@ template <typename T>
void RleEncoder<T>::Put(T value, size_t run_length) {
DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
- // TODO(perf): remove the loop and use the repeat_count_
- for (; run_length > 0; run_length--) {
+ // Fast path: if this is a continuation of the current repeated run and
+ // we've already buffered enough values, just increment repeat_count_
+ if (current_value_ == value && repeat_count_ >= 8 && run_length > 0)
[[likely]] {
+ repeat_count_ += run_length;
+ return;
+ }
+
+ // Handle run_length > 1 more efficiently
+ while (run_length > 0) {
if (current_value_ == value) [[likely]] {
- ++repeat_count_;
- if (repeat_count_ > 8) {
- // This is just a continuation of the current run, no need to
buffer the
- // values.
- // Note that this is the fast path for long repeated runs.
- continue;
+ // Need to buffer values until we reach 8
+ size_t to_buffer = std::min(run_length, size_t(8 -
num_buffered_values_));
+ for (size_t i = 0; i < to_buffer; ++i) {
+ buffered_values_[num_buffered_values_++] = value;
+ ++repeat_count_;
+ }
+ run_length -= to_buffer;
+ if (num_buffered_values_ == 8) {
+ DCHECK_EQ(literal_count_ % 8, 0);
+ FlushBufferedValues(false);
+ // After flushing, if we still have a repeated run and more
values,
+ // we can add them directly to repeat_count_
+ if (repeat_count_ >= 8 && run_length > 0) {
+ repeat_count_ += run_length;
+ return;
+ }
}
} else {
+ // Value changed
if (repeat_count_ >= 8) {
// We had a run that was long enough but it has ended. Flush
the
// current repeated run.
@@ -454,12 +472,13 @@ void RleEncoder<T>::Put(T value, size_t run_length) {
}
repeat_count_ = 1;
current_value_ = value;
- }
- buffered_values_[num_buffered_values_] = value;
- if (++num_buffered_values_ == 8) {
- DCHECK_EQ(literal_count_ % 8, 0);
- FlushBufferedValues(false);
+ buffered_values_[num_buffered_values_++] = value;
+ --run_length;
+ if (num_buffered_values_ == 8) {
+ DCHECK_EQ(literal_count_ % 8, 0);
+ FlushBufferedValues(false);
+ }
}
}
}
diff --git a/be/test/util/rle_encoding_test.cpp
b/be/test/util/rle_encoding_test.cpp
index 8f147b64fd1..5af34168869 100644
--- a/be/test/util/rle_encoding_test.cpp
+++ b/be/test/util/rle_encoding_test.cpp
@@ -25,6 +25,7 @@
#include <algorithm>
#include <boost/utility/binary.hpp>
+#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <cstring>
@@ -418,4 +419,360 @@ TEST_F(TestRle, TestSkip) {
encoder.Flush();
}
+// Helper: encodes `runs` via Put(value, run_length) and via repeated Put(value); expects identical bytes and a correct decode
+template <typename T>
+void ValidatePutRunLength(const std::vector<std::pair<T, size_t>>& runs, int
bit_width) {
+ // Encode using Put(value, run_length)
+ faststring buffer1;
+ RleEncoder<T> encoder1(&buffer1, bit_width);
+ for (const auto& [value, length] : runs) {
+ encoder1.Put(value, length);
+ }
+ encoder1.Flush();
+
+ // Encode using multiple Put(value) calls
+ faststring buffer2;
+ RleEncoder<T> encoder2(&buffer2, bit_width);
+ for (const auto& [value, length] : runs) {
+ for (size_t i = 0; i < length; ++i) {
+ encoder2.Put(value);
+ }
+ }
+ encoder2.Flush();
+
+ // Both must be byte-identical; the size check is fatal so memcmp never reads past the shorter buffer
+ ASSERT_EQ(buffer1.size(), buffer2.size());
+ EXPECT_EQ(memcmp(buffer1.data(), buffer2.data(), buffer1.size()), 0)
+ << "Put with run_length produced different output than individual
Puts";
+
+ // Decode the batched encoding and check every value of every run, in order
+ RleDecoder<T> decoder(buffer1.data(), buffer1.size(), bit_width);
+ for (const auto& [value, length] : runs) {
+ for (size_t i = 0; i < length; ++i) {
+ T decoded_val;
+ EXPECT_TRUE(decoder.Get(&decoded_val));
+ EXPECT_EQ(value, decoded_val);
+ }
+ }
+}
+
+// Runs ValidatePutRunLength over assorted run patterns: Put(value, run_length) vs repeated Put(value)
+TEST_F(TestRle, TestPutWithRunLength) {
+ // Test 1: Single long run (should use fast path after 8 values)
+ {
+ std::vector<std::pair<uint64_t, size_t>> runs = {{42, 100}};
+ ValidatePutRunLength(runs, 8);
+ }
+
+ // Test 2: Run length exactly 8 (same as the 8-value buffer size; boundary case)
+ {
+ std::vector<std::pair<uint64_t, size_t>> runs = {{1, 8}, {2, 8}, {3,
8}};
+ ValidatePutRunLength(runs, 8);
+ }
+
+ // Test 3: Run length less than 8 (a run never fills the buffer on its own)
+ {
+ std::vector<std::pair<uint64_t, size_t>> runs = {{1, 3}, {2, 5}, {3,
7}};
+ ValidatePutRunLength(runs, 8);
+ }
+
+ // Test 4: Run length crossing buffer boundary (e.g., 5 + 10 with same
value)
+ {
+ std::vector<std::pair<uint64_t, size_t>> runs = {{1, 5}, {1, 10}};
+ ValidatePutRunLength(runs, 8);
+ }
+
+ // Test 5: Alternating values with various run lengths
+ {
+ std::vector<std::pair<uint64_t, size_t>> runs = {{0, 7}, {1, 9}, {0,
15}, {1, 3}, {0, 20}};
+ ValidatePutRunLength(runs, 8);
+ }
+
+ // Test 6: Very long runs (test fast path)
+ {
+ std::vector<std::pair<uint64_t, size_t>> runs = {{0, 1000}, {1, 1000},
{0, 1000}};
+ ValidatePutRunLength(runs, 8);
+ }
+
+ // Test 7: 100 runs of length 1 cycling through values 0..9 (no batching benefit)
+ {
+ std::vector<std::pair<uint64_t, size_t>> runs;
+ for (int i = 0; i < 100; ++i) {
+ runs.push_back({static_cast<uint64_t>(i % 10), 1});
+ }
+ ValidatePutRunLength(runs, 8);
+ }
+
+ // Test 8: Boolean values (bit_width = 1)
+ {
+ std::vector<std::pair<bool, size_t>> runs = {
+ {true, 10}, {false, 7}, {true, 5}, {true, 15}, {false, 20}};
+ ValidatePutRunLength(runs, 1);
+ }
+
+ // Test 9: Large bit width (64-bit), including the all-ones value
+ {
+ std::vector<std::pair<uint64_t, size_t>> runs = {
+ {0xFFFFFFFFFFFFFFFFULL, 50}, {0x123456789ABCDEF0ULL, 30}, {0,
100}};
+ ValidatePutRunLength(runs, 64);
+ }
+
+ // Test 10: Mixed short and long runs
+ {
+ std::vector<std::pair<uint64_t, size_t>> runs = {{1, 2}, {2, 100},
{3, 1},
+ {4, 50}, {5, 3},
{6, 200}};
+ ValidatePutRunLength(runs, 8);
+ }
+}
+
+// Round-trip tests (encode, then decode) aimed at the 8-value buffering logic of Put(value, run_length)
+TEST_F(TestRle, TestPutRunLengthBuffering) {
+ // Test case: value changes while buffer is partially filled
+ // This tests the "value changed" branch in the while loop
+ {
+ faststring buffer;
+ RleEncoder<uint64_t> encoder(&buffer, 8);
+
+ // Put 3 values of 10, then 5 values of 20 (total 8: fills the buffer and triggers a flush)
+ encoder.Put(10, 3);
+ encoder.Put(20, 5);
+ encoder.Flush();
+
+ // Decode and verify
+ RleDecoder<uint64_t> decoder(buffer.data(), buffer.size(), 8);
+ uint64_t val;
+ for (int i = 0; i < 3; ++i) {
+ EXPECT_TRUE(decoder.Get(&val));
+ EXPECT_EQ(10, val);
+ }
+ for (int i = 0; i < 5; ++i) {
+ EXPECT_TRUE(decoder.Get(&val));
+ EXPECT_EQ(20, val);
+ }
+ }
+
+ // Test case: run_length larger than the 8-value buffer
+ {
+ faststring buffer;
+ RleEncoder<uint64_t> encoder(&buffer, 8);
+
+ // Put 25 values: 8 fill the buffer and are flushed; Put() then adds the other 17 to repeat_count_ (assuming the flush keeps the run)
+ encoder.Put(42, 25);
+ encoder.Flush();
+
+ RleDecoder<uint64_t> decoder(buffer.data(), buffer.size(), 8);
+ uint64_t val;
+ for (int i = 0; i < 25; ++i) {
+ EXPECT_TRUE(decoder.Get(&val));
+ EXPECT_EQ(42, val);
+ }
+ }
+
+ // Test case: existing repeated run (repeat_count >= 8) then value changes
+ {
+ faststring buffer;
+ RleEncoder<uint64_t> encoder(&buffer, 8);
+
+ // Build up a repeated run
+ encoder.Put(1, 20);
+ // Now change value - should flush the repeated run
+ encoder.Put(2, 5);
+ encoder.Flush();
+
+ RleDecoder<uint64_t> decoder(buffer.data(), buffer.size(), 8);
+ uint64_t val;
+ for (int i = 0; i < 20; ++i) {
+ EXPECT_TRUE(decoder.Get(&val));
+ EXPECT_EQ(1, val);
+ }
+ for (int i = 0; i < 5; ++i) {
+ EXPECT_TRUE(decoder.Get(&val));
+ EXPECT_EQ(2, val);
+ }
+ }
+}
+
+// Edge case: Put(value, 0) must emit nothing and must not interrupt the surrounding run
+TEST_F(TestRle, TestPutRunLengthZero) {
+ faststring buffer;
+ RleEncoder<uint64_t> encoder(&buffer, 8);
+
+ encoder.Put(1, 10);
+ encoder.Put(2, 0); // run_length == 0: should be a no-op even though the value differs
+ encoder.Put(1, 5); // Should continue the run of 1s
+ encoder.Flush();
+
+ RleDecoder<uint64_t> decoder(buffer.data(), buffer.size(), 8);
+ uint64_t val;
+ for (int i = 0; i < 15; ++i) {
+ EXPECT_TRUE(decoder.Get(&val));
+ EXPECT_EQ(1, val);
+ }
+ EXPECT_FALSE(decoder.Get(&val)); // No more values: exactly 10 + 5 were encoded
+}
+
+// Test to cover the post-flush branch in RleEncoder<T>::Put():
+// After FlushBufferedValues, if repeat_count_ >= 8 and run_length > 0,
+// add remaining run_length to repeat_count_ directly.
+//
+// To trigger this branch:
+// 1. Start fresh (repeat_count_=0, num_buffered_values_=0)
+// 2. Put same value with run_length > 8 (e.g., 20)
+// 3. First 8 values fill buffer, trigger FlushBufferedValues
+// 4. FlushBufferedValues sees all 8 same values -> repeat_count_ stays at 8
+// 5. After flush: repeat_count_=8, run_length=12 -> branch triggered!
+TEST_F(TestRle, TestPutFlushThenFastPath) {
+ // Test case 1: Single Put with run_length > 8, triggers the post-flush branch
+ {
+ std::vector<std::pair<uint64_t, size_t>> runs = {{42, 20}};
+ ValidatePutRunLength(runs, 8);
+ }
+
+ // Test case 2: run_length exactly causes flush then has remaining
+ // Put 16 values: only the first 8 go through the buffer
+ // First 8: flush, repeat_count_=8, run_length=8 -> post-flush branch!
+ {
+ std::vector<std::pair<uint64_t, size_t>> runs = {{99, 16}};
+ ValidatePutRunLength(runs, 8);
+ }
+
+ // Test case 3: run_length = 9 (just over buffer size)
+ // First 8: flush, repeat_count_=8, run_length=1 -> post-flush branch!
+ {
+ std::vector<std::pair<uint64_t, size_t>> runs = {{77, 9}};
+ ValidatePutRunLength(runs, 8);
+ }
+
+ // Test case 4: Verify the fast path after entering repeated run
+ // Put(1, 8) -> flush -> repeat_count_=8
+ // Put(1, 100) -> should hit the fast path at the top of Put()
+ {
+ faststring buffer;
+ RleEncoder<uint64_t> encoder(&buffer, 8);
+
+ encoder.Put(1, 8); // Fill buffer and flush, repeat_count_ = 8
+ encoder.Put(1, 100); // Should hit the entry fast path (same value, repeat_count_ >= 8)
+ encoder.Flush();
+
+ RleDecoder<uint64_t> decoder(buffer.data(), buffer.size(), 8);
+ uint64_t val;
+ for (int i = 0; i < 108; ++i) {
+ EXPECT_TRUE(decoder.Get(&val));
+ EXPECT_EQ(1, val);
+ }
+ EXPECT_FALSE(decoder.Get(&val));
+ }
+
+ // Test case 5: Ensure the post-flush branch works with a literal prefix
+ // Put different values first to create literal, then same values
+ // This ensures FlushBufferedValues might create literal, not repeat
+ {
+ faststring buffer;
+ RleEncoder<uint64_t> encoder(&buffer, 8);
+
+ // Create a literal run first (different values)
+ encoder.Put(1, 1);
+ encoder.Put(2, 1);
+ encoder.Put(3, 1);
+ encoder.Put(4, 1);
+ encoder.Put(5, 1);
+ encoder.Put(6, 1);
+ encoder.Put(7, 1);
+ encoder.Put(8, 1); // Buffer full, flush as literal
+
+ // Now put same value many times
+ encoder.Put(99, 20); // Should eventually hit the post-flush branch
+ encoder.Flush();
+
+ RleDecoder<uint64_t> decoder(buffer.data(), buffer.size(), 8);
+ uint64_t val;
+ for (int i = 1; i <= 8; ++i) {
+ EXPECT_TRUE(decoder.Get(&val));
+ EXPECT_EQ(i, val);
+ }
+ for (int i = 0; i < 20; ++i) {
+ EXPECT_TRUE(decoder.Get(&val));
+ EXPECT_EQ(99, val);
+ }
+ EXPECT_FALSE(decoder.Get(&val));
+ }
+}
+
+// Benchmark for Put(value, run_length) with consecutive equal values.
+// Only the batched call is timed:
+// - Put(value) called in a loop serves solely as warm-up and is not measured
+// - Put(value, N) called once with run_length=N is timed per iteration and averaged
+TEST_F(TestRle, BenchmarkPutConsecutiveValues) {
+ const int kNumIterations = 10000000; // Number of benchmark iterations
+ const size_t kNumValues = 1000; // 1K values per iteration
+ const int kBitWidth = 8;
+
+ // Warm up
+ {
+ faststring buffer;
+ RleEncoder<uint64_t> encoder(&buffer, kBitWidth);
+ for (size_t i = 0; i < 10000; ++i) {
+ encoder.Put(42);
+ }
+ encoder.Flush();
+ }
+
+ // Timed section: Put(value, run_length) - tests optimized batch path
+ int64_t total_time_batch_ns = 0;
+ for (int iter = 0; iter < kNumIterations; ++iter) {
+ faststring buffer;
+ RleEncoder<uint64_t> encoder(&buffer, kBitWidth);
+
+ auto start = std::chrono::high_resolution_clock::now();
+ encoder.Put(42, kNumValues);
+ encoder.Flush();
+ auto end = std::chrono::high_resolution_clock::now();
+
+ total_time_batch_ns +=
+ std::chrono::duration_cast<std::chrono::nanoseconds>(end -
start).count();
+ }
+
+ double avg_batch_ms = total_time_batch_ns / 1e6 / kNumIterations; // ns -> ms, then per-iteration average
+
+ std::cout << "=== RLE Put Benchmark (consecutive equal values) ===" <<
std::endl;
+ std::cout << "Values per iteration: " << kNumValues << std::endl;
+ std::cout << "Number of iterations: " << kNumIterations << std::endl;
+ std::cout << "Put(value, N): " << avg_batch_ms << " ms avg" <<
std::endl;
+}
+
+// Benchmark for mixed patterns: times Put(value, run_length) over alternating runs and reports the per-iteration average
+TEST_F(TestRle, BenchmarkPutMixedPattern) {
+ const int kNumIterations = 1000000;
+ const int kBitWidth = 8;
+
+ // Pattern: alternating values with varying run lengths
+ // Simulates real-world data like null bitmaps or repeated values in
columns
+ std::vector<std::pair<uint64_t, size_t>> pattern = {{0, 100}, {1, 50},
{0, 200},
+ {1, 10}, {0, 500},
{1, 1000},
+ {0, 5000}, {1, 100},
{0, 10000}};
+
+ // Benchmark batch puts: a fresh encoder per iteration, timing only Put + Flush
+ int64_t total_time_batch_ns = 0;
+ for (int iter = 0; iter < kNumIterations; ++iter) {
+ faststring buffer;
+ RleEncoder<uint64_t> encoder(&buffer, kBitWidth);
+
+ auto start = std::chrono::high_resolution_clock::now();
+ for (const auto& [value, length] : pattern) {
+ encoder.Put(value, length);
+ }
+ encoder.Flush();
+ auto end = std::chrono::high_resolution_clock::now();
+
+ total_time_batch_ns +=
+ std::chrono::duration_cast<std::chrono::nanoseconds>(end -
start).count();
+ }
+
+ double avg_batch_ms = total_time_batch_ns / 1e6 / kNumIterations; // ns -> ms, then per-iteration average
+
+ std::cout << "=== RLE Put Benchmark (mixed pattern) ===" << std::endl;
+ std::cout << "Number of iterations: " << kNumIterations << std::endl;
+ std::cout << "Put(value, N): " << avg_batch_ms << " ms avg" <<
std::endl;
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org