This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 724a333acb1 [feat](be) Add PLAIN_ENCODING_V3 binary plain page for the 
V3 storage   format (#63570)
724a333acb1 is described below

commit 724a333acb18ffcd748616b1b0ae1b3cb50e6a30
Author: Chenyang Sun <[email protected]>
AuthorDate: Thu Jun 4 10:40:09 2026 +0800

    [feat](be) Add PLAIN_ENCODING_V3 binary plain page for the V3 storage   
format (#63570)
    
    V3 layout:
    
|data1..dataN|varuint_len1..varuint_lenN|data_block_size(u32)|num_elems(u32)|
    
    Benchmark (median of 10 reps): page pre-decode is ~1.0–3.6x faster than
    V2 (largest for short values), and the contiguous layout compresses
    ~1–11% smaller after ZSTD.
---
 be/benchmark/benchmark_binary_plain_page_v2.hpp    | 368 ++++++++++++++
 be/benchmark/benchmark_main.cpp                    |  31 +-
 be/src/storage/segment/binary_dict_page.cpp        |  15 +-
 be/src/storage/segment/binary_dict_page.h          |   5 +-
 .../storage/segment/binary_dict_page_pre_decoder.h |  11 +-
 be/src/storage/segment/binary_plain_page_v3.h      | 175 +++++++
 .../segment/binary_plain_page_v3_pre_decoder.h     | 190 +++++++
 be/src/storage/segment/column_writer.cpp           |   6 +-
 be/src/storage/segment/encoding_info.cpp           |  55 +-
 be/src/storage/segment/options.h                   |   9 +-
 be/test/storage/segment/binary_dict_page_test.cpp  |  97 +++-
 .../storage/segment/binary_plain_page_v3_test.cpp  | 566 +++++++++++++++++++++
 .../storage/segment/column_meta_accessor_test.cpp  |   2 +-
 be/test/storage/segment/encoding_info_test.cpp     |  43 +-
 gensrc/proto/olap_file.proto                       |   1 +
 gensrc/proto/segment_v2.proto                      |   1 +
 16 files changed, 1525 insertions(+), 50 deletions(-)

diff --git a/be/benchmark/benchmark_binary_plain_page_v2.hpp 
b/be/benchmark/benchmark_binary_plain_page_v2.hpp
new file mode 100644
index 00000000000..a15fc043eb9
--- /dev/null
+++ b/be/benchmark/benchmark_binary_plain_page_v2.hpp
@@ -0,0 +1,368 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <benchmark/benchmark.h>
+#include <gen_cpp/segment_v2.pb.h>
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <random>
+#include <string>
+#include <vector>
+
+#include "storage/cache/page_cache.h"
+#include "storage/segment/binary_plain_page_v2.h"
+#include "storage/segment/binary_plain_page_v2_pre_decoder.h"
+#include "storage/segment/binary_plain_page_v3.h"
+#include "storage/segment/binary_plain_page_v3_pre_decoder.h"
+#include "storage/segment/options.h"
+#include "storage/segment/page_builder.h"
+#include "storage/types.h"
+#include "util/block_compression.h"
+#include "util/faststring.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace segment_v2 {
+
+// Deterministic random corpus shared by the V2 and V3 benchmarks: `num_elems`
+// strings, each exactly `value_len` lowercase ASCII letters. The RNG seed is
+// fixed, so the same arguments always yield the same strings and the on-disk
+// layout is the only variable left between the two encodings.
+inline std::vector<std::string> make_corpus(size_t num_elems,
+                                            size_t value_len) {
+    std::mt19937 rng(0xC0FFEEu);
+    std::uniform_int_distribution<int> letter('a', 'z');
+    std::vector<std::string> corpus(num_elems);
+    for (std::string& value : corpus) {
+        value.resize(value_len);
+        // One RNG draw per character, element by element, byte by byte.
+        for (char& c : value) {
+            c = static_cast<char>(letter(rng));
+        }
+    }
+    return corpus;
+}
+
+// Encode the whole corpus into a single VARCHAR page with BuilderT and return
+// the finished page. Any builder failure aborts the process via CHECK.
+template <template <FieldType> class BuilderT>
+inline OwnedSlice build_page(const std::vector<std::string>& corpus) {
+    using Builder = BuilderT<FieldType::OLAP_FIELD_TYPE_VARCHAR>;
+
+    // Disable the size-bound check so the whole corpus lands in one page.
+    PageBuilderOptions options;
+    options.data_page_size = 0;
+    options.dict_page_size = 0;
+
+    PageBuilder* created = nullptr;
+    Status status = Builder::create(&created, options);
+    CHECK(status.ok()) << status;
+    std::unique_ptr<PageBuilder> builder(created);
+
+    // add() consumes its input as an array of Slice.
+    std::vector<Slice> values;
+    values.reserve(corpus.size());
+    for (const std::string& text : corpus) {
+        values.emplace_back(text);
+    }
+
+    size_t added = values.size();
+    const auto* input = reinterpret_cast<const uint8_t*>(values.data());
+    status = builder->add(input, &added);
+    CHECK(status.ok()) << status;
+    CHECK_EQ(added, values.size());
+
+    OwnedSlice page;
+    status = builder->finish(&page);
+    CHECK(status.ok()) << status;
+    return page;
+}
+
+// Shared body of the (num_elems, value_len) pre-decode benchmarks.
+//
+// The input page is built once, outside the timed loop. Each iteration starts
+// from a fresh copy of the input Slice (decode() rewrites the Slice it is
+// given to point at the freshly-allocated V1 page) and times only the decode
+// call. state.range(0) is num_elems and state.range(1) is value_len.
+//
+// NOTE(review): with kIsIterationInvariantRate | kInvert the raw value of the
+// "ns_per_elem" counter appears to be seconds per element (the console
+// reporter adds the SI prefix) - confirm before parsing it as nanoseconds.
+template <template <FieldType> class BuilderT, class PreDecoderT>
+inline void run_pre_decode_bm(benchmark::State& state) {
+    const auto num_elems = static_cast<size_t>(state.range(0));
+    const auto value_len = static_cast<size_t>(state.range(1));
+
+    const std::vector<std::string> corpus = make_corpus(num_elems, value_len);
+    OwnedSlice encoded = build_page<BuilderT>(corpus);
+    const Slice input = encoded.slice();
+
+    PreDecoderT pre_decoder;
+
+    for (auto _ : state) {
+        Slice page_slice = input;
+        std::unique_ptr<DataPage> decoded_page;
+        Status st = pre_decoder.decode(&decoded_page, &page_slice,
+                                       /*size_of_tail=*/0, /*use_cache=*/false,
+                                       PageTypePB::DATA_PAGE,
+                                       /*file_path=*/std::string());
+        CHECK(st.ok()) << st;
+        benchmark::DoNotOptimize(page_slice);
+        benchmark::DoNotOptimize(decoded_page);
+        benchmark::ClobberMemory();
+    }
+
+    const auto iterations = static_cast<int64_t>(state.iterations());
+    state.SetItemsProcessed(iterations * static_cast<int64_t>(num_elems));
+    state.SetBytesProcessed(iterations * static_cast<int64_t>(input.size));
+    state.counters["page_bytes"] = static_cast<double>(input.size);
+    state.counters["ns_per_elem"] =
+            benchmark::Counter(static_cast<double>(num_elems),
+                               benchmark::Counter::kIsIterationInvariantRate |
+                                       benchmark::Counter::kInvert);
+}
+
+// Pre-decode benchmark for pages written by BinaryPlainPageV2Builder. The
+// pre-decoder's bool template argument is false (presumably the non-CHAR
+// path, as for V3 - confirm against binary_plain_page_v2_pre_decoder.h).
+inline void BM_BinaryPlainPageV2_PreDecode(benchmark::State& state) {
+    run_pre_decode_bm<BinaryPlainPageV2Builder, 
BinaryPlainPageV2PreDecoder<false>>(state);
+}
+
+// Pre-decode benchmark for pages written by BinaryPlainPageV3Builder, using
+// the same corpus and arguments as the V2 variant so the two are comparable.
+inline void BM_BinaryPlainPageV3_PreDecode(benchmark::State& state) {
+    run_pre_decode_bm<BinaryPlainPageV3Builder, 
BinaryPlainPageV3PreDecoder<false>>(state);
+}
+
+// Registers the (num_elems, value_len) grid for the pre-decode benchmarks:
+// every combination of four element counts (small / medium / page-sized
+// pages) and four value widths - 8 bytes ("many small", e.g. compact
+// dictionary keys), 32 and 128 bytes (typical varchar) and 1024 bytes
+// ("large value").
+static void V2V3PreDecodeArgs(benchmark::internal::Benchmark* b) {
+    static constexpr int kNumElems[] = {256, 1024, 4096, 16384};
+    static constexpr int kValueLens[] = {8, 32, 128, 1024};
+    for (const int num_elems : kNumElems) {
+        for (const int value_len : kValueLens) {
+            b->Args({num_elems, value_len});
+        }
+    }
+}
+
+BENCHMARK(BM_BinaryPlainPageV2_PreDecode)->Apply(V2V3PreDecodeArgs);
+BENCHMARK(BM_BinaryPlainPageV3_PreDecode)->Apply(V2V3PreDecodeArgs);
+
+// ---------------------------------------------------------------------------
+// Fixed-page-size variants: pin to a production page size so we measure
+// pre-decode cost at realistic byte counts. num_elems is derived from
+// value_len so each input page lands at ~target_bytes.
+//   - 64 KiB matches STORAGE_PAGE_SIZE_DEFAULT_VALUE (default data page)
+//   - 256 KiB matches STORAGE_DICT_PAGE_SIZE_DEFAULT_VALUE (dict / large)
+// ---------------------------------------------------------------------------
+
+// Byte targets passed as the TargetBytes template argument of
+// run_pre_decode_fixed_page_bm.
+inline constexpr size_t kBenchPage64KiB = 64 * 1024;
+inline constexpr size_t kBenchPage256KiB = 256 * 1024;
+
+// Pick num_elems so that (varint_len + value_len) * num_elems ~= target_bytes,
+// i.e. so the values plus their length prefixes fill roughly one page of
+// `target_bytes`.
+//
+// The length prefix is a base-128 varint: 1 byte for value_len < 128, 2 bytes
+// for value_len < 16384, and one more byte for every further 7 bits.
+//
+// Always returns at least 1. BinaryPlainPageV3Builder::add() DCHECKs that it
+// is given at least one value, so a value wider than the target still yields
+// a one-element page instead of an empty add().
+inline size_t elems_for_target(size_t target_bytes, size_t value_len) {
+    size_t varint_bytes = 1;
+    for (size_t rest = value_len >> 7; rest != 0; rest >>= 7) {
+        ++varint_bytes;
+    }
+    const size_t per_entry = varint_bytes + value_len;
+    const size_t num_elems = target_bytes / per_entry;
+    return num_elems == 0 ? 1 : num_elems;
+}
+
+// Shared body of the fixed-page-size pre-decode benchmarks.
+//
+// state.range(0) is value_len; num_elems is derived from it with
+// elems_for_target() so the input page lands near TargetBytes. The page is
+// built once outside the timed loop, and each iteration decodes a fresh copy
+// of the input Slice. The derived num_elems is published as a counter.
+//
+// NOTE(review): with kIsIterationInvariantRate | kInvert the raw value of the
+// "ns_per_elem" counter appears to be seconds per element (the console
+// reporter adds the SI prefix) - confirm before parsing it as nanoseconds.
+template <template <FieldType> class BuilderT, class PreDecoderT,
+          size_t TargetBytes>
+inline void run_pre_decode_fixed_page_bm(benchmark::State& state) {
+    const auto value_len = static_cast<size_t>(state.range(0));
+    const size_t num_elems = elems_for_target(TargetBytes, value_len);
+
+    const std::vector<std::string> corpus = make_corpus(num_elems, value_len);
+    OwnedSlice encoded = build_page<BuilderT>(corpus);
+    const Slice input = encoded.slice();
+
+    PreDecoderT pre_decoder;
+    for (auto _ : state) {
+        Slice page_slice = input;
+        std::unique_ptr<DataPage> decoded_page;
+        Status st = pre_decoder.decode(&decoded_page, &page_slice,
+                                       /*size_of_tail=*/0, /*use_cache=*/false,
+                                       PageTypePB::DATA_PAGE,
+                                       /*file_path=*/std::string());
+        CHECK(st.ok()) << st;
+        benchmark::DoNotOptimize(page_slice);
+        benchmark::DoNotOptimize(decoded_page);
+        benchmark::ClobberMemory();
+    }
+
+    const auto iterations = static_cast<int64_t>(state.iterations());
+    state.SetItemsProcessed(iterations * static_cast<int64_t>(num_elems));
+    state.SetBytesProcessed(iterations * static_cast<int64_t>(input.size));
+    state.counters["num_elems"] = static_cast<double>(num_elems);
+    state.counters["page_bytes"] = static_cast<double>(input.size);
+    state.counters["ns_per_elem"] =
+            benchmark::Counter(static_cast<double>(num_elems),
+                               benchmark::Counter::kIsIterationInvariantRate |
+                                       benchmark::Counter::kInvert);
+}
+
+// Fixed-page-size pre-decode benchmarks: one wrapper per (layout, target
+// size) pair. Each forwards to run_pre_decode_fixed_page_bm with the matching
+// builder, pre-decoder and byte target.
+
+// V2 layout, ~64 KiB input page.
+inline void BM_BinaryPlainPageV2_PreDecode_64KiB(benchmark::State& state) {
+    run_pre_decode_fixed_page_bm<BinaryPlainPageV2Builder, 
BinaryPlainPageV2PreDecoder<false>,
+                                 kBenchPage64KiB>(state);
+}
+// V3 layout, ~64 KiB input page.
+inline void BM_BinaryPlainPageV3_PreDecode_64KiB(benchmark::State& state) {
+    run_pre_decode_fixed_page_bm<BinaryPlainPageV3Builder, 
BinaryPlainPageV3PreDecoder<false>,
+                                 kBenchPage64KiB>(state);
+}
+// V2 layout, ~256 KiB input page.
+inline void BM_BinaryPlainPageV2_PreDecode_256KiB(benchmark::State& state) {
+    run_pre_decode_fixed_page_bm<BinaryPlainPageV2Builder, 
BinaryPlainPageV2PreDecoder<false>,
+                                 kBenchPage256KiB>(state);
+}
+// V3 layout, ~256 KiB input page.
+inline void BM_BinaryPlainPageV3_PreDecode_256KiB(benchmark::State& state) {
+    run_pre_decode_fixed_page_bm<BinaryPlainPageV3Builder, 
BinaryPlainPageV3PreDecoder<false>,
+                                 kBenchPage256KiB>(state);
+}
+
+// Registers the value_len sweep for the fixed-page-size benchmarks. Every row
+// produces a page near the target byte count; the element count that results
+// is reported through the `num_elems` counter, so the (N, len) relationship
+// stays visible next to the timing.
+static void V2V3PreDecodeFixedPageArgs(benchmark::internal::Benchmark* b) {
+    static constexpr int kValueLens[] = {8,   16,  32,   64,  128,
+                                         256, 512, 1024, 4096};
+    for (const int value_len : kValueLens) {
+        b->Args({value_len});
+    }
+}
+
+BENCHMARK(BM_BinaryPlainPageV2_PreDecode_64KiB)->Apply(V2V3PreDecodeFixedPageArgs);
+BENCHMARK(BM_BinaryPlainPageV3_PreDecode_64KiB)->Apply(V2V3PreDecodeFixedPageArgs);
+BENCHMARK(BM_BinaryPlainPageV2_PreDecode_256KiB)->Apply(V2V3PreDecodeFixedPageArgs);
+BENCHMARK(BM_BinaryPlainPageV3_PreDecode_256KiB)->Apply(V2V3PreDecodeFixedPageArgs);
+
+// ===========================================================================
+// On-disk size comparison: encode a page with V2 vs V3, then ZSTD-compress the
+// encoded page (mirroring the segment write path, which compresses each page
+// before writing) and report the raw and ZSTD-compressed byte counts.
+//
+// Both V2 and V3 store the same bytes (only the layout differs), so this 
isolates the
+// effect of the V3 layout (contiguous data + contiguous varint lengths) on 
ZSTD's ratio.
+// Two corpora:
+//   - VARCHAR (variable length, no padding).
+//   - CHAR (fixed length, '\0'-padded as OlapColumnDataConvertorChar does): 
both V2 and V3
+//     keep the padding on disk, so this measures the layout effect on highly 
compressible
+//     padded data.
+// ===========================================================================
+
+// Build a CHAR corpus: `num_elems` values of exactly `value_len` bytes each.
+// Every value holds a random logical content length in [1, value_len] of
+// lowercase ASCII letters, followed by '\0' padding up to `value_len`.
+//
+// The RNG seed is fixed, so the same arguments always yield the same corpus.
+// With value_len == 0 there is no room for content; the result is then
+// `num_elems` empty strings.
+inline std::vector<std::string> make_padded_char_corpus(size_t num_elems,
+                                                        size_t value_len) {
+    // Every value starts out as pure padding; content is written over it.
+    std::vector<std::string> corpus(num_elems, std::string(value_len, '\0'));
+    if (value_len == 0) {
+        // std::uniform_int_distribution requires a <= b, so a (1, 0) range
+        // would be undefined behaviour. Nothing to fill in anyway.
+        return corpus;
+    }
+
+    std::mt19937 rng(0xC0FFEEu);
+    std::uniform_int_distribution<int> ch('a', 'z');
+    std::uniform_int_distribution<size_t> len_dist(1, value_len);
+    for (std::string& s : corpus) {
+        // Draw the logical length first, then one letter per content byte;
+        // the trailing bytes stay '\0' (the padding).
+        const size_t logical = len_dist(rng);
+        for (size_t j = 0; j < logical; ++j) {
+            s[j] = static_cast<char>(ch(rng));
+        }
+    }
+    return corpus;
+}
+
+// Encode `slices` into a single page with BuilderT<Type> and return the
+// finished page. Any builder failure aborts the process via CHECK.
+template <template <FieldType> class BuilderT, FieldType Type>
+inline OwnedSlice build_page_typed(const std::vector<Slice>& slices) {
+    // Page size 0: single page, no size-bound check.
+    PageBuilderOptions options;
+    options.data_page_size = 0;
+    options.dict_page_size = 0;
+
+    PageBuilder* created = nullptr;
+    Status status = BuilderT<Type>::create(&created, options);
+    CHECK(status.ok()) << status;
+    std::unique_ptr<PageBuilder> builder(created);
+
+    // add() reports back how many values it consumed; all must be taken.
+    size_t added = slices.size();
+    const auto* input = reinterpret_cast<const uint8_t*>(slices.data());
+    status = builder->add(input, &added);
+    CHECK(status.ok()) << status;
+    CHECK_EQ(added, slices.size());
+
+    OwnedSlice page;
+    status = builder->finish(&page);
+    CHECK(status.ok()) << status;
+    return page;
+}
+
+// Fetch the ZSTD codec through get_block_compression_codec(). Aborts via
+// CHECK when the lookup fails or yields a null codec, so the returned
+// pointer is never null.
+inline BlockCompressionCodec* zstd_codec() {
+    BlockCompressionCodec* zstd = nullptr;
+    Status lookup = get_block_compression_codec(
+            segment_v2::CompressionTypePB::ZSTD, &zstd);
+    CHECK(lookup.ok()) << lookup;
+    CHECK(zstd != nullptr);
+    return zstd;
+}
+
+// Shared body of the ZSTD size benchmarks.
+//
+// Builds one page with BuilderT<Type>, ZSTD-compresses it in the timed loop
+// and publishes the raw size, the compressed size and their ratio as
+// counters. CharPadding=true feeds the '\0'-padded CHAR corpus, false the
+// plain VARCHAR corpus. state.range(0) is num_elems, state.range(1) is
+// value_len.
+template <template <FieldType> class BuilderT, FieldType Type,
+          bool CharPadding>
+inline void run_zstd_size_bm(benchmark::State& state) {
+    const auto num_elems = static_cast<size_t>(state.range(0));
+    const auto value_len = static_cast<size_t>(state.range(1));
+
+    std::vector<std::string> corpus;
+    if constexpr (CharPadding) {
+        corpus = make_padded_char_corpus(num_elems, value_len);
+    } else {
+        corpus = make_corpus(num_elems, value_len);
+    }
+
+    // Each Slice spans the full stored width, i.e. including CHAR padding.
+    std::vector<Slice> slices;
+    slices.reserve(corpus.size());
+    for (const std::string& value : corpus) {
+        slices.emplace_back(value.data(), value.size());
+    }
+
+    OwnedSlice page = build_page_typed<BuilderT, Type>(slices);
+    const Slice raw = page.slice();
+
+    // One untimed pass pins down the compressed size that gets reported.
+    BlockCompressionCodec* codec = zstd_codec();
+    faststring compressed;
+    Status st = codec->compress(raw, &compressed);
+    CHECK(st.ok()) << st;
+    const size_t zstd_bytes = compressed.size();
+
+    for (auto _ : state) {
+        compressed.clear();
+        st = codec->compress(raw, &compressed);
+        CHECK(st.ok()) << st;
+        benchmark::DoNotOptimize(compressed);
+        benchmark::ClobberMemory();
+    }
+
+    state.SetBytesProcessed(static_cast<int64_t>(state.iterations()) *
+                            static_cast<int64_t>(raw.size));
+
+    const auto raw_bytes = static_cast<double>(raw.size);
+    // Never divide by zero, even for a zero-byte compression result.
+    const size_t ratio_divisor = zstd_bytes == 0 ? 1 : zstd_bytes;
+    state.counters["num_elems"] = static_cast<double>(num_elems);
+    state.counters["raw_bytes"] = raw_bytes;
+    state.counters["zstd_bytes"] = static_cast<double>(zstd_bytes);
+    state.counters["zstd_ratio"] =
+            raw_bytes / static_cast<double>(ratio_divisor);
+}
+
+// ZSTD size benchmarks: one wrapper per (layout, column type) pair. The last
+// template argument (CharPadding) selects the '\0'-padded CHAR corpus when
+// true and the plain VARCHAR corpus when false.
+
+// V2 layout, VARCHAR corpus.
+inline void BM_ZstdSize_V2_Varchar(benchmark::State& state) {
+    run_zstd_size_bm<BinaryPlainPageV2Builder, 
FieldType::OLAP_FIELD_TYPE_VARCHAR, false>(state);
+}
+// V3 layout, VARCHAR corpus.
+inline void BM_ZstdSize_V3_Varchar(benchmark::State& state) {
+    run_zstd_size_bm<BinaryPlainPageV3Builder, 
FieldType::OLAP_FIELD_TYPE_VARCHAR, false>(state);
+}
+// V2 layout, padded CHAR corpus.
+inline void BM_ZstdSize_V2_Char(benchmark::State& state) {
+    run_zstd_size_bm<BinaryPlainPageV2Builder, 
FieldType::OLAP_FIELD_TYPE_CHAR, true>(state);
+}
+// V3 layout, padded CHAR corpus.
+inline void BM_ZstdSize_V3_Char(benchmark::State& state) {
+    run_zstd_size_bm<BinaryPlainPageV3Builder, 
FieldType::OLAP_FIELD_TYPE_CHAR, true>(state);
+}
+
+// Registers the (num_elems, value_len) grid for the ZSTD size benchmarks:
+// two element counts crossed with four value widths.
+static void V2V3ZstdSizeArgs(benchmark::internal::Benchmark* b) {
+    static constexpr int kNumElems[] = {1024, 16384};
+    static constexpr int kValueLens[] = {8, 32, 128, 1024};
+    for (const int num_elems : kNumElems) {
+        for (const int value_len : kValueLens) {
+            b->Args({num_elems, value_len});
+        }
+    }
+}
+
+BENCHMARK(BM_ZstdSize_V2_Varchar)->Apply(V2V3ZstdSizeArgs);
+BENCHMARK(BM_ZstdSize_V3_Varchar)->Apply(V2V3ZstdSizeArgs);
+BENCHMARK(BM_ZstdSize_V2_Char)->Apply(V2V3ZstdSizeArgs);
+BENCHMARK(BM_ZstdSize_V3_Char)->Apply(V2V3ZstdSizeArgs);
+
+} // namespace segment_v2
+} // namespace doris
diff --git a/be/benchmark/benchmark_main.cpp b/be/benchmark/benchmark_main.cpp
index f3d0aa5001d..fa268fa2d0e 100644
--- a/be/benchmark/benchmark_main.cpp
+++ b/be/benchmark/benchmark_main.cpp
@@ -34,6 +34,16 @@
 #include "core/column/column_string.h"
 #include "core/data_type/data_type.h"
 #include "core/data_type/data_type_string.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/memory/thread_mem_tracker_mgr.h"
+#include "runtime/thread_context.h"
+
+// benchmark_binary_plain_page_v2.hpp must be included LAST: it transitively 
pulls AWS SDK
+// headers (via storage/cache/page_cache.h) whose symbols shadow types used by 
the benchmark
+// headers above (notably binary_cast_benchmark.hpp). Keeping it last avoids 
the clash without
+// disabling any benchmark. (Do not let clang-format reorder it above the 
others.)
+#include "benchmark_binary_plain_page_v2.hpp"
 
 namespace doris { // change if need
 
@@ -59,4 +69,23 @@ static void Example1(benchmark::State& state) {
 BENCHMARK(Example1);
 } // namespace doris
 
-BENCHMARK_MAIN();
+// Custom main: benchmarks that touch DataPage allocation require a Doris
+// ThreadContext + mem tracker, otherwise the allocator throws E-7412. Mirrors
+// the minimal subset of be/test/testutil/run_all_tests.cpp::main.
+//
+// Returns 1 when argv contains arguments Google Benchmark does not recognize,
+// 0 after the selected benchmarks have run.
+int main(int argc, char** argv) {
+    // Doris runtime setup: thread context first, then the process mem
+    // tracker, then this thread's tracker manager.
+    SCOPED_INIT_THREAD_CONTEXT();
+    doris::ExecEnv::GetInstance()->init_mem_tracker();
+    doris::thread_context()->thread_mem_tracker_mgr->init();
+    // Attach a GLOBAL-type limiter named "BE-BENCH" to this thread's tracker
+    // manager.
+    auto bench_tracker = doris::MemTrackerLimiter::create_shared(
+            doris::MemTrackerLimiter::Type::GLOBAL, "BE-BENCH");
+    
doris::thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(bench_tracker);
+    // NOTE(review): presumably turns off per-allocation memory accounting for
+    // the benchmark run - confirm against ExecEnv.
+    doris::ExecEnv::set_tracking_memory(false);
+
+    // Google Benchmark driver: let the library consume its flags from argv,
+    // refuse to run if anything unrecognized is left, then run and shut down.
+    ::benchmark::Initialize(&argc, argv);
+    if (::benchmark::ReportUnrecognizedArguments(argc, argv)) {
+        return 1;
+    }
+    ::benchmark::RunSpecifiedBenchmarks();
+    ::benchmark::Shutdown();
+    return 0;
+}
diff --git a/be/src/storage/segment/binary_dict_page.cpp 
b/be/src/storage/segment/binary_dict_page.cpp
index b3af337714b..fb85020bb15 100644
--- a/be/src/storage/segment/binary_dict_page.cpp
+++ b/be/src/storage/segment/binary_dict_page.cpp
@@ -31,6 +31,7 @@
 #include "core/column/column.h"
 #include "core/column/column_string.h"
 #include "storage/segment/binary_plain_page_v2.h"
+#include "storage/segment/binary_plain_page_v3.h"
 #include "storage/segment/bitshuffle_page.h"
 #include "storage/segment/encoding_info.h"
 #include "util/coding.h"
@@ -47,11 +48,7 @@ BinaryDictPageBuilder::BinaryDictPageBuilder(const 
PageBuilderOptions& options)
           _data_page_builder(nullptr),
           _dict_builder(nullptr),
           _encoding_type(DICT_ENCODING),
-          _binary_plain_encoding_type(
-                  options.dict_binary_plain_encoding ==
-                                  
BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2
-                          ? PLAIN_ENCODING_V2
-                          : PLAIN_ENCODING) {}
+          _binary_plain_encoding_type(options.dict_binary_plain_encoding) {}
 
 Status BinaryDictPageBuilder::init() {
     // initially use DICT_ENCODING
@@ -141,7 +138,8 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, 
size_t* count) {
         *count = num_added;
         return Status::OK();
     } else {
-        DCHECK(_encoding_type == PLAIN_ENCODING || _encoding_type == 
PLAIN_ENCODING_V2);
+        DCHECK(_encoding_type == PLAIN_ENCODING || _encoding_type == 
PLAIN_ENCODING_V2 ||
+               _encoding_type == PLAIN_ENCODING_V3);
         RETURN_IF_ERROR(_data_page_builder->add(vals, count));
         // For plain encoding, track raw data size from the input
         const Slice* src = reinterpret_cast<const Slice*>(vals);
@@ -237,6 +235,11 @@ Status BinaryDictPageDecoder::init() {
     } else if (_encoding_type == PLAIN_ENCODING_V2) {
         _data_page_decoder.reset(
                 new 
BinaryPlainPageV2Decoder<FieldType::OLAP_FIELD_TYPE_VARCHAR>(_data, _options));
+    } else if (_encoding_type == PLAIN_ENCODING_V3) {
+        // The V3 pre-decoder has already rewritten the inner page into the V1 
layout, so the
+        // V3 decoder (a BinaryPlainPageDecoder subclass) reads it like V1.
+        _data_page_decoder.reset(
+                new 
BinaryPlainPageV3Decoder<FieldType::OLAP_FIELD_TYPE_VARCHAR>(_data, _options));
     } else {
         LOG(WARNING) << "invalid encoding type:" << _encoding_type;
         return Status::Corruption("invalid encoding type:{}", _encoding_type);
diff --git a/be/src/storage/segment/binary_dict_page.h 
b/be/src/storage/segment/binary_dict_page.h
index c5367628561..60b39cc693d 100644
--- a/be/src/storage/segment/binary_dict_page.h
+++ b/be/src/storage/segment/binary_dict_page.h
@@ -106,8 +106,9 @@ private:
 
     EncodingTypePB _encoding_type;
 
-    // Binary-plain flavor (V1 or V2) used both for the dictionary word page 
and for the data
-    // page when the dictionary overflows. Resolved from 
PageBuilderOptions::dict_binary_plain_encoding.
+    // On-disk binary plain encoding (PLAIN_ENCODING / PLAIN_ENCODING_V2 / 
PLAIN_ENCODING_V3)
+    // used both for the dictionary word page and for the dict-overflow data 
page. Copied
+    // verbatim from PageBuilderOptions::dict_binary_plain_encoding.
     const EncodingTypePB _binary_plain_encoding_type;
 
     struct HashOfSlice {
diff --git a/be/src/storage/segment/binary_dict_page_pre_decoder.h 
b/be/src/storage/segment/binary_dict_page_pre_decoder.h
index c6f6721e70b..77adebe7565 100644
--- a/be/src/storage/segment/binary_dict_page_pre_decoder.h
+++ b/be/src/storage/segment/binary_dict_page_pre_decoder.h
@@ -21,6 +21,7 @@
 #include "storage/segment/binary_dict_page.h"
 #include "storage/segment/binary_plain_page_char_strip_pre_decoder.h"
 #include "storage/segment/binary_plain_page_v2_pre_decoder.h"
+#include "storage/segment/binary_plain_page_v3_pre_decoder.h"
 #include "storage/segment/bitshuffle_page_pre_decoder.h"
 #include "storage/segment/encoding_info.h"
 #include "util/coding.h"
@@ -73,10 +74,10 @@ struct BinaryDictPagePreDecoder : public DataPagePreDecoder 
{
         auto encoding_type =
                 static_cast<EncodingTypePB>(decode_fixed32_le((const 
uint8_t*)page_slice->data));
         if (encoding_type != DICT_ENCODING && encoding_type != 
PLAIN_ENCODING_V2 &&
-            encoding_type != PLAIN_ENCODING) {
+            encoding_type != PLAIN_ENCODING_V3 && encoding_type != 
PLAIN_ENCODING) {
             return Status::Corruption(
                     "Unknown encoding type: {} in file: {}, should one of 
<DICT_ENCODING, "
-                    "PLAIN_ENCODING_V2, PLAIN_ENCODING>",
+                    "PLAIN_ENCODING_V2, PLAIN_ENCODING_V3, PLAIN_ENCODING>",
                     encoding_type, file_path);
         }
         // For PLAIN_ENCODING, non-CHAR pages can be used as-is; CHAR pages
@@ -115,6 +116,12 @@ struct BinaryDictPagePreDecoder : public 
DataPagePreDecoder {
                                        _use_cache, page_type, file_path, 
total_prefix);
             break;
         }
+        case PLAIN_ENCODING_V3: {
+            BinaryPlainPageV3PreDecoder<IS_CHAR> v3_decoder;
+            status = v3_decoder.decode(&decoded_page, &data_without_header, 
size_of_tail,
+                                       _use_cache, page_type, file_path, 
total_prefix);
+            break;
+        }
         case PLAIN_ENCODING: {
             // Non-CHAR is short-circuited above; CHECK that the invariant
             // holds in case the short-circuit gets removed accidentally.
diff --git a/be/src/storage/segment/binary_plain_page_v3.h 
b/be/src/storage/segment/binary_plain_page_v3.h
new file mode 100644
index 00000000000..971dcec2c09
--- /dev/null
+++ b/be/src/storage/segment/binary_plain_page_v3.h
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Binary plain page encoding V3.
+//
+// The page consists of:
+// Data:
+//   |binary1|binary2|...|binaryN|
+// Lengths (contiguous varuint block):
+//   |varuint_len1|varuint_len2|...|varuint_lenN|
+// Trailer:
+//   |data_block_size(32-bit fixed)|num_elems(32-bit fixed)|
+//
+// vs V2 (which interleaves length and data per-entry), V3 lets the pre-decoder
+// memcpy the entire data block in a single shot when converting to V1 layout,
+// and walk the varint lengths once to fill the offsets array.
+//
+// V3 stores exactly the same bytes as V1/V2 — only the on-disk layout 
differs. In
+// particular, CHAR values keep their trailing '\0' padding on disk (as 
written by
+// OlapColumnDataConvertorChar); that padding is stripped on read by
+// BinaryPlainPageV3PreDecoder<true>, selected for (CHAR, PLAIN_ENCODING_V3), 
exactly
+// mirroring PLAIN_ENCODING_V2.
+
+#pragma once
+
+#include "common/logging.h"
+#include "core/column/column_complex.h"
+#include "core/column/column_nullable.h"
+#include "storage/olap_common.h"
+#include "storage/segment/binary_plain_page.h"
+#include "storage/segment/options.h"
+#include "storage/segment/page_builder.h"
+#include "storage/segment/page_decoder.h"
+#include "storage/types.h"
+#include "util/coding.h"
+#include "util/faststring.h"
+
+namespace doris {
+namespace segment_v2 {
+
+// Page builder for the binary plain V3 layout.
+//
+// Values are appended to `_data_buffer` and their varint32-encoded lengths to
+// `_lengths_buffer`; finish() concatenates the two and appends the trailer
+// (data_block_size, num_elems), both written with put_fixed32_le.
+// Instances are created through PageBuilderHelper (the constructor is
+// private and the helper is a friend).
+template <FieldType Type>
+class BinaryPlainPageV3Builder : public 
PageBuilderHelper<BinaryPlainPageV3Builder<Type>> {
+public:
+    using Self = BinaryPlainPageV3Builder<Type>;
+    friend class PageBuilderHelper<Self>;
+
+    // Initialization is just a reset to the empty-page state.
+    Status init() override { return reset(); }
+
+    // True once the size estimate exceeds the configured limit: dict_page_size
+    // for dictionary pages, data_page_size otherwise. A limit of 0 disables
+    // the check, so the page never reports full.
+    bool is_page_full() override {
+        bool ret = false;
+        if (_options.is_dict_page) {
+            ret = _options.dict_page_size != 0 && _size_estimate > 
_options.dict_page_size;
+        } else {
+            ret = _options.data_page_size != 0 && _size_estimate > 
_options.data_page_size;
+        }
+        return ret;
+    }
+
+    // Appends up to `*count` values. `vals` is read as an array of Slice.
+    // On return `*count` holds the number of values actually consumed, which
+    // is smaller than the input when the page fills up first.
+    // Fullness is tested before each value, so the value that pushes the
+    // estimate past the limit is still stored.
+    Status add(const uint8_t* vals, size_t* count) override {
+        DCHECK(!_finished);
+        DCHECK_GT(*count, 0);
+        size_t i = 0;
+
+        while (!is_page_full() && i < *count) {
+            const auto* src = reinterpret_cast<const Slice*>(vals);
+            // BITMAP columns only: optionally validate the first byte of the
+            // value as a bitmap type code before storing it.
+            if constexpr (Type == FieldType::OLAP_FIELD_TYPE_BITMAP) {
+                if (_options.need_check_bitmap) {
+                    RETURN_IF_ERROR(BitmapTypeCode::validate(*(src->data)));
+                }
+            }
+
+            // Append the data straight into the contiguous data buffer. V3 
stores the same
+            // bytes as V1/V2 (CHAR keeps its '\0' padding, VARCHAR does not); 
only the layout
+            // differs. CHAR padding is stripped on read by 
BinaryPlainPageV3PreDecoder<true>.
+            RETURN_IF_CATCH_EXCEPTION(_data_buffer.append(src->data, 
src->size));
+
+            // Encode varuint length into a scratch buffer, then append.
+            uint8_t length_buffer[5]; // max varint32 size
+            uint8_t* ptr = encode_varint32(length_buffer, 
cast_set<uint32_t>(src->size));
+            size_t length_size = ptr - length_buffer;
+            RETURN_IF_CATCH_EXCEPTION(_lengths_buffer.append(length_buffer, 
length_size));
+
+            // The estimate counts payload plus length prefix; the raw data
+            // size counts payload only.
+            _num_elems++;
+            _size_estimate += src->size + length_size;
+            _raw_data_size += src->size;
+
+            i++;
+            vals += sizeof(Slice);
+        }
+
+        *count = i;
+        return Status::OK();
+    }
+
+    // Assembles the final page into `*slice`. The data block size must be
+    // captured before the lengths are appended to the same buffer.
+    Status finish(OwnedSlice* slice) override {
+        DCHECK(!_finished);
+        _finished = true;
+        RETURN_IF_CATCH_EXCEPTION({
+            // Layout: |data...|lengths...|data_block_size(u32)|num_elems(u32)|
+            const uint32_t data_block_size = 
cast_set<uint32_t>(_data_buffer.size());
+            // Append lengths after data.
+            _data_buffer.append(_lengths_buffer.data(), 
_lengths_buffer.size());
+            // Trailer: data_block_size, num_elems.
+            put_fixed32_le(&_data_buffer, data_block_size);
+            put_fixed32_le(&_data_buffer, _num_elems);
+            *slice = _data_buffer.build();
+        });
+        return Status::OK();
+    }
+
+    // Returns the builder to the empty-page state. The data buffer's initial
+    // capacity is 1024 bytes when data_page_size is 0, otherwise the smaller
+    // of data_page_size and dict_page_size.
+    Status reset() override {
+        RETURN_IF_CATCH_EXCEPTION({
+            _data_buffer.clear();
+            _lengths_buffer.clear();
+            _data_buffer.reserve(_options.data_page_size == 0 ? 1024
+                                                              : 
std::min(_options.data_page_size,
+                                                                         
_options.dict_page_size));
+            _lengths_buffer.reserve(256);
+            _num_elems = 0;
+            // Reserve the trailer (data_block_size + num_elems).
+            _size_estimate = 2 * sizeof(uint32_t);
+            _finished = false;
+            _raw_data_size = 0;
+        });
+        return Status::OK();
+    }
+
+    // Number of values added since the last reset().
+    size_t count() const override { return _num_elems; }
+
+    // Estimated encoded size: payload + length prefixes + 8-byte trailer.
+    uint64_t size() const override { return _size_estimate; }
+
+    // Sum of the value sizes only (no length prefixes, no trailer).
+    uint64_t get_raw_data_size() const override { return _raw_data_size; }
+
+private:
+    BinaryPlainPageV3Builder(const PageBuilderOptions& options)
+            : _size_estimate(0), _options(options) {}
+
+    // Value bytes; finish() also appends the lengths and trailer here.
+    faststring _data_buffer;
+    // varint32-encoded value lengths, in insertion order.
+    faststring _lengths_buffer;
+    uint32_t _num_elems = 0;
+    size_t _size_estimate = 0;
+    // Set by finish(), cleared by reset(); add()/finish() DCHECK it is unset.
+    bool _finished = false;
+    PageBuilderOptions _options;
+    uint64_t _raw_data_size = 0;
+};
+
+// V3 decoder behaves identically to the V1 decoder because the V3 pre-decoder
+// converts the on-disk V3 layout to the V1 (offsets-array) layout before the
+// page is put into the page cache. The decoder operating on the cached page
+// therefore only needs to know how to read the V1 layout.
+//
+// The class adds no members or overrides; both constructors only forward to
+// BinaryPlainPageDecoder<Type>.
+template <FieldType Type>
+class BinaryPlainPageV3Decoder : public BinaryPlainPageDecoder<Type> {
+public:
+    // `data` is expected to be the pre-decoded (V1 layout) page.
+    BinaryPlainPageV3Decoder(Slice data) : BinaryPlainPageDecoder<Type>(data) 
{}
+
+    // Same as above, with explicit decoder options.
+    BinaryPlainPageV3Decoder(Slice data, const PageDecoderOptions& options)
+            : BinaryPlainPageDecoder<Type>(data, options) {}
+};
+
+} // namespace segment_v2
+} // namespace doris
diff --git a/be/src/storage/segment/binary_plain_page_v3_pre_decoder.h 
b/be/src/storage/segment/binary_plain_page_v3_pre_decoder.h
new file mode 100644
index 00000000000..feacca7e316
--- /dev/null
+++ b/be/src/storage/segment/binary_plain_page_v3_pre_decoder.h
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstring>
+#include <vector>
+
+#include "storage/cache/page_cache.h"
+#include "storage/segment/binary_plain_page_v2_pre_decoder.h" // 
BinaryPlainV1Entry, write_binary_plain_v1_output
+#include "storage/segment/encoding_info.h"
+#include "util/coding.h"
+
+namespace doris {
+namespace segment_v2 {
+
+/**
+ * @brief Pre-decoder for BinaryPlainPageV3.
+ *
+ * Converts the V3 layout (contiguous data + contiguous varuint length block)
+ * to the V1 layout (offsets array trailer) so the cached page can be served
+ * with O(1) seek.
+ *
+ * V3 format (input):
+ *   |binary1|binary2|...|binaryN|varuint_len1|...|varuint_lenN|
+ *   |data_block_size(32-bit)|num_elems(32-bit)|
+ *
+ * V1 format (output, same as BinaryPlainPageV2PreDecoder):
+ *   
|binary1|...|binaryN|offset1(32-bit)|...|offsetN(32-bit)|num_elems(32-bit)|
+ *
+ * The builder stores the same bytes as V1/V2 (CHAR keeps its '\0' padding, 
VARCHAR
+ * does not); only the layout differs. IS_CHAR selects how the stored lengths 
are
+ * treated on read, mirroring V2:
+ *   - IS_CHAR=false: values carry no padding, so the binary block is memcpy'd 
in a
+ *     single shot and the varint loop only fills the running offsets array — 
the two
+ *     passes are independent and the data is touched exactly once. This is 
the win vs
+ *     V2 (which chases a length pointer per entry). Used for all non-CHAR 
binary types
+ *     (VARCHAR/STRING/JSONB/VARIANT/HLL/BITMAP/QUANTILE_STATE/AGG_STATE).
+ *   - IS_CHAR=true: each entry is strnlen'd to drop the trailing '\0' padding 
that CHAR
+ *     values carry on disk, then the V1 page is built from the logical 
lengths. Selected
+ *     for (CHAR, PLAIN_ENCODING_V3) on read (e.g. the CHAR dictionary word 
page).
+ */
+template <bool IS_CHAR>
+struct BinaryPlainPageV3PreDecoder : public DataPagePreDecoder {
+    /**
+     * @brief Validate a V3 binary plain page and rewrite it into the V1 layout.
+     *
+     * The trailer words and every varuint length are read from the page and are
+     * therefore untrusted. Each inconsistency is reported as Status::Corruption
+     * before it can drive an oversized allocation or produce an offset that points
+     * outside the data block.
+     *
+     * @param page           Receives the decoded page on success. In the CHAR path
+     *                       the output is produced by write_binary_plain_v1_output.
+     * @param page_slice     In: the encoded page (V3 data followed by size_of_tail
+     *                       trailing bytes). Out: the decoded page on success.
+     * @param size_of_tail   Number of trailing bytes of page_slice that are not part
+     *                       of the V3 data; they are carried into the output unchanged.
+     * @param _use_cache     Forwarded to the DataPage allocation.
+     * @param page_type      Forwarded to the DataPage allocation.
+     * @param file_path      Only used to build error messages.
+     * @param size_of_prefix Bytes reserved in front of the V1 data in the output.
+     *                       The non-CHAR path leaves these bytes unwritten.
+     * @return Status::OK() on success, Status::Corruption for a malformed page.
+     */
+    Status decode(std::unique_ptr<DataPage>* page, Slice* page_slice, size_t size_of_tail,
+                  bool _use_cache, segment_v2::PageTypePB page_type, const std::string& file_path,
+                  size_t size_of_prefix = 0) override {
+        // V3 trailer is two u32 words: data_block_size then num_elems.
+        constexpr size_t kV3TrailerSize = 2 * sizeof(uint32_t);
+
+        if (page_slice->size < kV3TrailerSize + size_of_tail) {
+            return Status::Corruption("Invalid V3 page size: {}, expected at least {} in file: {}",
+                                      page_slice->size, kV3TrailerSize + size_of_tail, file_path);
+        }
+
+        // page_slice->size >= kV3TrailerSize + size_of_tail is enforced above,
+        // so data.size = page_slice->size - size_of_tail >= kV3TrailerSize.
+        Slice data(page_slice->data, page_slice->size - size_of_tail);
+
+        const uint8_t* data_begin = reinterpret_cast<const uint8_t*>(data.data);
+        const uint8_t* trailer_ptr = data_begin + data.size - kV3TrailerSize;
+        const uint32_t data_block_size = decode_fixed32_le(trailer_ptr);
+        const uint32_t num_elems = decode_fixed32_le(trailer_ptr + sizeof(uint32_t));
+
+        // Use subtraction form to avoid uint32_t wraparound on a malicious
+        // data_block_size close to UINT32_MAX. data.size >= kV3TrailerSize.
+        if (data_block_size > data.size - kV3TrailerSize) {
+            return Status::Corruption("V3 data_block_size {} exceeds available data {} in file: {}",
+                                      data_block_size, data.size - kV3TrailerSize, file_path);
+        }
+
+        const uint8_t* lengths_ptr = data_begin + data_block_size;
+        const uint8_t* lengths_limit = trailer_ptr;
+
+        // Every varuint occupies at least one byte, so a well-formed page can never
+        // declare more elements than the length block has bytes. num_elems sizes the
+        // allocations below (entries.reserve / the decoded DataPage), so reject a
+        // corrupt value here instead of after attempting a multi-gigabyte allocation.
+        const size_t lengths_block_size = static_cast<size_t>(lengths_limit - lengths_ptr);
+        if (num_elems > lengths_block_size) {
+            return Status::Corruption("V3 num_elems {} exceeds length block size {} in file: {}",
+                                      num_elems, lengths_block_size, file_path);
+        }
+
+        if constexpr (IS_CHAR) {
+            // ---- CHAR path: strnlen each entry to strip trailing '\0' padding. ----
+            // Walk the contiguous data block in lockstep with the length block:
+            // entry i starts at data_begin + running_raw and is `len` bytes wide.
+            const uint8_t* ptr = lengths_ptr;
+            uint32_t running_raw = 0;
+            std::vector<BinaryPlainV1Entry> entries;
+            entries.reserve(num_elems);
+            uint32_t total_out_len = 0;
+            for (uint32_t i = 0; i < num_elems; ++i) {
+                if (ptr >= lengths_limit) {
+                    return Status::Corruption(
+                            "V3 unexpected end of length block at element {} in file: {}", i,
+                            file_path);
+                }
+                uint32_t len = 0;
+                ptr = decode_varint32_ptr(ptr, lengths_limit, &len);
+                if (ptr == nullptr) {
+                    return Status::Corruption(
+                            "V3 failed to decode varuint at element {} in file: {}", i, file_path);
+                }
+                // Subtraction form: running_raw <= data_block_size always holds here,
+                // so this cannot wrap and also bounds total_out_len.
+                if (len > data_block_size - running_raw) {
+                    return Status::Corruption(
+                            "V3 entry {} length {} overflows data block in file: {}", i, len,
+                            file_path);
+                }
+                const uint8_t* entry_data = data_begin + running_raw;
+                uint32_t out_len = static_cast<uint32_t>(
+                        strnlen(reinterpret_cast<const char*>(entry_data), len));
+                entries.push_back({entry_data, out_len});
+                total_out_len += out_len;
+                running_raw += len;
+            }
+
+            if (running_raw != data_block_size) {
+                return Status::Corruption("V3 sum of lengths {} != data_block_size {} in file: {}",
+                                          running_raw, data_block_size, file_path);
+            }
+
+            return write_binary_plain_v1_output(entries, num_elems, total_out_len, *page_slice,
+                                                size_of_tail, size_of_prefix, _use_cache, page_type,
+                                                page, page_slice);
+        } else {
+            // ---- Fast path (non-CHAR): lengths are already logical. ----
+            const size_t offsets_size = static_cast<size_t>(num_elems) * sizeof(uint32_t);
+            const size_t v1_data_size = data_block_size + offsets_size + sizeof(uint32_t);
+            const size_t total_size = size_of_prefix + v1_data_size + size_of_tail;
+
+            std::unique_ptr<DataPage> decoded_page =
+                    std::make_unique<DataPage>(total_size, _use_cache, page_type);
+            Slice decoded_slice(decoded_page->data(), total_size);
+            char* output = decoded_slice.data + size_of_prefix;
+
+            // 1. Single memcpy of the entire binary payload.
+            if (data_block_size > 0) {
+                memcpy(output, data_begin, data_block_size);
+            }
+            output += data_block_size;
+
+            // 2. Walk varints once, write the running offsets array.
+            const uint8_t* ptr = lengths_ptr;
+            uint32_t running = 0;
+            for (uint32_t i = 0; i < num_elems; ++i) {
+                if (ptr >= lengths_limit) {
+                    return Status::Corruption(
+                            "V3 unexpected end of length block at element {} in file: {}", i,
+                            file_path);
+                }
+                uint32_t len = 0;
+                ptr = decode_varint32_ptr(ptr, lengths_limit, &len);
+                if (ptr == nullptr) {
+                    return Status::Corruption(
+                            "V3 failed to decode varuint at element {} in file: {}", i, file_path);
+                }
+                // Same bound as the CHAR path. Without it a crafted length list can
+                // wrap the uint32_t running sum back onto data_block_size, passing the
+                // final equality check while leaving offsets beyond the data block.
+                if (len > data_block_size - running) {
+                    return Status::Corruption(
+                            "V3 entry {} length {} overflows data block in file: {}", i, len,
+                            file_path);
+                }
+                encode_fixed32_le(reinterpret_cast<uint8_t*>(output), running);
+                output += sizeof(uint32_t);
+                running += len;
+            }
+
+            if (running != data_block_size) {
+                return Status::Corruption("V3 sum of lengths {} != data_block_size {} in file: {}",
+                                          running, data_block_size, file_path);
+            }
+
+            // 3. num_elems trailer.
+            encode_fixed32_le(reinterpret_cast<uint8_t*>(output), num_elems);
+            output += sizeof(uint32_t);
+
+            // 4. Tail (footer + null map) carried through unchanged.
+            if (size_of_tail > 0) {
+                memcpy(output, page_slice->data + page_slice->size - size_of_tail, size_of_tail);
+            }
+
+            // Publish the result only after every check passed, so the caller's
+            // page and slice are untouched on any Corruption return above.
+            *page_slice = decoded_slice;
+            *page = std::move(decoded_page);
+            return Status::OK();
+        }
+    }
+};
+
+} // namespace segment_v2
+} // namespace doris
diff --git a/be/src/storage/segment/column_writer.cpp 
b/be/src/storage/segment/column_writer.cpp
index 5dd14d0c980..7d604897494 100644
--- a/be/src/storage/segment/column_writer.cpp
+++ b/be/src/storage/segment/column_writer.cpp
@@ -503,10 +503,12 @@ Status ScalarColumnWriter::init() {
     PageBuilderOptions opts;
     opts.data_page_size = _opts.data_page_size;
     opts.dict_page_size = _opts.dict_page_size;
+    // V3 segments store the dictionary word page (and the dict-overflow 
fallback plain page)
+    // with the V3 binary plain layout; pre-V3 segments keep V1.
     opts.dict_binary_plain_encoding =
             (_opts.storage_format == 
TabletStorageFormatPB::TABLET_STORAGE_FORMAT_V3)
-                    ? BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2
-                    : BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V1;
+                    ? PLAIN_ENCODING_V3
+                    : PLAIN_ENCODING;
     RETURN_IF_ERROR(_encoding_info->create_page_builder(opts, &page_builder));
     if (page_builder == nullptr) {
         return Status::NotSupported("Failed to create page builder for type {} 
and encoding {}",
diff --git a/be/src/storage/segment/encoding_info.cpp 
b/be/src/storage/segment/encoding_info.cpp
index 752627c2e28..d2b9d99f114 100644
--- a/be/src/storage/segment/encoding_info.cpp
+++ b/be/src/storage/segment/encoding_info.cpp
@@ -36,6 +36,8 @@
 #include "storage/segment/binary_plain_page_char_strip_pre_decoder.h"
 #include "storage/segment/binary_plain_page_v2.h"
 #include "storage/segment/binary_plain_page_v2_pre_decoder.h"
+#include "storage/segment/binary_plain_page_v3.h"
+#include "storage/segment/binary_plain_page_v3_pre_decoder.h"
 #include "storage/segment/binary_prefix_page.h"
 #include "storage/segment/bitshuffle_page.h"
 #include "storage/segment/bitshuffle_page_pre_decoder.h"
@@ -97,6 +99,20 @@ struct TypeEncodingTraits<type, PLAIN_ENCODING_V2, Slice> {
     }
 };
 
+// PLAIN_ENCODING_V3 is a binary plain page, only registered for Slice 
(binary) types, so it
+// has no non-Slice specialization (unlike PLAIN_ENCODING, which also serves 
numeric types).
+template <FieldType type>
+struct TypeEncodingTraits<type, PLAIN_ENCODING_V3, Slice> {
+    using Builder = BinaryPlainPageV3Builder<type>;
+    using Decoder = BinaryPlainPageV3Decoder<type>;
+
+    // Delegates to the V3 builder's own factory and returns its status.
+    static Status create_page_builder(const PageBuilderOptions& opts, PageBuilder** builder) {
+        return Builder::create(builder, opts);
+    }
+
+    // Heap-allocates a V3 decoder over `data` and hands it back through *decoder;
+    // this function does not free it.
+    static Status create_page_decoder(const Slice& data, const PageDecoderOptions& opts,
+                                      PageDecoder** decoder) {
+        *decoder = new Decoder(data, opts);
+        return Status::OK();
+    }
+};
+
 template <FieldType type, typename CppType>
 struct TypeEncodingTraits<type, BIT_SHUFFLE, CppType,
                           typename std::enable_if<!std::is_same<CppType, 
Slice>::value>::type> {
@@ -252,22 +268,27 @@ EncodingInfoResolver::EncodingInfoResolver() {
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_CHAR, 
DICT_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_CHAR, 
PLAIN_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_CHAR, 
PLAIN_ENCODING_V2>();
+    _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_CHAR, 
PLAIN_ENCODING_V3>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_CHAR, 
PREFIX_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_VARCHAR, 
DICT_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_VARCHAR, 
PLAIN_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_VARCHAR, 
PLAIN_ENCODING_V2>();
+    _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_VARCHAR, 
PLAIN_ENCODING_V3>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_VARCHAR, 
PREFIX_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_STRING, 
DICT_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_STRING, 
PLAIN_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_STRING, 
PLAIN_ENCODING_V2>();
+    _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_STRING, 
PLAIN_ENCODING_V3>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_STRING, 
PREFIX_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_JSONB, 
DICT_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_JSONB, 
PLAIN_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_JSONB, 
PLAIN_ENCODING_V2>();
+    _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_JSONB, 
PLAIN_ENCODING_V3>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_JSONB, 
PREFIX_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_VARIANT, 
DICT_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_VARIANT, 
PLAIN_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_VARIANT, 
PLAIN_ENCODING_V2>();
+    _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_VARIANT, 
PLAIN_ENCODING_V3>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_VARIANT, 
PREFIX_ENCODING>();
 
     // BOOL
@@ -313,12 +334,16 @@ EncodingInfoResolver::EncodingInfoResolver() {
     // aggregate / binary-flavored types
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_HLL, 
PLAIN_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_HLL, 
PLAIN_ENCODING_V2>();
+    _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_HLL, 
PLAIN_ENCODING_V3>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_BITMAP, 
PLAIN_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_BITMAP, 
PLAIN_ENCODING_V2>();
+    _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_BITMAP, 
PLAIN_ENCODING_V3>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE, 
PLAIN_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE, 
PLAIN_ENCODING_V2>();
+    _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE, 
PLAIN_ENCODING_V3>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_AGG_STATE, 
PLAIN_ENCODING>();
     _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_AGG_STATE, 
PLAIN_ENCODING_V2>();
+    _register_supported_encoding<FieldType::OLAP_FIELD_TYPE_AGG_STATE, 
PLAIN_ENCODING_V3>();
 
     // ===== Phase 2a: V2 defaults (write path, V1/V2 segments) =====
     _set_v2_default<FieldType::OLAP_FIELD_TYPE_TINYINT, BIT_SHUFFLE>();
@@ -381,10 +406,10 @@ EncodingInfoResolver::EncodingInfoResolver() {
     _set_v3_default<FieldType::OLAP_FIELD_TYPE_DECIMAL256, BIT_SHUFFLE>();
     _set_v3_default<FieldType::OLAP_FIELD_TYPE_IPV4, BIT_SHUFFLE>();
     _set_v3_default<FieldType::OLAP_FIELD_TYPE_IPV6, BIT_SHUFFLE>();
-    _set_v3_default<FieldType::OLAP_FIELD_TYPE_HLL, PLAIN_ENCODING_V2>();
-    _set_v3_default<FieldType::OLAP_FIELD_TYPE_BITMAP, PLAIN_ENCODING_V2>();
-    _set_v3_default<FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE, 
PLAIN_ENCODING_V2>();
-    _set_v3_default<FieldType::OLAP_FIELD_TYPE_AGG_STATE, PLAIN_ENCODING_V2>();
+    _set_v3_default<FieldType::OLAP_FIELD_TYPE_HLL, PLAIN_ENCODING_V3>();
+    _set_v3_default<FieldType::OLAP_FIELD_TYPE_BITMAP, PLAIN_ENCODING_V3>();
+    _set_v3_default<FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE, 
PLAIN_ENCODING_V3>();
+    _set_v3_default<FieldType::OLAP_FIELD_TYPE_AGG_STATE, PLAIN_ENCODING_V3>();
 
     // ===== Phase 2c: IndexedColumn (value-seek) defaults =====
     // Only the PrimaryKeyIndexBuilder consults this map, and it hardcodes 
VARCHAR.
@@ -463,6 +488,23 @@ EncodingInfo::EncodingInfo(TraitsClass traits)
                     "non-Slice type {}",
                     int(TraitsClass::type)));
         }
+    } else if (_encoding == PLAIN_ENCODING_V3) {
+        // V3 binary plain pages store contiguous data followed by a 
contiguous varuint length
+        // block; the predecoder rewrites that into the V1 offset-array layout 
downstream Slice
+        // decoders expect. CHAR uses the IS_CHAR=true variant so the trailing 
'\0' padding of
+        // CHAR dictionary words (written with the VARCHAR builder) is 
stripped on read — mirroring
+        // PLAIN_ENCODING_V2. strnlen on a write-stripped CHAR page is a 
no-op, so the variant is
+        // also correct for direct CHAR plain V3 pages.
+        if constexpr (TraitsClass::type == FieldType::OLAP_FIELD_TYPE_CHAR) {
+            _data_page_pre_decoder = 
std::make_unique<BinaryPlainPageV3PreDecoder<true>>();
+        } else if constexpr (std::is_same_v<typename TraitsClass::CppType, 
Slice>) {
+            _data_page_pre_decoder = 
std::make_unique<BinaryPlainPageV3PreDecoder<false>>();
+        } else {
+            throw Exception(Status::FatalError(
+                    "PLAIN_ENCODING_V3 is only supported for Slice (binary) 
types, but got "
+                    "non-Slice type {}",
+                    int(TraitsClass::type)));
+        }
     }
 }
 
@@ -523,9 +565,10 @@ EncodingTypePB 
EncodingInfo::resolve_default_encoding(TabletStorageFormatPB stor
     const bool is_v3 = (storage_format == 
TabletStorageFormatPB::TABLET_STORAGE_FORMAT_V3);
 
     // Row store data is already serialized as a single blob. Keep it on plain 
pages to
-    // avoid introducing dictionary pages for the hidden row store column.
+    // avoid introducing dictionary pages for the hidden row store column; V3 
segments use
+    // the V3 binary plain layout.
     if (column.is_row_store_column()) {
-        return is_v3 ? PLAIN_ENCODING_V2 : PLAIN_ENCODING;
+        return is_v3 ? PLAIN_ENCODING_V3 : PLAIN_ENCODING;
     }
     return is_v3 ? get_v3_default_encoding(column.type()) : 
get_v2_default_encoding(column.type());
 }
diff --git a/be/src/storage/segment/options.h b/be/src/storage/segment/options.h
index a7c1ef0fa64..dbcb40ead84 100644
--- a/be/src/storage/segment/options.h
+++ b/be/src/storage/segment/options.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <gen_cpp/olap_file.pb.h>
+#include <gen_cpp/segment_v2.pb.h>
 
 #include <cstddef>
 
@@ -38,10 +39,10 @@ struct PageBuilderOptions {
 
     bool is_dict_page = false; // page used for saving dictionary
 
-    // BinaryPlain variant used by BinaryDictPageBuilder for its dict word 
page and
-    // dict-overflow fallback. Consumed only by BinaryDictPageBuilder.
-    BinaryPlainEncodingTypePB dict_binary_plain_encoding =
-            BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V1;
+    // On-disk binary plain encoding used by BinaryDictPageBuilder for its 
dict word page and
+    // dict-overflow fallback page (PLAIN_ENCODING / PLAIN_ENCODING_V2 / 
PLAIN_ENCODING_V3).
+    // Consumed only by BinaryDictPageBuilder.
+    EncodingTypePB dict_binary_plain_encoding = PLAIN_ENCODING;
 };
 
 struct PageDecoderOptions {
diff --git a/be/test/storage/segment/binary_dict_page_test.cpp 
b/be/test/storage/segment/binary_dict_page_test.cpp
index 79c76ba52d6..f402e98dc25 100644
--- a/be/test/storage/segment/binary_dict_page_test.cpp
+++ b/be/test/storage/segment/binary_dict_page_test.cpp
@@ -33,6 +33,8 @@
 #include "storage/segment/binary_plain_page.h"
 #include "storage/segment/binary_plain_page_v2.h"
 #include "storage/segment/binary_plain_page_v2_pre_decoder.h"
+#include "storage/segment/binary_plain_page_v3.h"
+#include "storage/segment/binary_plain_page_v3_pre_decoder.h"
 #include "storage/segment/page_builder.h"
 #include "storage/segment/page_decoder.h"
 #include "storage/types.h"
@@ -72,7 +74,7 @@ public:
     std::unique_ptr<PageDecoder> create_dict_page_decoder(Slice& dict_slice,
                                                           EncodingTypePB 
encoding_type,
                                                           
std::unique_ptr<DataPage>& decoded_page) {
-        // Apply pre-decode for BinaryPlainPageV2
+        // Apply pre-decode for BinaryPlainPageV2 / V3 (both convert to V1 
layout).
         if (encoding_type == PLAIN_ENCODING_V2) {
             BinaryPlainPageV2PreDecoder<false> pre_decoder;
             Status status = pre_decoder.decode(&decoded_page, &dict_slice, 0, 
false,
@@ -80,6 +82,13 @@ public:
             if (!status.ok()) {
                 return nullptr;
             }
+        } else if (encoding_type == PLAIN_ENCODING_V3) {
+            BinaryPlainPageV3PreDecoder<false> pre_decoder;
+            Status status = pre_decoder.decode(&decoded_page, &dict_slice, 0, 
false,
+                                               PageTypePB::DATA_PAGE, "");
+            if (!status.ok()) {
+                return nullptr;
+            }
         }
 
         PageDecoderOptions dict_decoder_options;
@@ -92,6 +101,10 @@ public:
             dict_page_decoder.reset(
                     new 
BinaryPlainPageV2Decoder<FieldType::OLAP_FIELD_TYPE_VARCHAR>(
                             dict_slice, dict_decoder_options));
+        } else if (encoding_type == PLAIN_ENCODING_V3) {
+            dict_page_decoder.reset(
+                    new 
BinaryPlainPageV3Decoder<FieldType::OLAP_FIELD_TYPE_VARCHAR>(
+                            dict_slice, dict_decoder_options));
         } else {
             return nullptr;
         }
@@ -143,9 +156,7 @@ public:
         PageBuilderOptions options;
         options.data_page_size = 256 * 1024;
         options.dict_page_size = 256 * 1024;
-        options.dict_binary_plain_encoding =
-                use_v2 ? BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2
-                       : BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V1;
+        options.dict_binary_plain_encoding = use_v2 ? PLAIN_ENCODING_V2 : 
PLAIN_ENCODING;
 
         auto page_builder = create_and_add_data(slices, options);
         ASSERT_NE(nullptr, page_builder);
@@ -163,14 +174,13 @@ public:
                 << "Expected encoding type does not match when use_v2=" << 
use_v2;
     }
 
-    void test_by_small_data_size(const std::vector<Slice>& slices, bool 
use_plain_v2 = false) {
+    void test_by_small_data_size(const std::vector<Slice>& slices,
+                                 EncodingTypePB dict_enc = PLAIN_ENCODING) {
         // Encode
         PageBuilderOptions options;
         options.data_page_size = 256 * 1024;
         options.dict_page_size = 256 * 1024;
-        options.dict_binary_plain_encoding =
-                use_plain_v2 ? 
BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2
-                             : 
BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V1;
+        options.dict_binary_plain_encoding = dict_enc;
 
         PageBuilder* builder_ptr = nullptr;
         Status ret0 = BinaryDictPageBuilder::create(&builder_ptr, options);
@@ -286,16 +296,15 @@ public:
         }
     }
 
-    void test_with_large_data_size(const std::vector<Slice>& contents, bool 
use_plain_v2 = false) {
+    void test_with_large_data_size(const std::vector<Slice>& contents,
+                                   EncodingTypePB dict_enc = PLAIN_ENCODING) {
         // Encode
         PageBuilderOptions options;
         // Use smaller page sizes to ensure we trigger fallback scenario
         // where dictionary gets full and we switch to plain encoding
         options.data_page_size = 64 * 1024; // 64KB data page
         options.dict_page_size = 1024;      // 1KB dict page to trigger 
fallback
-        options.dict_binary_plain_encoding =
-                use_plain_v2 ? 
BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2
-                             : 
BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V1;
+        options.dict_binary_plain_encoding = dict_enc;
 
         PageBuilder* builder_ptr = nullptr;
         Status ret0 = BinaryDictPageBuilder::create(&builder_ptr, options);
@@ -605,7 +614,7 @@ TEST_F(BinaryDictPageTest, 
TestConfigAffectsDictionaryPageEncoding) {
         PageBuilderOptions options;
         options.data_page_size = 256 * 1024;
         options.dict_page_size = 256 * 1024;
-        options.dict_binary_plain_encoding = 
BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V1;
+        options.dict_binary_plain_encoding = PLAIN_ENCODING;
 
         PageBuilder* builder_ptr = nullptr;
         Status status = BinaryDictPageBuilder::create(&builder_ptr, options);
@@ -645,7 +654,7 @@ TEST_F(BinaryDictPageTest, 
TestConfigAffectsDictionaryPageEncoding) {
         PageBuilderOptions options;
         options.data_page_size = 256 * 1024;
         options.dict_page_size = 256 * 1024;
-        options.dict_binary_plain_encoding = 
BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2;
+        options.dict_binary_plain_encoding = PLAIN_ENCODING_V2;
 
         PageBuilder* builder_ptr = nullptr;
         Status status = BinaryDictPageBuilder::create(&builder_ptr, options);
@@ -707,7 +716,7 @@ TEST_F(BinaryDictPageTest, 
TestConfigAffectsFallbackEncoding) {
         PageBuilderOptions options;
         options.data_page_size = 256 * 1024;
         options.dict_page_size = 128; // Small dict size to force fallback
-        options.dict_binary_plain_encoding = 
BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V1;
+        options.dict_binary_plain_encoding = PLAIN_ENCODING;
 
         PageBuilder* builder_ptr = nullptr;
         Status status = BinaryDictPageBuilder::create(&builder_ptr, options);
@@ -752,7 +761,7 @@ TEST_F(BinaryDictPageTest, 
TestConfigAffectsFallbackEncoding) {
         PageBuilderOptions options;
         options.data_page_size = 256 * 1024;
         options.dict_page_size = 128; // Small dict size to force fallback
-        options.dict_binary_plain_encoding = 
BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2;
+        options.dict_binary_plain_encoding = PLAIN_ENCODING_V2;
 
         PageBuilder* builder_ptr = nullptr;
         Status status = BinaryDictPageBuilder::create(&builder_ptr, options);
@@ -814,7 +823,7 @@ TEST_F(BinaryDictPageTest, TestSmallDataWithConfigFalse) {
         slices.emplace_back(str);
     }
 
-    test_by_small_data_size(slices, /*use_plain_v2=*/false);
+    test_by_small_data_size(slices, PLAIN_ENCODING);
 }
 
 TEST_F(BinaryDictPageTest, TestSmallDataWithConfigTrue) {
@@ -824,7 +833,7 @@ TEST_F(BinaryDictPageTest, TestSmallDataWithConfigTrue) {
         slices.emplace_back(str);
     }
 
-    test_by_small_data_size(slices, /*use_plain_v2=*/true);
+    test_by_small_data_size(slices, PLAIN_ENCODING_V2);
 }
 
 TEST_F(BinaryDictPageTest, TestLargeDataWithConfigFalse) {
@@ -845,7 +854,7 @@ TEST_F(BinaryDictPageTest, TestLargeDataWithConfigFalse) {
     }
 
     LOG(INFO) << "Testing large data with V1 preference, entry count: " << 
slices.size();
-    test_with_large_data_size(slices, /*use_plain_v2=*/false);
+    test_with_large_data_size(slices, PLAIN_ENCODING);
 }
 
 TEST_F(BinaryDictPageTest, TestLargeDataWithConfigTrue) {
@@ -866,7 +875,55 @@ TEST_F(BinaryDictPageTest, TestLargeDataWithConfigTrue) {
     }
 
     LOG(INFO) << "Testing large data with V2 preference, entry count: " << 
slices.size();
-    test_with_large_data_size(slices, /*use_plain_v2=*/true);
+    test_with_large_data_size(slices, PLAIN_ENCODING_V2);
+}
+
+// V3 dictionary internal encoding: the dictionary word page (small data) and 
the
+// dict-overflow fallback plain page (large data) round-trip through the V3 
layout.
+TEST_F(BinaryDictPageTest, TestSmallDataWithV3) {
+    // The generated strings own the bytes; the slices only view them, so `words`
+    // must stay alive for the whole test.
+    const auto words = generate_test_data(50, "test_");
+    std::vector<Slice> slices(words.begin(), words.end());
+
+    test_by_small_data_size(slices, PLAIN_ENCODING_V3);
+}
+
+TEST_F(BinaryDictPageTest, TestLargeDataWithV3) {
+    // Repeat the 1000 generated strings 100 times to build the input.
+    constexpr size_t kRepeats = 100;
+    const auto unique_strings = generate_test_data(1000, "data_", 10, 50);
+
+    std::vector<std::string> src_strings;
+    src_strings.reserve(unique_strings.size() * kRepeats);
+    for (size_t round = 0; round < kRepeats; ++round) {
+        src_strings.insert(src_strings.end(), unique_strings.begin(), unique_strings.end());
+    }
+
+    // Slices are created only after src_strings has reached its final size, so
+    // the viewed string buffers are never relocated underneath them.
+    std::vector<Slice> slices(src_strings.begin(), src_strings.end());
+
+    LOG(INFO) << "Testing large data with V3 preference, entry count: " << slices.size();
+    test_with_large_data_size(slices, PLAIN_ENCODING_V3);
+}
+
+TEST_F(BinaryDictPageTest, TestConfigUseBinaryV3DictWordPageEncoding) {
+    auto src_strings = generate_test_data(50, "test_");
+    std::vector<Slice> slices;
+    for (const auto& str : src_strings) {
+        slices.emplace_back(str);
+    }
+    PageBuilderOptions options;
+    options.data_page_size = 256 * 1024;
+    options.dict_page_size = 256 * 1024;
+    options.dict_binary_plain_encoding = PLAIN_ENCODING_V3;
+
+    auto page_builder = create_and_add_data(slices, options);
+    ASSERT_NE(nullptr, page_builder);
+    EncodingTypePB dict_encoding_type;
+    
ASSERT_TRUE(page_builder->get_dictionary_page_encoding(&dict_encoding_type).ok());
+    EXPECT_EQ(PLAIN_ENCODING_V3, dict_encoding_type);
 }
 
 } // namespace segment_v2
diff --git a/be/test/storage/segment/binary_plain_page_v3_test.cpp 
b/be/test/storage/segment/binary_plain_page_v3_test.cpp
new file mode 100644
index 00000000000..4d6c88fb3f0
--- /dev/null
+++ b/be/test/storage/segment/binary_plain_page_v3_test.cpp
@@ -0,0 +1,566 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/segment/binary_plain_page_v3.h"
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <cstring>
+#include <limits>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/logging.h"
+#include "core/column/column_string.h"
+#include "storage/cache/page_cache.h"
+#include "storage/olap_common.h"
+#include "storage/segment/binary_plain_page_v3_pre_decoder.h"
+#include "storage/segment/page_builder.h"
+#include "storage/segment/page_decoder.h"
+#include "storage/types.h"
+#include "util/coding.h"
+
+namespace doris {
+namespace segment_v2 {
+
+class BinaryPlainPageV3Test : public testing::Test {
+public:
+    BinaryPlainPageV3Test() = default;
+    ~BinaryPlainPageV3Test() override = default;
+
+    // Apply the V3 pre-decode step so the resulting Slice / DataPage matches 
the V1 layout
+    // consumed by BinaryPlainPageDecoder. CHAR selects the IS_CHAR=true 
variant (which strips
+    // '\0' padding on read), exactly as EncodingInfo does for (CHAR, 
PLAIN_ENCODING_V3).
+    template <FieldType Type = FieldType::OLAP_FIELD_TYPE_VARCHAR>
+    Status apply_pre_decode(Slice& page_slice, std::unique_ptr<DataPage>& 
decoded_page) {
+        constexpr bool is_char = (Type == FieldType::OLAP_FIELD_TYPE_CHAR);
+        BinaryPlainPageV3PreDecoder<is_char> pre_decoder;
+        return pre_decoder.decode(&decoded_page, &page_slice, 0, false, 
PageTypePB::DATA_PAGE, "");
+    }
+
+    template <FieldType Type>
+    std::unique_ptr<PageBuilder> make_builder(size_t data_page_size = 256 * 
1024) {
+        PageBuilderOptions opts;
+        opts.data_page_size = data_page_size;
+
+        PageBuilder* raw = nullptr;
+        Status st = BinaryPlainPageV3Builder<Type>::create(&raw, opts);
+        EXPECT_TRUE(st.ok()) << st;
+        return std::unique_ptr<PageBuilder>(raw);
+    }
+
+    template <FieldType Type>
+    OwnedSlice build_page(const std::vector<Slice>& slices) {
+        auto builder = make_builder<Type>();
+        size_t count = slices.size();
+        Status st = builder->add(reinterpret_cast<const 
uint8_t*>(slices.data()), &count);
+        EXPECT_TRUE(st.ok()) << st;
+        EXPECT_EQ(slices.size(), count);
+
+        OwnedSlice owned;
+        st = builder->finish(&owned);
+        EXPECT_TRUE(st.ok()) << st;
+        return owned;
+    }
+
+    // Build the Slices fed to the builder. For CHAR, pad every value to a 
fixed declared
+    // length with trailing '\0' (as OlapColumnDataConvertorChar does) so the 
IS_CHAR read path
+    // is exercised; `backing` owns the padded bytes and must outlive the 
returned Slices.
+    // Decoded values must still equal the logical src_strings (callers must 
not pass embedded
+    // '\0' in CHAR inputs).
+    template <FieldType Type>
+    std::vector<Slice> make_input_slices(const std::vector<std::string>& 
src_strings,
+                                         std::vector<std::string>& backing) {
+        std::vector<Slice> slices;
+        slices.reserve(src_strings.size());
+        if constexpr (Type == FieldType::OLAP_FIELD_TYPE_CHAR) {
+            size_t padded_len = 0;
+            for (const auto& s : src_strings) {
+                padded_len = std::max(padded_len, s.size());
+            }
+            padded_len += 3; // guarantee real padding even for the longest 
value
+            backing.reserve(src_strings.size());
+            for (const auto& s : src_strings) {
+                std::string p = s;
+                p.resize(padded_len, '\0');
+                backing.push_back(std::move(p));
+            }
+            for (const auto& p : backing) {
+                slices.emplace_back(p.data(), p.size());
+            }
+        } else {
+            for (const auto& s : src_strings) {
+                slices.emplace_back(s);
+            }
+        }
+        return slices;
+    }
+
+    template <FieldType Type>
+    void test_encode_decode_page(const std::vector<std::string>& src_strings) {
+        std::vector<std::string> backing;
+        std::vector<Slice> slices = make_input_slices<Type>(src_strings, 
backing);
+
+        OwnedSlice owned = build_page<Type>(slices);
+        Slice page_slice = owned.slice();
+        std::unique_ptr<DataPage> decoded_page;
+        ASSERT_TRUE(apply_pre_decode<Type>(page_slice, decoded_page).ok());
+
+        PageDecoderOptions decoder_options;
+        BinaryPlainPageV3Decoder<Type> decoder(page_slice, decoder_options);
+        ASSERT_TRUE(decoder.init().ok());
+        ASSERT_EQ(slices.size(), decoder.count());
+
+        MutableColumnPtr column = ColumnString::create();
+        size_t num_to_read = slices.size();
+        ASSERT_TRUE(decoder.next_batch(&num_to_read, column).ok());
+        ASSERT_EQ(slices.size(), num_to_read);
+        ASSERT_EQ(slices.size(), column->size());
+
+        auto* string_column = assert_cast<ColumnString*>(column.get());
+        for (size_t i = 0; i < slices.size(); ++i) {
+            EXPECT_EQ(src_strings[i], 
string_column->get_data_at(i).to_string())
+                    << "Mismatch at index " << i;
+        }
+    }
+
+    template <FieldType Type>
+    void test_seek_in_page(const std::vector<std::string>& src_strings) {
+        std::vector<std::string> backing;
+        std::vector<Slice> slices = make_input_slices<Type>(src_strings, 
backing);
+
+        OwnedSlice owned = build_page<Type>(slices);
+        Slice page_slice = owned.slice();
+        std::unique_ptr<DataPage> decoded_page;
+        ASSERT_TRUE(apply_pre_decode<Type>(page_slice, decoded_page).ok());
+
+        PageDecoderOptions decoder_options;
+        BinaryPlainPageV3Decoder<Type> decoder(page_slice, decoder_options);
+        ASSERT_TRUE(decoder.init().ok());
+
+        const std::vector<size_t> seek_positions = {0, 2, slices.size() / 2, 
slices.size() - 1};
+        for (size_t pos : seek_positions) {
+            if (pos >= slices.size()) continue;
+
+            ASSERT_TRUE(decoder.seek_to_position_in_page(pos).ok());
+            EXPECT_EQ(pos, decoder.current_index());
+
+            MutableColumnPtr column = ColumnString::create();
+            size_t n = 1;
+            ASSERT_TRUE(decoder.next_batch(&n, column).ok());
+            EXPECT_EQ(1, n);
+            auto* sc = assert_cast<ColumnString*>(column.get());
+            EXPECT_EQ(src_strings[pos], sc->get_data_at(0).to_string())
+                    << "Mismatch at seek position " << pos;
+        }
+    }
+
+    template <FieldType Type>
+    void test_read_by_rowids(const std::vector<std::string>& src_strings) {
+        std::vector<std::string> backing;
+        std::vector<Slice> slices = make_input_slices<Type>(src_strings, 
backing);
+
+        OwnedSlice owned = build_page<Type>(slices);
+        Slice page_slice = owned.slice();
+        std::unique_ptr<DataPage> decoded_page;
+        ASSERT_TRUE(apply_pre_decode<Type>(page_slice, decoded_page).ok());
+
+        PageDecoderOptions decoder_options;
+        BinaryPlainPageV3Decoder<Type> decoder(page_slice, decoder_options);
+        ASSERT_TRUE(decoder.init().ok());
+
+        std::vector<rowid_t> rowids;
+        rowids.push_back(0);
+        rowids.push_back(2);
+        rowids.push_back(3);
+        rowids.push_back(static_cast<rowid_t>(slices.size() - 1));
+        ordinal_t page_first_ordinal = 0;
+
+        MutableColumnPtr column = ColumnString::create();
+        size_t num_to_read = rowids.size();
+        ASSERT_TRUE(decoder.read_by_rowids(rowids.data(), page_first_ordinal, 
&num_to_read, column)
+                            .ok());
+        EXPECT_EQ(rowids.size(), num_to_read);
+
+        auto* sc = assert_cast<ColumnString*>(column.get());
+        for (size_t i = 0; i < rowids.size(); ++i) {
+            EXPECT_EQ(src_strings[rowids[i]], sc->get_data_at(i).to_string())
+                    << "Mismatch at rowid " << rowids[i];
+        }
+    }
+
+    template <FieldType Type>
+    void test_empty_page() {
+        auto builder = make_builder<Type>();
+        OwnedSlice owned;
+        ASSERT_TRUE(builder->finish(&owned).ok());
+        EXPECT_EQ(0, builder->count());
+
+        Slice page_slice = owned.slice();
+        std::unique_ptr<DataPage> decoded_page;
+        ASSERT_TRUE(apply_pre_decode<Type>(page_slice, decoded_page).ok());
+
+        PageDecoderOptions decoder_options;
+        BinaryPlainPageV3Decoder<Type> decoder(page_slice, decoder_options);
+        ASSERT_TRUE(decoder.init().ok());
+        EXPECT_EQ(0, decoder.count());
+
+        MutableColumnPtr column = ColumnString::create();
+        size_t n = 1;
+        ASSERT_TRUE(decoder.next_batch(&n, column).ok());
+        EXPECT_EQ(0, n);
+        EXPECT_EQ(0, column->size());
+    }
+
+    template <FieldType Type>
+    void test_page_full() {
+        // data_page_size is only 128 bytes, so is_page_full() trips before we drain all input.
+        auto builder = make_builder<Type>(/*data_page_size=*/128);
+
+        std::vector<std::string> src_strings;
+        for (int i = 0; i < 100; ++i) {
+            src_strings.push_back("test_string_" + std::to_string(i));
+        }
+        std::vector<Slice> slices;
+        slices.reserve(src_strings.size());
+        for (const auto& s : src_strings) {
+            slices.emplace_back(s);
+        }
+
+        size_t added = 0;
+        for (size_t i = 0; i < slices.size(); ++i) {
+            if (builder->is_page_full()) break;
+            size_t n = 1;
+            ASSERT_TRUE(builder->add(reinterpret_cast<const 
uint8_t*>(&slices[i]), &n).ok());
+            if (n > 0) added++;
+        }
+        EXPECT_GT(added, 0);
+        EXPECT_LT(added, slices.size());
+        EXPECT_TRUE(builder->is_page_full());
+    }
+
+    template <FieldType Type>
+    void test_various_length_strings() {
+        std::vector<std::string> src_strings;
+        src_strings.push_back("");
+        src_strings.push_back("a");
+        src_strings.push_back("ab");
+        src_strings.push_back("Hello, World!");
+        src_strings.push_back("Apache Doris is great");
+        src_strings.push_back(std::string(1000, 'x'));
+        src_strings.push_back("test\n\r\t");
+        src_strings.push_back("中文测试");
+        test_encode_decode_page<Type>(src_strings);
+    }
+
+    template <FieldType Type>
+    void test_reset() {
+        auto builder = make_builder<Type>();
+
+        std::vector<std::string> src_strings = {"test1", "test2"};
+        std::vector<Slice> slices;
+        slices.reserve(src_strings.size());
+        for (const auto& s : src_strings) {
+            slices.emplace_back(s);
+        }
+
+        size_t count = slices.size();
+        ASSERT_TRUE(builder->add(reinterpret_cast<const 
uint8_t*>(slices.data()), &count).ok());
+        EXPECT_EQ(2, builder->count());
+
+        ASSERT_TRUE(builder->reset().ok());
+        EXPECT_EQ(0, builder->count());
+
+        count = slices.size();
+        ASSERT_TRUE(builder->add(reinterpret_cast<const 
uint8_t*>(slices.data()), &count).ok());
+        EXPECT_EQ(2, builder->count());
+    }
+};
+
+// -------- VARCHAR --------
+TEST_F(BinaryPlainPageV3Test, TestEncodeDecodeVarchar) {
+    test_encode_decode_page<FieldType::OLAP_FIELD_TYPE_VARCHAR>(
+            {"Hello", "World", "Apache", "Doris"});
+}
+TEST_F(BinaryPlainPageV3Test, TestSeekVarchar) {
+    test_seek_in_page<FieldType::OLAP_FIELD_TYPE_VARCHAR>({"a", "b", "c", "d", 
"e", "f"});
+}
+TEST_F(BinaryPlainPageV3Test, TestReadByRowidsVarchar) {
+    test_read_by_rowids<FieldType::OLAP_FIELD_TYPE_VARCHAR>(
+            {"first", "second", "third", "fourth", "fifth"});
+}
+TEST_F(BinaryPlainPageV3Test, TestEmptyPageVarchar) {
+    test_empty_page<FieldType::OLAP_FIELD_TYPE_VARCHAR>();
+}
+TEST_F(BinaryPlainPageV3Test, TestPageFullVarchar) {
+    test_page_full<FieldType::OLAP_FIELD_TYPE_VARCHAR>();
+}
+TEST_F(BinaryPlainPageV3Test, TestVariousLengthStringsVarchar) {
+    test_various_length_strings<FieldType::OLAP_FIELD_TYPE_VARCHAR>();
+}
+TEST_F(BinaryPlainPageV3Test, TestResetVarchar) {
+    test_reset<FieldType::OLAP_FIELD_TYPE_VARCHAR>();
+}
+TEST_F(BinaryPlainPageV3Test, TestLargeNumberOfStrings) {
+    std::vector<std::string> v;
+    for (int i = 0; i < 1000; ++i) v.push_back("string_" + std::to_string(i));
+    test_encode_decode_page<FieldType::OLAP_FIELD_TYPE_VARCHAR>(v);
+}
+TEST_F(BinaryPlainPageV3Test, TestSingleString) {
+    test_encode_decode_page<FieldType::OLAP_FIELD_TYPE_VARCHAR>({"single"});
+}
+// The last element's end boundary is the implicit sentinel offset — offset(N) 
is never
+// stored; the decoder returns _offsets_pos (== data_block_size) for it. 
Exercise it at
+// length 0: empty value at the tail, a single-empty page, and an all-empty 
page.
+TEST_F(BinaryPlainPageV3Test, TestTrailingEmptyValue) {
+    test_encode_decode_page<FieldType::OLAP_FIELD_TYPE_VARCHAR>({"abc", "", 
"de", ""});
+    test_encode_decode_page<FieldType::OLAP_FIELD_TYPE_VARCHAR>({""});
+    test_encode_decode_page<FieldType::OLAP_FIELD_TYPE_VARCHAR>({"", "", ""});
+    // CHAR values are padded to a fixed length, so an empty tail value is all 
'\0' and the
+    // IS_CHAR pre-decoder must strnlen it back to length 0 — the sentinel 
must still land
+    // exactly at data_block_size.
+    test_encode_decode_page<FieldType::OLAP_FIELD_TYPE_CHAR>({"x", "", "yz", 
""});
+}
+
+// -------- STRING / CHAR --------
+TEST_F(BinaryPlainPageV3Test, TestEncodeDecodeString) {
+    test_encode_decode_page<FieldType::OLAP_FIELD_TYPE_STRING>({"String1", 
"String2", "String3"});
+}
+TEST_F(BinaryPlainPageV3Test, TestEncodeDecodeChar) {
+    // Mixed lengths (including empty and multi-byte) so each value carries a 
different
+    // amount of '\0' padding; test_encode_decode_page pads to a fixed length 
and the
+    // IS_CHAR read path must strip it back to these logical values.
+    test_encode_decode_page<FieldType::OLAP_FIELD_TYPE_CHAR>({"Hi", "", 
"abcdef", "x", "中文"});
+}
+// seek and read_by_rowids on a padded CHAR page (the inputs are padded to a 
fixed length,
+// so the IS_CHAR pre-decoder strips the '\0' padding before these decode 
paths run).
+TEST_F(BinaryPlainPageV3Test, TestSeekChar) {
+    test_seek_in_page<FieldType::OLAP_FIELD_TYPE_CHAR>({"a", "bb", "", "dddd", 
"e", "ffffff"});
+}
+TEST_F(BinaryPlainPageV3Test, TestReadByRowidsChar) {
+    test_read_by_rowids<FieldType::OLAP_FIELD_TYPE_CHAR>({"first", "", 
"third", "fourth", "fifth"});
+}
+
+// Aggregate binary types (HLL/BITMAP/QUANTILE_STATE/AGG_STATE) default to 
plain V3 in V3
+// segments; verify the V3 page round-trips an opaque binary payload 
(including embedded
+// '\0', which non-CHAR types must preserve verbatim).
+TEST_F(BinaryPlainPageV3Test, TestEncodeDecodeAggState) {
+    test_encode_decode_page<FieldType::OLAP_FIELD_TYPE_AGG_STATE>(
+            {"agg_state_1", "", std::string("\x01\x02\x00\x03", 4), 
"another_state"});
+}
+
+// CHAR padding handling. OlapColumnDataConvertorChar pads CHAR values to 
their declared
+// length with trailing '\0'. V3 (like V1/V2) stores the padded bytes 
verbatim; the padding
+// is stripped on read by BinaryPlainPageV3PreDecoder<true>.
+namespace {
+// Build a fixed-length CHAR slice payload: each logical value padded with 
'\0' to
+// padded_len, exactly as the convertor hands it to the page builder.
+std::vector<std::string> make_padded_char_backing(const 
std::vector<std::string>& logical,
+                                                  size_t padded_len) {
+    std::vector<std::string> padded;
+    padded.reserve(logical.size());
+    for (const auto& s : logical) {
+        std::string p = s;
+        p.resize(padded_len, '\0'); // truncation never happens: callers keep 
s <= padded_len
+        padded.push_back(std::move(p));
+    }
+    return padded;
+}
+} // namespace
+
+// The V3 CHAR builder stores the padded bytes verbatim (same as V2); the 
IS_CHAR
+// pre-decoder strips the trailing '\0' padding on read so the decoded value 
is logical.
+TEST_F(BinaryPlainPageV3Test, TestCharBuilderKeepsPaddingStrippedOnRead) {
+    constexpr size_t kPaddedLen = 10; // CHAR(10)
+    const std::vector<std::string> logical = {"Hi", "", "abcdefghij", "x", 
"中文"};
+
+    // Backing store must outlive the Slices that point into it.
+    std::vector<std::string> padded = make_padded_char_backing(logical, 
kPaddedLen);
+    std::vector<Slice> slices;
+    slices.reserve(padded.size());
+    for (auto& p : padded) {
+        slices.emplace_back(p.data(), kPaddedLen); // full padded width, as 
the writer sees it
+    }
+
+    OwnedSlice owned = build_page<FieldType::OLAP_FIELD_TYPE_CHAR>(slices);
+    Slice page = owned.slice();
+    ASSERT_GE(page.size, 2 * sizeof(uint32_t));
+
+    // 1. The on-disk data block keeps the padded bytes verbatim (same as V2).
+    const auto* trailer = reinterpret_cast<const uint8_t*>(page.data + 
page.size - 8);
+    uint32_t data_block_size = decode_fixed32_le(trailer);
+    uint32_t num_elems = decode_fixed32_le(trailer + sizeof(uint32_t));
+    EXPECT_EQ(logical.size(), num_elems);
+    EXPECT_EQ(kPaddedLen * logical.size(), data_block_size) << "CHAR keeps 
padding on disk";
+
+    // 2. The IS_CHAR pre-decoder strips the padding on read; decoded values 
are logical.
+    std::unique_ptr<DataPage> decoded_page;
+    BinaryPlainPageV3PreDecoder<true> char_pre_decoder;
+    ASSERT_TRUE(char_pre_decoder.decode(&decoded_page, &page, 0, false, 
PageTypePB::DATA_PAGE, "")
+                        .ok());
+
+    PageDecoderOptions decoder_options;
+    BinaryPlainPageV3Decoder<FieldType::OLAP_FIELD_TYPE_CHAR> decoder(page, 
decoder_options);
+    ASSERT_TRUE(decoder.init().ok());
+    ASSERT_EQ(logical.size(), decoder.count());
+
+    MutableColumnPtr column = ColumnString::create();
+    size_t n = logical.size();
+    ASSERT_TRUE(decoder.next_batch(&n, column).ok());
+    ASSERT_EQ(logical.size(), n);
+    auto* sc = assert_cast<ColumnString*>(column.get());
+    for (size_t i = 0; i < logical.size(); ++i) {
+        EXPECT_EQ(logical[i], sc->get_data_at(i).to_string()) << "Mismatch at 
index " << i;
+    }
+}
+
+// V3-specific: the IS_CHAR pre-decoder strips trailing '\0' padding at read 
time.
+// This is the path used for CHAR dictionary word pages — they are written with the VARCHAR
+// builder, which stores the padded bytes verbatim, so the padding IS on disk, and the dict
+// read path selects BinaryPlainPageV3PreDecoder<true> via EncodingInfo::get(CHAR, V3).
+TEST_F(BinaryPlainPageV3Test, TestCharPreDecoderStripsPaddingOnRead) {
+    constexpr size_t kPaddedLen = 12; // CHAR(12)
+    const std::vector<std::string> logical = {"hi", "", "abcdefghijkl", "x", 
"中文"};
+
+    std::vector<std::string> padded = make_padded_char_backing(logical, 
kPaddedLen);
+    std::vector<Slice> slices;
+    slices.reserve(padded.size());
+    for (auto& p : padded) {
+        slices.emplace_back(p.data(), kPaddedLen);
+    }
+
+    // Write with the VARCHAR builder so the padded bytes ARE stored on disk, 
exactly
+    // like the dictionary word page does for a CHAR column.
+    OwnedSlice owned = build_page<FieldType::OLAP_FIELD_TYPE_VARCHAR>(slices);
+    Slice page = owned.slice();
+    ASSERT_GE(page.size, 2 * sizeof(uint32_t));
+
+    // The raw page keeps the full padded width.
+    const auto* trailer = reinterpret_cast<const uint8_t*>(page.data + 
page.size - 8);
+    uint32_t data_block_size = decode_fixed32_le(trailer);
+    EXPECT_EQ(kPaddedLen * logical.size(), data_block_size) << "raw page 
should keep padding";
+
+    // The IS_CHAR pre-decoder strips it on read.
+    std::unique_ptr<DataPage> decoded_page;
+    BinaryPlainPageV3PreDecoder<true> char_pre_decoder;
+    ASSERT_TRUE(char_pre_decoder.decode(&decoded_page, &page, 0, false, 
PageTypePB::DATA_PAGE, "")
+                        .ok());
+
+    PageDecoderOptions decoder_options;
+    BinaryPlainPageV3Decoder<FieldType::OLAP_FIELD_TYPE_CHAR> decoder(page, 
decoder_options);
+    ASSERT_TRUE(decoder.init().ok());
+    ASSERT_EQ(logical.size(), decoder.count());
+
+    MutableColumnPtr column = ColumnString::create();
+    size_t n = logical.size();
+    ASSERT_TRUE(decoder.next_batch(&n, column).ok());
+    ASSERT_EQ(logical.size(), n);
+    auto* sc = assert_cast<ColumnString*>(column.get());
+    for (size_t i = 0; i < logical.size(); ++i) {
+        EXPECT_EQ(logical[i], sc->get_data_at(i).to_string()) << "Mismatch at 
index " << i;
+    }
+}
+
+// -------- V3-specific: varint length boundaries --------
+// Varint32 length encoding crosses byte boundaries at 128 and 16384. Probe
+// the three width bands so we catch off-by-one bugs in the length-scan loop.
+TEST_F(BinaryPlainPageV3Test, TestVarintBoundaryLengths) {
+    std::vector<std::string> v;
+    // 1-byte varint band (<128).
+    v.push_back(std::string(0, 'a'));   // empty -> varint 0x00
+    v.push_back(std::string(1, 'a'));   // 1
+    v.push_back(std::string(127, 'a')); // last 1-byte varint
+    // 2-byte varint band ([128, 16384)).
+    v.push_back(std::string(128, 'b')); // first 2-byte varint
+    v.push_back(std::string(255, 'b'));
+    v.push_back(std::string(16383, 'b')); // last 2-byte varint
+    // 3-byte varint band ([16384, 2M)).
+    v.push_back(std::string(16384, 'c')); // first 3-byte varint
+    v.push_back(std::string(20000, 'c'));
+
+    test_encode_decode_page<FieldType::OLAP_FIELD_TYPE_STRING>(v);
+}
+
+// V3-specific: cross-check the raw trailer layout. data_block_size sits
+// directly before num_elems, both little-endian uint32_t.
+TEST_F(BinaryPlainPageV3Test, TestRawTrailerLayout) {
+    std::vector<std::string> src = {"abc", "defgh", "ij"}; // sizes 3, 5, 2 = 
10 bytes data
+    std::vector<Slice> slices;
+    for (const auto& s : src) slices.emplace_back(s);
+
+    OwnedSlice owned = build_page<FieldType::OLAP_FIELD_TYPE_VARCHAR>(slices);
+    Slice page = owned.slice();
+    ASSERT_GE(page.size, 2 * sizeof(uint32_t));
+
+    const auto* trailer = reinterpret_cast<const uint8_t*>(page.data + 
page.size - 8);
+    uint32_t data_block_size = decode_fixed32_le(trailer);
+    uint32_t num_elems = decode_fixed32_le(trailer + sizeof(uint32_t));
+
+    EXPECT_EQ(3, num_elems);
+    EXPECT_EQ(3 + 5 + 2, data_block_size);
+
+    // Data bytes are contiguous from offset 0; spot-check the first byte of
+    // each entry to confirm V3 does not interleave lengths.
+    EXPECT_EQ('a', page.data[0]);
+    EXPECT_EQ('d', page.data[3]);
+    EXPECT_EQ('i', page.data[8]);
+}
+
+// V3-specific: corruption rejection. Truncated trailer must be detected.
+TEST_F(BinaryPlainPageV3Test, TestCorruptionTooSmall) {
+    OwnedSlice owned = 
build_page<FieldType::OLAP_FIELD_TYPE_VARCHAR>({Slice("x")});
+    // Shrink the page below the V3 trailer size (2 * uint32_t).
+    Slice page = owned.slice();
+    page.size = sizeof(uint32_t); // intentionally short
+    std::unique_ptr<DataPage> decoded_page;
+    Status st = apply_pre_decode(page, decoded_page);
+    EXPECT_FALSE(st.ok());
+}
+
+// V3-specific: a data_block_size close to UINT32_MAX must not pass the bounds
+// check via uint32 wraparound. With the old `data_block_size + kV3TrailerSize`
+// comparison, picking data_block_size = UINT32_MAX - 4 would overflow back to
+// a small value (< data.size) and slip through. The subtraction form rejects
+// it cleanly.
+TEST_F(BinaryPlainPageV3Test, TestCorruptionDataBlockSizeOverflow) {
+    OwnedSlice owned = 
build_page<FieldType::OLAP_FIELD_TYPE_VARCHAR>({Slice("hello")});
+    Slice page = owned.slice();
+    auto* trailer = reinterpret_cast<uint8_t*>(const_cast<char*>(page.data) + 
page.size - 8);
+    encode_fixed32_le(trailer, std::numeric_limits<uint32_t>::max() - 4);
+    std::unique_ptr<DataPage> decoded_page;
+    Status st = apply_pre_decode(page, decoded_page);
+    EXPECT_FALSE(st.ok());
+}
+
+// V3-specific: data_block_size lying about how much data is present must be
+// rejected before we try to read past the page.
+TEST_F(BinaryPlainPageV3Test, TestCorruptionInflatedDataBlockSize) {
+    OwnedSlice owned = 
build_page<FieldType::OLAP_FIELD_TYPE_VARCHAR>({Slice("hello")});
+    Slice page = owned.slice();
+    // Rewrite the data_block_size field (8 bytes from end) to a value larger
+    // than the actual data section.
+    auto* trailer = reinterpret_cast<uint8_t*>(const_cast<char*>(page.data) + 
page.size - 8);
+    encode_fixed32_le(trailer, static_cast<uint32_t>(page.size + 1));
+    std::unique_ptr<DataPage> decoded_page;
+    Status st = apply_pre_decode(page, decoded_page);
+    EXPECT_FALSE(st.ok());
+}
+
+} // namespace segment_v2
+} // namespace doris
diff --git a/be/test/storage/segment/column_meta_accessor_test.cpp 
b/be/test/storage/segment/column_meta_accessor_test.cpp
index 48c2581b841..d5612aa8c85 100644
--- a/be/test/storage/segment/column_meta_accessor_test.cpp
+++ b/be/test/storage/segment/column_meta_accessor_test.cpp
@@ -749,7 +749,7 @@ TEST(ColumnMetaAccessorTest, 
RowStoreColumnDoesNotUseDictEncoding) {
                     .ok());
     EXPECT_EQ(kRowStoreUid, row_store_meta.unique_id());
     EXPECT_EQ(static_cast<int>(FieldType::OLAP_FIELD_TYPE_STRING), 
row_store_meta.type());
-    EXPECT_EQ(PLAIN_ENCODING_V2, row_store_meta.encoding());
+    EXPECT_EQ(PLAIN_ENCODING_V3, row_store_meta.encoding());
     EXPECT_NE(DICT_ENCODING, row_store_meta.encoding());
 }
 
diff --git a/be/test/storage/segment/encoding_info_test.cpp 
b/be/test/storage/segment/encoding_info_test.cpp
index 0a60c914e86..6a3b6e76e32 100644
--- a/be/test/storage/segment/encoding_info_test.cpp
+++ b/be/test/storage/segment/encoding_info_test.cpp
@@ -30,6 +30,7 @@
 #include "storage/segment/binary_dict_page_pre_decoder.h"
 #include "storage/segment/binary_plain_page_char_strip_pre_decoder.h"
 #include "storage/segment/binary_plain_page_v2_pre_decoder.h"
+#include "storage/segment/binary_plain_page_v3_pre_decoder.h"
 #include "storage/segment/bitshuffle_page_pre_decoder.h"
 #include "storage/types.h"
 
@@ -80,10 +81,10 @@ TEST_F(EncodingInfoTest, v2_vs_v3_defaults) {
     check_same(FieldType::OLAP_FIELD_TYPE_JSONB, "JSONB", DICT_ENCODING);
     check_same(FieldType::OLAP_FIELD_TYPE_VARIANT, "VARIANT", DICT_ENCODING);
 
-    // Aggregate/binary-flavored types: V2=PLAIN, V3=PLAIN_V2.
+    // Aggregate/binary-flavored types: V2=PLAIN, V3=PLAIN_V3.
     auto check_split = [](FieldType type, const std::string& name) {
         EXPECT_EQ(PLAIN_ENCODING, get_v2_default_encoding(type)) << name << " 
v2 default";
-        EXPECT_EQ(PLAIN_ENCODING_V2, get_v3_default_encoding(type)) << name << 
" v3 default";
+        EXPECT_EQ(PLAIN_ENCODING_V3, get_v3_default_encoding(type)) << name << 
" v3 default";
     };
     check_split(FieldType::OLAP_FIELD_TYPE_HLL, "HLL");
     check_split(FieldType::OLAP_FIELD_TYPE_BITMAP, "BITMAP");
@@ -191,6 +192,36 @@ TEST_F(EncodingInfoTest, test_all_pre_decoders) {
                         << " with PLAIN_ENCODING_V2 should have V2 
pre-decoder";
     }
 
+    // Test PLAIN_ENCODING_V3 with Slice types - should have 
BinaryPlainPageV3PreDecoder.
+    // Mirroring V2, CHAR uses the IS_CHAR=true variant (strips '\0' padding 
of CHAR
+    // dictionary words written with the VARCHAR builder); other binary types 
use <false>.
+    std::vector<FieldType> plain_v3_types = {
+            FieldType::OLAP_FIELD_TYPE_CHAR,      
FieldType::OLAP_FIELD_TYPE_VARCHAR,
+            FieldType::OLAP_FIELD_TYPE_STRING,    
FieldType::OLAP_FIELD_TYPE_JSONB,
+            FieldType::OLAP_FIELD_TYPE_VARIANT,   
FieldType::OLAP_FIELD_TYPE_HLL,
+            FieldType::OLAP_FIELD_TYPE_BITMAP,    
FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE,
+            FieldType::OLAP_FIELD_TYPE_AGG_STATE,
+    };
+
+    for (auto type : plain_v3_types) {
+        const EncodingInfo* encoding_info = nullptr;
+        auto status = EncodingInfo::get(type, PLAIN_ENCODING_V3, 
&encoding_info);
+        ASSERT_TRUE(status.ok()) << "Type " << static_cast<int>(type)
+                                 << " should support PLAIN_ENCODING_V3";
+        ASSERT_NE(nullptr, encoding_info);
+        auto* pre_decoder = encoding_info->get_data_page_pre_decoder();
+        ASSERT_NE(nullptr, pre_decoder) << "Type " << static_cast<int>(type)
+                                        << " with PLAIN_ENCODING_V3 should 
have pre_decoder";
+        bool ok =
+                (type == FieldType::OLAP_FIELD_TYPE_CHAR)
+                        ? 
dynamic_cast<BinaryPlainPageV3PreDecoder<true>*>(pre_decoder) != nullptr
+                        : 
dynamic_cast<BinaryPlainPageV3PreDecoder<false>*>(pre_decoder) != nullptr;
+        EXPECT_TRUE(ok)
+                << "Type " << static_cast<int>(type)
+                << " with PLAIN_ENCODING_V3 should have the right 
BinaryPlainPageV3PreDecoder"
+                << " variant";
+    }
+
     // Test PLAIN_ENCODING - should NOT have pre_decoder
     std::vector<FieldType> plain_encoding_types = {
             FieldType::OLAP_FIELD_TYPE_TINYINT,
@@ -345,10 +376,10 @@ const std::vector<DefaultExpectation> kV3DefaultExpect = {
         {FieldType::OLAP_FIELD_TYPE_DECIMAL256, BIT_SHUFFLE, "DECIMAL256"},
         {FieldType::OLAP_FIELD_TYPE_IPV4, BIT_SHUFFLE, "IPV4"},
         {FieldType::OLAP_FIELD_TYPE_IPV6, BIT_SHUFFLE, "IPV6"},
-        {FieldType::OLAP_FIELD_TYPE_HLL, PLAIN_ENCODING_V2, "HLL"},
-        {FieldType::OLAP_FIELD_TYPE_BITMAP, PLAIN_ENCODING_V2, "BITMAP"},
-        {FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE, PLAIN_ENCODING_V2, 
"QUANTILE_STATE"},
-        {FieldType::OLAP_FIELD_TYPE_AGG_STATE, PLAIN_ENCODING_V2, "AGG_STATE"},
+        {FieldType::OLAP_FIELD_TYPE_HLL, PLAIN_ENCODING_V3, "HLL"},
+        {FieldType::OLAP_FIELD_TYPE_BITMAP, PLAIN_ENCODING_V3, "BITMAP"},
+        {FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE, PLAIN_ENCODING_V3, 
"QUANTILE_STATE"},
+        {FieldType::OLAP_FIELD_TYPE_AGG_STATE, PLAIN_ENCODING_V3, "AGG_STATE"},
 };
 
 // Expected V2 (non-V3) default per type.
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index a221b666747..3c5ca99f353 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -618,6 +618,7 @@ enum StorageMediumPB {
 enum BinaryPlainEncodingTypePB {
     BINARY_PLAIN_ENCODING_V1 = 1;
     BINARY_PLAIN_ENCODING_V2 = 2;
+    BINARY_PLAIN_ENCODING_V3 = 3;
 }
 
 message S3StorageParamPB {
diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto
index 3c8e646acd9..6fb6fa5fe03 100644
--- a/gensrc/proto/segment_v2.proto
+++ b/gensrc/proto/segment_v2.proto
@@ -41,6 +41,7 @@ enum EncodingTypePB {
     BIT_SHUFFLE = 6;
     FOR_ENCODING = 7; // Frame-Of-Reference
     PLAIN_ENCODING_V2 = 8; // Binary plain with varuint length prefix
+    PLAIN_ENCODING_V3 = 9; // Binary plain with contiguous data + contiguous 
varuint lengths trailer
 }
 
 enum CompressionTypePB {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to