This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7dcca9db48c [feat](paimon) integrate paimon-cpp reader (#60676)
7dcca9db48c is described below
commit 7dcca9db48c6b8b056c652afc84adeee920816c6
Author: Chenjunwei <[email protected]>
AuthorDate: Sat Feb 21 10:17:06 2026 +0800
[feat](paimon) integrate paimon-cpp reader (#60676)
Issue Number: #56005
Co-authored-by: morningman <[email protected]>
---
be/CMakeLists.txt | 124 ++++
be/cmake/thirdparty.cmake | 15 +
be/src/vec/exec/format/table/paimon_cpp_reader.cpp | 319 ++++++++++
be/src/vec/exec/format/table/paimon_cpp_reader.h | 95 +++
.../exec/format/table/paimon_doris_file_system.cpp | 664 +++++++++++++++++++++
.../exec/format/table/paimon_doris_file_system.h | 25 +
.../format/table/paimon_predicate_converter.cpp | 659 ++++++++++++++++++++
.../exec/format/table/paimon_predicate_converter.h | 90 +++
be/src/vec/exec/scan/file_scanner.cpp | 29 +-
.../exec/format/table/paimon_cpp_reader_test.cpp | 95 +++
.../java/org/apache/doris/paimon/PaimonUtils.java | 14 +-
.../apache/doris/datasource/paimon/PaimonUtil.java | 20 +
.../datasource/paimon/source/PaimonScanNode.java | 14 +-
.../datasource/paimon/source/PaimonSource.java | 9 +
.../java/org/apache/doris/qe/SessionVariable.java | 25 +-
gensrc/thrift/PaloInternalService.thrift | 2 +
16 files changed, 2185 insertions(+), 14 deletions(-)
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index fb0de111359..924a88706bb 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -150,6 +150,17 @@ message(STATUS "build task executor simulator:
${BUILD_TASK_EXECUTOR_SIMULATOR}"
option(BUILD_FILE_CACHE_LRU_TOOL "ON for building file cache lru tool or OFF
for not" OFF)
message(STATUS "build file cache lru tool: ${BUILD_FILE_CACHE_LRU_TOOL}")
+# Switch for the paimon-cpp based reader, plus an optional install prefix.
+option(ENABLE_PAIMON_CPP "Enable Paimon C++ integration" ON)
+set(PAIMON_HOME "" CACHE PATH "Paimon install prefix")
+
+# Allow env to override when reconfiguring (avoid picking /usr/local).
+# FORCE is intentional so a stale cached value is replaced by the environment.
+# The help strings are repeated because a FORCE write replaces the cache
+# entry's documentation as well; passing "" would blank it.
+if (DEFINED ENV{ENABLE_PAIMON_CPP})
+    set(ENABLE_PAIMON_CPP "$ENV{ENABLE_PAIMON_CPP}" CACHE BOOL "Enable Paimon C++ integration" FORCE)
+endif()
+if (DEFINED ENV{PAIMON_HOME} AND NOT PAIMON_HOME)
+    set(PAIMON_HOME "$ENV{PAIMON_HOME}" CACHE PATH "Paimon install prefix" FORCE)
+endif()
+
set(CMAKE_SKIP_RPATH TRUE)
set(Boost_USE_STATIC_LIBS ON)
set(Boost_USE_STATIC_RUNTIME ON)
@@ -550,6 +561,10 @@ set(COMMON_THIRDPARTY
${COMMON_THIRDPARTY}
)
+# Informational only: report at configure time that the Paimon C++ path is on.
+if (ENABLE_PAIMON_CPP)
+ message(STATUS "Paimon C++ enabled: legacy thirdparty static linkage mode")
+endif()
+
if ((ARCH_AMD64 OR ARCH_AARCH64) AND OS_LINUX)
add_library(hadoop_hdfs STATIC IMPORTED)
set_target_properties(hadoop_hdfs PROPERTIES IMPORTED_LOCATION
${THIRDPARTY_DIR}/lib/hadoop_hdfs_3_4/native/libhdfs.a)
@@ -577,6 +592,13 @@ endif()
if (absl_FOUND)
set(COMMON_THIRDPARTY
${COMMON_THIRDPARTY}
+ absl::cord
+ absl::cord_internal
+ absl::cordz_functions
+ absl::cordz_info
+ absl::cordz_update_scope
+ absl::cordz_update_tracker
+ absl::crc_cord_state
absl::flags
absl::random_random
absl::spinlock_wait
@@ -602,6 +624,80 @@ if (BUILD_BENCHMARK)
)
endif()
+# Expose the first existing archive among the candidates as a STATIC IMPORTED
+# target.
+#   target_name - name of the imported target to create
+#   out_var     - set to target_name in the caller's scope when an archive is
+#                 found; left untouched otherwise
+#   ARGN        - candidate archive paths, highest priority first
+function(doris_paimon_import_first_existing target_name out_var)
+    foreach(_candidate IN LISTS ARGN)
+        if (EXISTS "${_candidate}")
+            # add_library() fails on a duplicate target name, so only create
+            # the target the first time this logic runs.
+            if (NOT TARGET ${target_name})
+                add_library(${target_name} STATIC IMPORTED)
+                set_target_properties(${target_name} PROPERTIES
+                    IMPORTED_LOCATION "${_candidate}")
+            endif()
+            set(${out_var} ${target_name} PARENT_SCOPE)
+            return()
+        endif()
+    endforeach()
+endfunction()
+
+# Start from a clean slate so stale values never leak into DORIS_DEPENDENCIES.
+set(PAIMON_FACTORY_REGISTRY_LIBS)
+set(PAIMON_ARROW_CORE_LIB)
+set(PAIMON_ARROW_FILESYSTEM_LIB)
+set(PAIMON_ARROW_DATASET_LIB)
+set(PAIMON_ARROW_ACERO_LIB)
+if (ENABLE_PAIMON_CPP)
+    # Every Arrow component is probed in the same priority order: the archive
+    # under paimon-cpp/lib64/paimon_deps first, then thirdparty lib64 and lib.
+    doris_paimon_import_first_existing(paimon_arrow_core PAIMON_ARROW_CORE_LIB
+        "${THIRDPARTY_DIR}/paimon-cpp/lib64/paimon_deps/libarrow.a"
+        "${THIRDPARTY_DIR}/lib64/libarrow.a"
+        "${THIRDPARTY_DIR}/lib/libarrow.a")
+    doris_paimon_import_first_existing(paimon_arrow_filesystem PAIMON_ARROW_FILESYSTEM_LIB
+        "${THIRDPARTY_DIR}/paimon-cpp/lib64/paimon_deps/libarrow_filesystem.a"
+        "${THIRDPARTY_DIR}/lib64/libarrow_filesystem.a"
+        "${THIRDPARTY_DIR}/lib/libarrow_filesystem.a")
+    doris_paimon_import_first_existing(paimon_arrow_dataset PAIMON_ARROW_DATASET_LIB
+        "${THIRDPARTY_DIR}/paimon-cpp/lib64/paimon_deps/libarrow_dataset.a"
+        "${THIRDPARTY_DIR}/lib64/libarrow_dataset.a"
+        "${THIRDPARTY_DIR}/lib/libarrow_dataset.a")
+    doris_paimon_import_first_existing(paimon_arrow_acero PAIMON_ARROW_ACERO_LIB
+        "${THIRDPARTY_DIR}/paimon-cpp/lib64/paimon_deps/libarrow_acero.a"
+        "${THIRDPARTY_DIR}/lib64/libarrow_acero.a"
+        "${THIRDPARTY_DIR}/lib/libarrow_acero.a")
+
+    if (PAIMON_ARROW_DATASET_LIB)
+        # paimon_parquet_file_format depends on Arrow Dataset symbols.
+        # Force-link it only when arrow_dataset is available.
+        set(PAIMON_FACTORY_REGISTRY_LIBS
+            paimon_parquet_file_format
+        )
+        # Drop it from the regular list so it is not linked twice.
+        list(REMOVE_ITEM COMMON_THIRDPARTY ${PAIMON_FACTORY_REGISTRY_LIBS})
+    else()
+        message(STATUS "Paimon C++: libarrow_dataset.a not found, keep paimon_parquet_file_format as regular static lib")
+    endif()
+endif()
+
set(DORIS_DEPENDENCIES
${DORIS_DEPENDENCIES}
${WL_START_GROUP}
@@ -619,6 +715,34 @@ set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES}
clucene-core-static)
set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} clucene-shared-static)
set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} clucene-contribs-lib)
+if (ENABLE_PAIMON_CPP)
+    # Factory-registry archives are bracketed by --whole-archive /
+    # --no-whole-archive; skipped entirely when the list is empty.
+    if (PAIMON_FACTORY_REGISTRY_LIBS)
+        list(APPEND DORIS_DEPENDENCIES
+            -Wl,--whole-archive
+            ${PAIMON_FACTORY_REGISTRY_LIBS}
+            -Wl,--no-whole-archive)
+    endif()
+
+    # Arrow components that were not found leave their variable empty, and an
+    # empty unquoted expansion contributes no list element.
+    list(APPEND DORIS_DEPENDENCIES
+        ${PAIMON_ARROW_CORE_LIB}
+        ${PAIMON_ARROW_FILESYSTEM_LIB}
+        ${PAIMON_ARROW_DATASET_LIB}
+        ${PAIMON_ARROW_ACERO_LIB})
+
+    # paimon-cpp internal dependencies (renamed with _paimon suffix)
+    # These must come after paimon libraries to resolve symbols.
+    list(APPEND DORIS_DEPENDENCIES
+        roaring_bitmap_paimon
+        xxhash_paimon
+        fmt_paimon
+        tbb_paimon)
+endif()
+
set(DORIS_DEPENDENCIES ${DORIS_DEPENDENCIES} ${WL_END_GROUP})
# Add all external dependencies. They should come after the palo libs.
diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake
index 9bb7b8ba748..441ebe8dc73 100644
--- a/be/cmake/thirdparty.cmake
+++ b/be/cmake/thirdparty.cmake
@@ -179,3 +179,18 @@ add_thirdparty(icudata LIB64)
add_thirdparty(pugixml LIB64)
+
+if (ENABLE_PAIMON_CPP)
+    # paimon-cpp archives first, then its _paimon-suffixed bundled
+    # dependencies; the registration order is the same as before.
+    foreach(_paimon_thirdparty_lib IN ITEMS
+            paimon
+            paimon_parquet_file_format
+            paimon_orc_file_format
+            paimon_blob_file_format
+            paimon_local_file_system
+            paimon_file_index
+            paimon_global_index
+            roaring_bitmap_paimon
+            xxhash_paimon
+            fmt_paimon
+            tbb_paimon)
+        add_thirdparty(${_paimon_thirdparty_lib} LIB64)
+    endforeach()
+endif()
diff --git a/be/src/vec/exec/format/table/paimon_cpp_reader.cpp
b/be/src/vec/exec/format/table/paimon_cpp_reader.cpp
new file mode 100644
index 00000000000..0bb89d0c73e
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_cpp_reader.cpp
@@ -0,0 +1,319 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "paimon_cpp_reader.h"
+
+#include <algorithm>
+#include <mutex>
+#include <utility>
+
+#include "arrow/c/bridge.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "paimon/defs.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/read_context.h"
+#include "paimon/table/source/table_read.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "util/url_coding.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/exec/format/table/paimon_doris_file_system.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+namespace {
+constexpr const char* VALUE_KIND_FIELD = "_VALUE_KIND";
+
+} // namespace
+
+// Stores the scan inputs, resolves the default time zone into _ctzz and
+// records the table-level row count when the scan range carries one
+// (-1 otherwise).
+PaimonCppReader::PaimonCppReader(const std::vector<SlotDescriptor*>& file_slot_descs,
+                                 RuntimeState* state, RuntimeProfile* profile,
+                                 const TFileRangeDesc& range,
+                                 const TFileScanRangeParams* range_params)
+        : _file_slot_descs(file_slot_descs),
+          _state(state),
+          _profile(profile),
+          _range(range),
+          _range_params(range_params) {
+    TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz);
+
+    const bool has_table_row_count = range.__isset.table_format_params &&
+                                     range.table_format_params.__isset.table_level_row_count;
+    _remaining_table_level_row_count =
+            has_table_row_count ? range.table_format_params.table_level_row_count : -1;
+}
+
+PaimonCppReader::~PaimonCppReader() = default;
+
+// Prepares the reader. When a COUNT aggregate is pushed down and a
+// table-level row count is known, no paimon-cpp reader is created at all.
+Status PaimonCppReader::init_reader() {
+    const bool count_from_metadata = _push_down_agg_type == TPushAggOp::type::COUNT &&
+                                     _remaining_table_level_row_count >= 0;
+    return count_from_metadata ? Status::OK() : _init_paimon_reader();
+}
+
+// Fills `block` with the next batch of rows.
+//
+// COUNT push-down with a known table-level row count: no data is read; the
+// block's columns are resized to min(remaining rows, batch_size) and `*eof`
+// becomes true once the remaining count reaches zero.
+//
+// Otherwise the next paimon-cpp batch is imported as an Arrow RecordBatch and
+// every Arrow column whose name exists in the block is converted through the
+// Doris column serde. The _VALUE_KIND column and columns unknown to the block
+// are skipped.
+//
+// Returns InternalError when the reader was never initialized, when the
+// paimon-cpp read or the Arrow import fails, or when a column conversion
+// throws.
+Status PaimonCppReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count >= 0) {
+        const int64_t rows = std::min(_remaining_table_level_row_count,
+                                      static_cast<int64_t>(_state->query_options().batch_size));
+        _remaining_table_level_row_count -= rows;
+        auto mutate_columns = block->mutate_columns();
+        for (auto& col : mutate_columns) {
+            col->resize(static_cast<size_t>(rows));
+        }
+        block->set_columns(std::move(mutate_columns));
+        *read_rows = static_cast<size_t>(rows);
+        *eof = (_remaining_table_level_row_count == 0);
+        return Status::OK();
+    }
+
+    if (!_batch_reader) {
+        return Status::InternalError("paimon-cpp reader is not initialized");
+    }
+
+    // The name -> position map is captured from the first block and reused.
+    if (_col_name_to_block_idx.empty()) {
+        _col_name_to_block_idx = block->get_name_to_pos_map();
+    }
+
+    auto batch_result = _batch_reader->NextBatch();
+    if (!batch_result.ok()) {
+        return Status::InternalError("paimon-cpp read batch failed: {}",
+                                     batch_result.status().ToString());
+    }
+    auto batch = std::move(batch_result).value();
+    if (paimon::BatchReader::IsEofBatch(batch)) {
+        *read_rows = 0;
+        *eof = true;
+        return Status::OK();
+    }
+
+    arrow::Result<std::shared_ptr<arrow::RecordBatch>> import_result =
+            arrow::ImportRecordBatch(batch.first.get(), batch.second.get());
+    if (!import_result.ok()) {
+        // ToString() keeps the status code in the message, consistent with the
+        // other error paths of this reader.
+        return Status::InternalError("failed to import paimon-cpp arrow batch: {}",
+                                     import_result.status().ToString());
+    }
+
+    auto record_batch = std::move(import_result).ValueUnsafe();
+    const auto num_rows = static_cast<size_t>(record_batch->num_rows());
+    const auto num_columns = record_batch->num_columns();
+    for (int c = 0; c < num_columns; ++c) {
+        const auto& field = record_batch->schema()->field(c);
+        if (field->name() == VALUE_KIND_FIELD) {
+            continue;
+        }
+
+        auto it = _col_name_to_block_idx.find(field->name());
+        if (it == _col_name_to_block_idx.end()) {
+            // Skip columns that are not in the block (e.g., partition columns handled elsewhere)
+            continue;
+        }
+        const vectorized::ColumnWithTypeAndName& column_with_name =
+                block->get_by_position(it->second);
+        try {
+            RETURN_IF_ERROR(column_with_name.type->get_serde()->read_column_from_arrow(
+                    column_with_name.column->assume_mutable_ref(), record_batch->column(c).get(), 0,
+                    num_rows, _ctzz));
+        } catch (const Exception& e) {
+            return Status::InternalError("Failed to convert from arrow to block: {}", e.what());
+        }
+    }
+
+    *read_rows = num_rows;
+    *eof = false;
+    return Status::OK();
+}
+
+// Reports one (column name -> data type) entry per file slot descriptor.
+// `missing_cols` is not written by this reader.
+Status PaimonCppReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
+                                    std::unordered_set<std::string>* missing_cols) {
+    for (size_t i = 0; i < _file_slot_descs.size(); ++i) {
+        const auto& slot_desc = _file_slot_descs[i];
+        name_to_type->emplace(slot_desc->col_name(), slot_desc->type());
+    }
+    return Status::OK();
+}
+
+// Closes the underlying paimon-cpp batch reader if one was created.
+// Always reports success.
+Status PaimonCppReader::close() {
+    if (!_batch_reader) {
+        return Status::OK();
+    }
+    _batch_reader->Close();
+    return Status::OK();
+}
+
+// Builds the paimon-cpp read pipeline for this scan range:
+// decode split -> resolve table path -> build options / read schema ->
+// ReadContext -> TableRead -> BatchReader.
+// _table_read and _batch_reader are only assigned once every step succeeded.
+Status PaimonCppReader::_init_paimon_reader() {
+    register_paimon_doris_file_system();
+    RETURN_IF_ERROR(_decode_split(&_split));
+
+    const auto resolved_path = _resolve_table_path();
+    if (!resolved_path.has_value()) {
+        return Status::InternalError(
+                "paimon-cpp missing paimon_table; cannot resolve paimon table root path");
+    }
+    auto options = _build_options();
+    auto read_columns = _build_read_columns();
+
+    // Avoid moving strings across module boundaries to prevent allocator mismatches in ASAN builds.
+    std::string table_path = resolved_path.value();
+
+    // Emit the option summary a single time per process. Credential values
+    // are reported only as "set"/"empty", never printed.
+    static std::once_flag options_log_once;
+    std::call_once(options_log_once, [&]() {
+        auto presence_of = [&](const char* key) {
+            auto found = options.find(key);
+            return (found != options.end() && !found->second.empty()) ? "set" : "empty";
+        };
+        auto value_of = [&](const char* key) {
+            auto found = options.find(key);
+            return found != options.end() ? found->second : std::string("<unset>");
+        };
+        LOG(INFO) << "paimon-cpp options summary: table_path=" << table_path
+                  << " AWS_ACCESS_KEY=" << presence_of("AWS_ACCESS_KEY")
+                  << " AWS_SECRET_KEY=" << presence_of("AWS_SECRET_KEY")
+                  << " AWS_TOKEN=" << presence_of("AWS_TOKEN")
+                  << " AWS_ENDPOINT=" << value_of("AWS_ENDPOINT")
+                  << " AWS_REGION=" << value_of("AWS_REGION")
+                  << " use_path_style=" << value_of("use_path_style")
+                  << " fs.oss.endpoint=" << value_of("fs.oss.endpoint")
+                  << " fs.s3a.endpoint=" << value_of("fs.s3a.endpoint");
+    });
+
+    paimon::ReadContextBuilder context_builder(table_path);
+    if (!read_columns.empty()) {
+        context_builder.SetReadSchema(read_columns);
+    }
+    if (!options.empty()) {
+        context_builder.SetOptions(options);
+    }
+    if (_predicate) {
+        context_builder.SetPredicate(_predicate);
+        context_builder.EnablePredicateFilter(true);
+    }
+
+    auto context_result = context_builder.Finish();
+    if (!context_result.ok()) {
+        return Status::InternalError("paimon-cpp build read context failed: {}",
+                                     context_result.status().ToString());
+    }
+
+    auto table_read_result = paimon::TableRead::Create(std::move(context_result).value());
+    if (!table_read_result.ok()) {
+        return Status::InternalError("paimon-cpp create table read failed: {}",
+                                     table_read_result.status().ToString());
+    }
+    auto new_table_read = std::move(table_read_result).value();
+
+    auto reader_result = new_table_read->CreateReader(_split);
+    if (!reader_result.ok()) {
+        return Status::InternalError("paimon-cpp create reader failed: {}",
+                                     reader_result.status().ToString());
+    }
+
+    _table_read = std::move(new_table_read);
+    _batch_reader = std::move(reader_result).value();
+    return Status::OK();
+}
+
+// Base64-decodes the paimon_split carried by the scan range and deserializes
+// it into a paimon::Split stored in `*split`.
+// Returns InternalError if the split is absent, is not valid base64, or
+// cannot be deserialized.
+Status PaimonCppReader::_decode_split(std::shared_ptr<paimon::Split>* split) {
+    const bool has_split = _range.__isset.table_format_params &&
+                           _range.table_format_params.__isset.paimon_params &&
+                           _range.table_format_params.paimon_params.__isset.paimon_split;
+    if (!has_split) {
+        return Status::InternalError("paimon-cpp missing paimon_split in scan range");
+    }
+
+    std::string raw_split;
+    if (!base64_decode(_range.table_format_params.paimon_params.paimon_split, &raw_split)) {
+        return Status::InternalError("paimon-cpp base64 decode paimon_split failed");
+    }
+
+    auto memory_pool = paimon::GetDefaultPool();
+    auto deserialized =
+            paimon::Split::Deserialize(raw_split.data(), raw_split.size(), memory_pool);
+    if (!deserialized.ok()) {
+        return Status::InternalError("paimon-cpp deserialize split failed: {}",
+                                     deserialized.status().ToString());
+    }
+    *split = std::move(deserialized).value();
+    return Status::OK();
+}
+
+// Returns the paimon_table path from the scan range, or std::nullopt when the
+// field is unset or empty.
+std::optional<std::string> PaimonCppReader::_resolve_table_path() const {
+    if (!_range.__isset.table_format_params ||
+        !_range.table_format_params.__isset.paimon_params) {
+        return std::nullopt;
+    }
+    const auto& paimon_params = _range.table_format_params.paimon_params;
+    if (!paimon_params.__isset.paimon_table || paimon_params.paimon_table.empty()) {
+        return std::nullopt;
+    }
+    return paimon_params.paimon_table;
+}
+
+// Collects the column names of all file slot descriptors, in slot order.
+std::vector<std::string> PaimonCppReader::_build_read_columns() const {
+    std::vector<std::string> column_names;
+    column_names.reserve(_file_slot_descs.size());
+    for (size_t i = 0; i < _file_slot_descs.size(); ++i) {
+        column_names.push_back(_file_slot_descs[i]->col_name());
+    }
+    return column_names;
+}
+
+// Assembles the option map handed to paimon-cpp:
+//   1. paimon_options from the scan range;
+//   2. overlaid by the scan-range-params properties when present and
+//      non-empty, otherwise by hadoop_conf from the scan range;
+//   3. OSS/S3A Hadoop keys copied to Doris S3 keys that are still absent;
+//   4. the file system option forced to "doris".
+std::map<std::string, std::string> PaimonCppReader::_build_options() const {
+    std::map<std::string, std::string> options;
+
+    const bool has_paimon_params = _range.__isset.table_format_params &&
+                                   _range.table_format_params.__isset.paimon_params;
+    if (has_paimon_params && _range.table_format_params.paimon_params.__isset.paimon_options) {
+        const auto& paimon_options = _range.table_format_params.paimon_params.paimon_options;
+        options.insert(paimon_options.begin(), paimon_options.end());
+    }
+
+    const bool has_properties = _range_params && _range_params->__isset.properties &&
+                                !_range_params->properties.empty();
+    if (has_properties) {
+        for (const auto& [key, value] : _range_params->properties) {
+            options[key] = value;
+        }
+    } else if (has_paimon_params &&
+               _range.table_format_params.paimon_params.__isset.hadoop_conf) {
+        for (const auto& [key, value] : _range.table_format_params.paimon_params.hadoop_conf) {
+            options[key] = value;
+        }
+    }
+
+    // Map common OSS/S3 Hadoop configs to Doris S3 property keys.
+    // Order matters: an earlier source fills the target first, and a target
+    // that is already present is never overwritten.
+    static constexpr std::pair<const char*, const char*> HADOOP_TO_DORIS_KEYS[] = {
+            {"fs.oss.accessKeyId", "AWS_ACCESS_KEY"},
+            {"fs.oss.accessKeySecret", "AWS_SECRET_KEY"},
+            {"fs.oss.sessionToken", "AWS_TOKEN"},
+            {"fs.oss.endpoint", "AWS_ENDPOINT"},
+            {"fs.oss.region", "AWS_REGION"},
+            {"fs.s3a.access.key", "AWS_ACCESS_KEY"},
+            {"fs.s3a.secret.key", "AWS_SECRET_KEY"},
+            {"fs.s3a.session.token", "AWS_TOKEN"},
+            {"fs.s3a.endpoint", "AWS_ENDPOINT"},
+            {"fs.s3a.region", "AWS_REGION"},
+            {"fs.s3a.path.style.access", "use_path_style"},
+    };
+    for (const auto& [source_key, target_key] : HADOOP_TO_DORIS_KEYS) {
+        if (options.count(target_key) != 0) {
+            continue;
+        }
+        auto source = options.find(source_key);
+        if (source == options.end() || source->second.empty()) {
+            continue;
+        }
+        options[target_key] = source->second;
+    }
+
+    options[paimon::Options::FILE_SYSTEM] = "doris";
+    return options;
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_cpp_reader.h
b/be/src/vec/exec/format/table/paimon_cpp_reader.h
new file mode 100644
index 00000000000..a1e8cfc1917
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_cpp_reader.h
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "cctz/time_zone.h"
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/table/source/split.h"
+#include "vec/exec/format/generic_reader.h"
+
+namespace paimon {
+class TableRead;
+class Predicate;
+} // namespace paimon
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+class SlotDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+class Block;
+
+// GenericReader implementation that reads one Paimon split through the
+// paimon-cpp library and converts its Arrow batches into Doris blocks.
+class PaimonCppReader : public GenericReader {
+ ENABLE_FACTORY_CREATOR(PaimonCppReader);
+
+public:
+ PaimonCppReader(const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state,
+ RuntimeProfile* profile, const TFileRangeDesc& range,
+ const TFileScanRangeParams* range_params);
+ ~PaimonCppReader() override;
+
+ // Creates the paimon-cpp reader unless a COUNT push-down can be answered
+ // from the table-level row count.
+ Status init_reader();
+ Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+ Status get_columns(std::unordered_map<std::string, DataTypePtr>*
name_to_type,
+ std::unordered_set<std::string>* missing_cols) override;
+ Status close() override;
+ // The predicate is consumed while the paimon-cpp reader is built, so
+ // presumably it must be set before init_reader() - confirm with callers.
+ void set_predicate(std::shared_ptr<paimon::Predicate> predicate) {
+ _predicate = std::move(predicate);
+ }
+
+private:
+ Status _init_paimon_reader();
+ Status _decode_split(std::shared_ptr<paimon::Split>* split);
+ // Resolve paimon table root path for schema/manifest lookup.
+ std::optional<std::string> _resolve_table_path() const;
+ std::vector<std::string> _build_read_columns() const;
+ std::map<std::string, std::string> _build_options() const;
+
+ // NOTE(review): held by reference, not copied - the caller-owned vector
+ // must outlive this reader; confirm at the construction site.
+ const std::vector<SlotDescriptor*>& _file_slot_descs;
+ RuntimeState* _state = nullptr;
+ [[maybe_unused]] RuntimeProfile* _profile = nullptr;
+ // NOTE(review): also a reference member - same lifetime caveat as above.
+ const TFileRangeDesc& _range;
+ const TFileScanRangeParams* _range_params = nullptr;
+
+ std::shared_ptr<paimon::Split> _split;
+ std::unique_ptr<paimon::TableRead> _table_read;
+ std::unique_ptr<paimon::BatchReader> _batch_reader;
+ std::shared_ptr<paimon::Predicate> _predicate;
+
+ // Column name -> position in the output block.
+ std::unordered_map<std::string, uint32_t> _col_name_to_block_idx;
+ // Rows still to be emitted for a COUNT push-down; -1 when the scan range
+ // carries no table-level row count.
+ int64_t _remaining_table_level_row_count = -1;
+ cctz::time_zone _ctzz;
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_doris_file_system.cpp
b/be/src/vec/exec/format/table/paimon_doris_file_system.cpp
new file mode 100644
index 00000000000..abe6fb0c7cb
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_doris_file_system.cpp
@@ -0,0 +1,664 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "paimon_doris_file_system.h"
+
+#include <algorithm>
+#include <cctype>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "gen_cpp/Types_types.h"
+#include "io/file_factory.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_system.h"
+#include "io/fs/file_writer.h"
+#include "io/fs/local_file_system.h"
+#include "paimon/factories/factory.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/file_system_factory.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+// Scheme and authority extracted from a path by parse_uri(); both stay empty
+// when the path carries no recognizable scheme delimiter.
+struct ParsedUri {
+ std::string scheme;
+ std::string authority;
+};
+
+// Returns `value` with every character passed through std::tolower.
+std::string to_lower(std::string value) {
+    for (char& ch : value) {
+        // Go through unsigned char: std::tolower is undefined for negative values.
+        ch = static_cast<char>(std::tolower(static_cast<unsigned char>(ch)));
+    }
+    return value;
+}
+
+// Splits `path` into a lower-cased scheme and an authority.
+//
+// The scheme ends at the first "://" or, failing that, at the first ":/".
+// The authority is the text between that delimiter and the next '/'.
+// Both fields are left empty when no delimiter is found, when the path
+// starts with the delimiter, or when the text before the delimiter is not a
+// syntactically valid scheme: a letter followed by letters, digits, '+', '-'
+// or '.'. The last rule keeps plain paths that merely contain ":/"
+// (e.g. "/data/a:/b") from being mistaken for URIs.
+ParsedUri parse_uri(const std::string& path) {
+    ParsedUri parsed;
+    size_t scheme_pos = path.find("://");
+    size_t delim_len = 3;
+    if (scheme_pos == std::string::npos) {
+        scheme_pos = path.find(":/");
+        delim_len = 2;
+    }
+    if (scheme_pos == std::string::npos || scheme_pos == 0) {
+        return parsed;
+    }
+
+    // Validate the scheme candidate before trusting it.
+    if (std::isalpha(static_cast<unsigned char>(path[0])) == 0) {
+        return parsed;
+    }
+    for (size_t i = 1; i < scheme_pos; ++i) {
+        const auto c = static_cast<unsigned char>(path[i]);
+        if (std::isalnum(c) == 0 && c != '+' && c != '-' && c != '.') {
+            return parsed;
+        }
+    }
+
+    parsed.scheme = to_lower(path.substr(0, scheme_pos));
+    size_t authority_start = scheme_pos + delim_len;
+    // Nothing after the delimiter, or the path part starts immediately.
+    if (authority_start >= path.size() || path[authority_start] == '/') {
+        return parsed;
+    }
+    size_t next_slash = path.find('/', authority_start);
+    if (next_slash == std::string::npos) {
+        parsed.authority = path.substr(authority_start);
+    } else {
+        parsed.authority = path.substr(authority_start, next_slash - authority_start);
+    }
+    return parsed;
+}
+
+// True for the schemes this file routes to the S3 file type.
+bool is_s3_scheme(const std::string& scheme) {
+    static const char* const S3_LIKE_SCHEMES[] = {"s3",  "s3a", "s3n",  "oss",   "obs",  "cos",
+                                                  "cosn", "gs",  "abfs", "abfss", "wasb", "wasbs"};
+    for (const char* candidate : S3_LIKE_SCHEMES) {
+        if (scheme == candidate) {
+            return true;
+        }
+    }
+    return false;
+}
+
+// True for "hdfs", "viewfs" and "local".
+bool is_hdfs_scheme(const std::string& scheme) {
+    if (scheme == "hdfs") {
+        return true;
+    }
+    if (scheme == "viewfs") {
+        return true;
+    }
+    return scheme == "local";
+}
+
+// True for "http" and "https".
+bool is_http_scheme(const std::string& scheme) {
+    if (scheme == "http") {
+        return true;
+    }
+    return scheme == "https";
+}
+
+// Maps an already lower-cased URI scheme to the Doris file type used to open
+// it. An empty or unrecognized scheme maps to FILE_HDFS.
+doris::TFileType::type map_scheme_to_file_type(const std::string& scheme) {
+    using doris::TFileType;
+    if (scheme == "file") {
+        return TFileType::FILE_LOCAL;
+    }
+    if (is_s3_scheme(scheme)) {
+        return TFileType::FILE_S3;
+    }
+    if (is_http_scheme(scheme)) {
+        return TFileType::FILE_HTTP;
+    }
+    const bool is_broker_scheme = scheme == "ofs" || scheme == "gfs" || scheme == "jfs";
+    if (is_broker_scheme) {
+        return TFileType::FILE_BROKER;
+    }
+    // Empty scheme, the HDFS family and anything unknown all end up here.
+    return TFileType::FILE_HDFS;
+}
+
+// Rewrites `path` as "<scheme>://<rest>", where <rest> is whatever follows
+// the first "://" or, if there is none, the first ":/". A path without
+// either delimiter is returned unchanged.
+std::string replace_scheme(const std::string& path, const std::string& scheme) {
+    size_t rest_begin = std::string::npos;
+    if (const size_t full_delim = path.find("://"); full_delim != std::string::npos) {
+        rest_begin = full_delim + 3;
+    } else if (const size_t short_delim = path.find(":/"); short_delim != std::string::npos) {
+        rest_begin = short_delim + 2;
+    }
+    if (rest_begin == std::string::npos) {
+        return path;
+    }
+    return scheme + "://" + path.substr(rest_begin);
+}
+
+// Strips the "file:" prefix from a local URI. When "//" plus at least one
+// more character follows the prefix, everything up to the next '/' is
+// dropped as well; an empty string is returned if there is no such '/'.
+// Paths that do not start with "file:" are returned unchanged.
+std::string normalize_local_path(const std::string& path) {
+    static constexpr size_t FILE_PREFIX_LEN = 5; // length of "file:"
+    if (!path.starts_with("file:")) {
+        return path;
+    }
+    size_t path_begin = FILE_PREFIX_LEN;
+    const bool double_slash_with_tail =
+            path.size() - path_begin > 2 && path.compare(path_begin, 2, "//") == 0;
+    if (double_slash_with_tail) {
+        const size_t path_slash = path.find('/', path_begin + 2);
+        if (path_slash == std::string::npos) {
+            return "";
+        }
+        path_begin = path_slash;
+    }
+    return path.substr(path_begin);
+}
+
+// Adapts `path` to the given file type: local paths lose their "file:"
+// prefix, S3-type paths whose scheme is neither "s3" nor http(s) are
+// rewritten to "s3://", and everything else passes through unchanged.
+std::string normalize_path_for_type(const std::string& path, const std::string& scheme,
+                                    doris::TFileType::type type) {
+    switch (type) {
+    case doris::TFileType::FILE_LOCAL:
+        return normalize_local_path(path);
+    case doris::TFileType::FILE_S3: {
+        const bool needs_rewrite = scheme != "s3" && !is_http_scheme(scheme);
+        return needs_rewrite ? replace_scheme(path, "s3") : path;
+    }
+    default:
+        return path;
+    }
+}
+
+// Builds the string key identifying a file system for the given type/URI.
+// Local and broker types map to fixed keys, S3 and HTTP are keyed by
+// authority, and every other type is keyed by "<scheme>://<authority>",
+// falling back to `default_fs_name` when both URI parts are empty.
+// NOTE(review): "http" and "https" URIs with the same authority share one
+// key here - confirm that is intended.
+std::string build_fs_cache_key(doris::TFileType::type type, const ParsedUri& uri,
+                               const std::string& default_fs_name) {
+    if (type == doris::TFileType::FILE_LOCAL) {
+        return "local";
+    }
+    if (type == doris::TFileType::FILE_S3) {
+        return "s3://" + uri.authority;
+    }
+    if (type == doris::TFileType::FILE_HTTP) {
+        return "http://" + uri.authority;
+    }
+    if (type == doris::TFileType::FILE_BROKER) {
+        return "broker";
+    }
+    // FILE_HDFS and any remaining type.
+    const bool has_uri_identity = !uri.scheme.empty() || !uri.authority.empty();
+    if (has_uri_identity) {
+        return uri.scheme + "://" + uri.authority;
+    }
+    return default_fs_name;
+}
+
+// Translates a Doris status into the closest paimon status category, carrying
+// over the Doris status text. Codes without a dedicated mapping become
+// IOError.
+paimon::Status to_paimon_status(const doris::Status& status) {
+    if (status.ok()) {
+        return paimon::Status::OK();
+    }
+    const auto message = status.to_string();
+    const auto code = status.code();
+    if (code == doris::ErrorCode::NOT_FOUND || code == doris::ErrorCode::DIR_NOT_EXIST) {
+        return paimon::Status::NotExist(message);
+    }
+    if (code == doris::ErrorCode::ALREADY_EXIST || code == doris::ErrorCode::FILE_ALREADY_EXIST) {
+        return paimon::Status::Exist(message);
+    }
+    if (code == doris::ErrorCode::INVALID_ARGUMENT ||
+        code == doris::ErrorCode::INVALID_INPUT_SYNTAX) {
+        return paimon::Status::Invalid(message);
+    }
+    if (code == doris::ErrorCode::NOT_IMPLEMENTED_ERROR) {
+        return paimon::Status::NotImplemented(message);
+    }
+    return paimon::Status::IOError(message);
+}
+
+// Concatenates `base` and `child`, inserting a '/' between them unless
+// `base` is empty or already ends with one.
+std::string join_path(const std::string& base, const std::string& child) {
+    const bool needs_separator = !base.empty() && base.back() != '/';
+    std::string joined = base;
+    if (needs_separator) {
+        joined += "/";
+    }
+    joined += child;
+    return joined;
+}
+
+// Returns the parent directory of a scheme-less path. Trailing slashes are
+// ignored; the parent of a top-level entry (and of "/" itself) is "/", and a
+// path without any '/' has the empty string as parent.
+std::string parent_path_no_scheme(const std::string& path) {
+    if (path.empty()) {
+        return "";
+    }
+    // Index of the last character that is not a trailing slash. The very
+    // first character is never skipped.
+    size_t last = path.size() - 1;
+    while (last > 0 && path[last] == '/') {
+        --last;
+    }
+    const size_t separator = path.rfind('/', last);
+    if (separator == std::string::npos) {
+        return "";
+    }
+    return separator == 0 ? std::string("/") : path.substr(0, separator);
+}
+
+// Returns the parent of `path`, keeping "<scheme>://<authority>" in front
+// when the path is a URI. Scheme-less paths are delegated to
+// parent_path_no_scheme(). A URI without a path component, or whose path
+// has no parent, yields the empty string.
+std::string parent_path(const std::string& path) {
+    const ParsedUri uri = parse_uri(path);
+    if (uri.scheme.empty()) {
+        return parent_path_no_scheme(path);
+    }
+
+    // Locate the same delimiter parse_uri() looks for.
+    size_t delim_pos = path.find("://");
+    size_t delim_len = 3;
+    if (delim_pos == std::string::npos) {
+        delim_pos = path.find(":/");
+        delim_len = 2;
+    }
+    if (delim_pos == std::string::npos) {
+        return parent_path_no_scheme(path);
+    }
+
+    // The path component starts at the first '/' after the delimiter.
+    const size_t path_begin = path.find('/', delim_pos + delim_len);
+    if (path_begin == std::string::npos) {
+        return "";
+    }
+    const std::string parent_part = parent_path_no_scheme(path.substr(path_begin));
+    if (parent_part.empty()) {
+        return "";
+    }
+    // Appending an empty authority is a no-op, so no branch is needed.
+    return uri.scheme + "://" + uri.authority + parent_part;
+}
+
+// paimon InputStream backed by a Doris FileReader.
+//
+// The stream keeps its own logical position: Seek() only updates that
+// position, and every read is issued to the reader as a positional read_at().
+class DorisInputStream : public InputStream {
+public:
+    DorisInputStream(doris::io::FileReaderSPtr reader, std::string path)
+            : reader_(std::move(reader)), path_(std::move(path)) {}
+
+    // Moves the logical position relative to the start, the current position
+    // or the end of the file. Returns Invalid for an unknown origin or a
+    // negative resulting position. Positions past the end are not rejected.
+    Status Seek(int64_t offset, SeekOrigin origin) override {
+        int64_t target = 0;
+        if (origin == SeekOrigin::FS_SEEK_SET) {
+            target = offset;
+        } else if (origin == SeekOrigin::FS_SEEK_CUR) {
+            target = position_ + offset;
+        } else if (origin == SeekOrigin::FS_SEEK_END) {
+            target = static_cast<int64_t>(reader_->size()) + offset;
+        } else {
+            return Status::Invalid("unknown seek origin");
+        }
+        if (target < 0) {
+            return Status::Invalid("seek position is negative");
+        }
+        position_ = target;
+        return Status::OK();
+    }
+
+    Result<int64_t> GetPos() const override { return position_; }
+
+    // Reads up to `size` bytes at the current position and advances the
+    // position by the number of bytes actually read.
+    Result<int32_t> Read(char* buffer, uint32_t size) override {
+        Result<int32_t> result = ReadAt(buffer, size, static_cast<uint64_t>(position_));
+        if (result.ok()) {
+            position_ += static_cast<int64_t>(result.value());
+        }
+        return result;
+    }
+
+    // Reads up to `size` bytes at `offset`; the stream position is untouched.
+    Result<int32_t> Read(char* buffer, uint32_t size, uint64_t offset) override {
+        return ReadAt(buffer, size, offset);
+    }
+
+    // Performs the read synchronously on the calling thread and invokes
+    // `callback` with its status before returning.
+    void ReadAsync(char* buffer, uint32_t size, uint64_t offset,
+                   std::function<void(Status)>&& callback) override {
+        Result<int32_t> result = Read(buffer, size, offset);
+        Status status = Status::OK();
+        if (!result.ok()) {
+            status = result.status();
+        }
+        callback(status);
+    }
+
+    Result<std::string> GetUri() const override { return path_; }
+
+    Result<uint64_t> Length() const override { return static_cast<uint64_t>(reader_->size()); }
+
+    Status Close() override { return to_paimon_status(reader_->close()); }
+
+private:
+    // Largest byte count that can be reported through the int32_t result.
+    static constexpr uint32_t MAX_READ_SIZE = 0x7fffffffU;
+
+    // Shared positional read. Requests larger than MAX_READ_SIZE are rejected
+    // up front, because the byte count would not fit the int32_t result and
+    // the cast below would turn it into a negative number.
+    Result<int32_t> ReadAt(char* buffer, uint32_t size, uint64_t offset) {
+        if (size > MAX_READ_SIZE) {
+            return Status::Invalid("read size exceeds int32 range");
+        }
+        size_t bytes_read = 0;
+        doris::Status status = reader_->read_at(offset, doris::Slice(buffer, size), &bytes_read);
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        return static_cast<int32_t>(bytes_read);
+    }
+
+    doris::io::FileReaderSPtr reader_;
+    std::string path_;
+    int64_t position_ = 0;
+};
+
+// OutputStream implementation that appends to a Doris FileWriter.
+class DorisOutputStream : public OutputStream {
+public:
+    DorisOutputStream(doris::io::FileWriterPtr writer, std::string path)
+            : writer_(std::move(writer)), path_(std::move(path)) {}
+
+    // Appends `size` bytes from `buffer`; on success the full `size` is reported as written.
+    Result<int32_t> Write(const char* buffer, uint32_t size) override {
+        doris::Status append_status = writer_->append(doris::Slice(buffer, size));
+        if (append_status.ok()) {
+            return static_cast<int32_t>(size);
+        }
+        return to_paimon_status(append_status);
+    }
+
+    // No-op: nothing is forwarded to the underlying writer here.
+    Status Flush() override { return Status::OK(); }
+
+    // The position is the number of bytes appended to the writer so far.
+    Result<int64_t> GetPos() const override {
+        return static_cast<int64_t>(writer_->bytes_appended());
+    }
+
+    Result<std::string> GetUri() const override { return path_; }
+
+    Status Close() override { return to_paimon_status(writer_->close()); }
+
+private:
+    doris::io::FileWriterPtr writer_;
+    std::string path_;
+};
+
+// Minimal directory-entry description: a path plus a directory flag.
+class DorisBasicFileStatus : public BasicFileStatus {
+public:
+    DorisBasicFileStatus(std::string path, bool is_dir)
+            : path_(std::move(path)), is_dir_(is_dir) {}
+
+    std::string GetPath() const override { return path_; }
+    bool IsDir() const override { return is_dir_; }
+
+private:
+    std::string path_;
+    bool is_dir_;
+};
+
+// Full file status: path, directory flag, length in bytes and modification time.
+// All values are stored as supplied by the caller.
+class DorisFileStatus : public FileStatus {
+public:
+    DorisFileStatus(std::string path, bool is_dir, uint64_t length, int64_t mtime)
+            : path_(std::move(path)), is_dir_(is_dir), length_(length), mtime_(mtime) {}
+
+    std::string GetPath() const override { return path_; }
+    bool IsDir() const override { return is_dir_; }
+    uint64_t GetLen() const override { return length_; }
+    int64_t GetModificationTime() const override { return mtime_; }
+
+private:
+    std::string path_;
+    bool is_dir_;
+    uint64_t length_;
+    int64_t mtime_;
+};
+
+// paimon FileSystem implementation that routes every operation through Doris' own file
+// system layer. Each path is resolved to a (Doris file system, normalized path) pair; non-local
+// file systems are created on demand and cached per key (see resolve_path()).
+class DorisFileSystem : public FileSystem {
+public:
+    // `options` are handed to the Doris file system factory as properties; the optional
+    // "fs.defaultFS" entry is remembered as the default file system name.
+    explicit DorisFileSystem(std::map<std::string, std::string> options)
+            : options_(std::move(options)) {
+        auto it = options_.find("fs.defaultFS");
+        if (it != options_.end()) {
+            default_fs_name_ = it->second;
+        }
+    }
+
+    // Opens `path` for reading with the default reader options.
+    Result<std::unique_ptr<InputStream>> Open(const std::string& path) const override {
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        auto& fs = resolved.first;
+        auto& normalized_path = resolved.second;
+        doris::io::FileReaderSPtr reader;
+        doris::io::FileReaderOptions reader_options = doris::io::FileReaderOptions::DEFAULT;
+        doris::Status status = fs->open_file(normalized_path, &reader, &reader_options);
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        return std::make_unique<DorisInputStream>(std::move(reader), normalized_path);
+    }
+
+    // Creates `path` for writing. When `overwrite` is false an existing file is reported as
+    // Status::Exist. The parent directory is created first when one can be derived.
+    Result<std::unique_ptr<OutputStream>> Create(const std::string& path,
+                                                 bool overwrite) const override {
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        auto& fs = resolved.first;
+        auto& normalized_path = resolved.second;
+        if (!overwrite) {
+            bool exists = false;
+            doris::Status exists_status = fs->exists(normalized_path, &exists);
+            if (!exists_status.ok()) {
+                return to_paimon_status(exists_status);
+            }
+            if (exists) {
+                return Status::Exist("file already exists: ", normalized_path);
+            }
+        }
+        std::string parent = parent_path(normalized_path);
+        if (!parent.empty()) {
+            doris::Status mkdir_status = fs->create_directory(parent);
+            if (!mkdir_status.ok()) {
+                return to_paimon_status(mkdir_status);
+            }
+        }
+        doris::io::FileWriterPtr writer;
+        doris::Status status = fs->create_file(normalized_path, &writer);
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        return std::make_unique<DorisOutputStream>(std::move(writer), normalized_path);
+    }
+
+    Status Mkdirs(const std::string& path) const override {
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        doris::Status status = resolved.first->create_directory(resolved.second);
+        return to_paimon_status(status);
+    }
+
+    // Renames `src` to `dst` using the file system resolved for `src`.
+    Status Rename(const std::string& src, const std::string& dst) const override {
+        PAIMON_ASSIGN_OR_RAISE(auto src_resolved, resolve_path(src));
+        PAIMON_ASSIGN_OR_RAISE(auto dst_resolved, resolve_path(dst));
+        doris::Status status = src_resolved.first->rename(src_resolved.second, dst_resolved.second);
+        return to_paimon_status(status);
+    }
+
+    // Deletes `path`. A missing path is not an error. A path whose size can be queried is
+    // removed as a file; otherwise it is removed as a directory when `recursive` is set, and
+    // the size-query error is returned when it is not.
+    Status Delete(const std::string& path, bool recursive = true) const override {
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        bool exists = false;
+        doris::Status exists_status = resolved.first->exists(resolved.second, &exists);
+        if (!exists_status.ok()) {
+            return to_paimon_status(exists_status);
+        }
+        if (!exists) {
+            return Status::OK();
+        }
+        int64_t size = 0;
+        doris::Status size_status = resolved.first->file_size(resolved.second, &size);
+        if (size_status.ok()) {
+            return to_paimon_status(resolved.first->delete_file(resolved.second));
+        }
+        if (recursive) {
+            return to_paimon_status(resolved.first->delete_directory(resolved.second));
+        }
+        return to_paimon_status(size_status);
+    }
+
+    // Returns the status of `path`. The modification time is always reported as 0.
+    //  * exists and file_size() succeeds          -> regular file with that size
+    //  * otherwise, listing shows the path exists -> directory (length 0)
+    //  * otherwise                                -> Status::NotExist
+    // The listing probe on a path that exists() reports as missing is only attempted for S3.
+    Result<std::unique_ptr<FileStatus>> GetFileStatus(const std::string& path) const override {
+        ParsedUri uri = parse_uri(path);
+        doris::TFileType::type type = map_scheme_to_file_type(uri.scheme);
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        bool exists = false;
+        doris::Status exists_status = resolved.first->exists(resolved.second, &exists);
+        if (!exists_status.ok()) {
+            return to_paimon_status(exists_status);
+        }
+        if (!exists && type != doris::TFileType::FILE_S3) {
+            return Status::NotExist("path not exists: ", resolved.second);
+        }
+        if (exists) {
+            int64_t size = 0;
+            doris::Status size_status = resolved.first->file_size(resolved.second, &size);
+            if (size_status.ok()) {
+                return std::make_unique<DorisFileStatus>(resolved.second, false,
+                                                         static_cast<uint64_t>(size), 0);
+            }
+        }
+        PAIMON_ASSIGN_OR_RAISE(auto listed, listing_shows_path(resolved.first, resolved.second));
+        if (!listed) {
+            return Status::NotExist("path not exists: ", resolved.second);
+        }
+        return std::make_unique<DorisFileStatus>(resolved.second, true, 0, 0);
+    }
+
+    // Appends one BasicFileStatus per direct child of `directory` to `status_list`.
+    // Returns IOError when the path is known to be a regular file. Appends nothing (and
+    // returns OK) when the listing reports that the directory does not exist.
+    Status ListDir(const std::string& directory,
+                   std::vector<std::unique_ptr<BasicFileStatus>>* status_list) const override {
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(directory));
+        auto file_status = GetFileStatus(directory);
+        // A failed status lookup is deliberately ignored here; the listing below decides.
+        if (file_status.ok() && !file_status.value()->IsDir()) {
+            return Status::IOError("path is not a directory: ", directory);
+        }
+        std::vector<doris::io::FileInfo> files;
+        bool exists = false;
+        doris::Status status = resolved.first->list(resolved.second, false, &files, &exists);
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        if (!exists) {
+            return Status::OK();
+        }
+        status_list->reserve(status_list->size() + files.size());
+        for (const auto& file : files) {
+            status_list->emplace_back(std::make_unique<DorisBasicFileStatus>(
+                    join_path(resolved.second, file.file_name), !file.is_file));
+        }
+        return Status::OK();
+    }
+
+    // Appends the status of `path` itself when it is a file, or of each direct child when it
+    // is a directory. A missing path appends nothing and returns OK.
+    Status ListFileStatus(const std::string& path,
+                          std::vector<std::unique_ptr<FileStatus>>* status_list) const override {
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        auto self_status = GetFileStatus(path);
+        if (!self_status.ok()) {
+            if (self_status.status().IsNotExist()) {
+                return Status::OK();
+            }
+            return self_status.status();
+        }
+        if (!self_status.value()->IsDir()) {
+            status_list->emplace_back(std::move(self_status).value());
+            return Status::OK();
+        }
+        std::vector<doris::io::FileInfo> files;
+        bool exists = false;
+        doris::Status list_status = resolved.first->list(resolved.second, false, &files, &exists);
+        if (!list_status.ok()) {
+            return to_paimon_status(list_status);
+        }
+        if (!exists) {
+            return Status::OK();
+        }
+        status_list->reserve(status_list->size() + files.size());
+        for (const auto& file : files) {
+            uint64_t length = file.is_file ? static_cast<uint64_t>(file.file_size) : 0;
+            status_list->emplace_back(std::make_unique<DorisFileStatus>(
+                    join_path(resolved.second, file.file_name), !file.is_file, length, 0));
+        }
+        return Status::OK();
+    }
+
+    // Reports whether `path` exists. For S3 a path that exists() reports as missing is probed
+    // once more with a listing.
+    Result<bool> Exists(const std::string& path) const override {
+        ParsedUri uri = parse_uri(path);
+        doris::TFileType::type type = map_scheme_to_file_type(uri.scheme);
+        PAIMON_ASSIGN_OR_RAISE(auto resolved, resolve_path(path));
+        bool exists = false;
+        doris::Status status = resolved.first->exists(resolved.second, &exists);
+        if (!status.ok()) {
+            return to_paimon_status(status);
+        }
+        if (!exists && type == doris::TFileType::FILE_S3) {
+            return listing_shows_path(resolved.first, resolved.second);
+        }
+        return exists;
+    }
+
+private:
+    // Lists `path` non-recursively and reports whether the listing indicates that something
+    // is there: either the file system flags the path as existing or at least one entry was
+    // returned. Listing failures are returned as errors.
+    static Result<bool> listing_shows_path(const doris::io::FileSystemSPtr& fs,
+                                           const std::string& path) {
+        std::vector<doris::io::FileInfo> files;
+        bool list_exists = false;
+        doris::Status list_status = fs->list(path, false, &files, &list_exists);
+        if (!list_status.ok()) {
+            return to_paimon_status(list_status);
+        }
+        return list_exists || !files.empty();
+    }
+
+    // Maps `path` to the Doris file system that serves it plus the path normalized for that
+    // file system type. Local paths use the global local file system; every other type is
+    // created through FileFactory on first use and cached under a per-type key.
+    Result<std::pair<doris::io::FileSystemSPtr, std::string>> resolve_path(
+            const std::string& path) const {
+        auto uri = parse_uri(path);
+        doris::TFileType::type type = map_scheme_to_file_type(uri.scheme);
+        std::string normalized_path = normalize_path_for_type(path, uri.scheme, type);
+        if (type == doris::TFileType::FILE_LOCAL) {
+            doris::io::FileSystemSPtr fs = doris::io::global_local_filesystem();
+            return std::make_pair(std::move(fs), normalized_path);
+        }
+        std::string fs_key = build_fs_cache_key(type, uri, default_fs_name_);
+        {
+            std::lock_guard lock(fs_lock_);
+            auto it = fs_cache_.find(fs_key);
+            if (it != fs_cache_.end()) {
+                return std::make_pair(it->second, normalized_path);
+            }
+        }
+        // The file system is created outside the lock so other lookups are not blocked.
+        doris::io::FSPropertiesRef fs_properties(type);
+        const std::map<std::string, std::string>* properties = &options_;
+        std::map<std::string, std::string> properties_override;
+        // For HTTP, supply a "uri" property derived from the path when the options lack one.
+        if (type == doris::TFileType::FILE_HTTP && !options_.contains("uri") &&
+            !uri.scheme.empty()) {
+            properties_override = options_;
+            properties_override["uri"] = uri.scheme + "://" + uri.authority;
+            properties = &properties_override;
+        }
+        fs_properties.properties = properties;
+        if (!broker_addresses_.empty()) {
+            fs_properties.broker_addresses = &broker_addresses_;
+        }
+        doris::io::FileDescription file_description = {
+                .path = normalized_path, .file_size = -1, .mtime = 0, .fs_name = default_fs_name_};
+        auto fs_result = doris::FileFactory::create_fs(fs_properties, file_description);
+        if (!fs_result.has_value()) {
+            return to_paimon_status(fs_result.error());
+        }
+        doris::io::FileSystemSPtr fs = std::move(fs_result).value();
+        {
+            std::lock_guard lock(fs_lock_);
+            auto [cached, inserted] = fs_cache_.emplace(std::move(fs_key), fs);
+            if (!inserted) {
+                // Another thread cached an instance for this key while ours was being
+                // created; hand out the cached one so all callers share a single instance.
+                fs = cached->second;
+            }
+        }
+        return std::make_pair(std::move(fs), std::move(normalized_path));
+    }
+
+    std::map<std::string, std::string> options_;
+    // Never populated in this class, so no broker addresses are passed to the factory.
+    std::vector<doris::TNetworkAddress> broker_addresses_;
+    std::string default_fs_name_;
+    // Guards fs_cache_.
+    mutable std::mutex fs_lock_;
+    mutable std::unordered_map<std::string, doris::io::FileSystemSPtr> fs_cache_;
+};
+
+// Factory that creates DorisFileSystem instances; it identifies itself as "doris".
+class DorisFileSystemFactory : public FileSystemFactory {
+public:
+ static const char IDENTIFIER[];
+
+ const char* Identifier() const override { return IDENTIFIER; }
+
+ // Builds a DorisFileSystem from `options`. The `path` argument is not used.
+ Result<std::unique_ptr<FileSystem>> Create(
+ const std::string& path,
+ const std::map<std::string, std::string>& options) const override {
+ return std::make_unique<DorisFileSystem>(options);
+ }
+};
+
+const char DorisFileSystemFactory::IDENTIFIER[] = "doris";
+
+// NOTE(review): presumably registers the factory with paimon-cpp during static
+// initialization; confirm against the macro definition.
+REGISTER_PAIMON_FACTORY(DorisFileSystemFactory);
+
+} // namespace paimon
+
+namespace doris::vectorized {
+
+// Intentionally empty. Per its declaration in paimon_doris_file_system.h this is a force-link
+// helper: referencing it keeps the factory registration in this translation unit.
+void register_paimon_doris_file_system() {}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_doris_file_system.h
b/be/src/vec/exec/format/table/paimon_doris_file_system.h
new file mode 100644
index 00000000000..22552c6eb6f
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_doris_file_system.h
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+namespace doris::vectorized {
+
+// Force-link helper so the paimon-cpp file system factory registration is
kept.
+void register_paimon_doris_file_system();
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/table/paimon_predicate_converter.cpp
b/be/src/vec/exec/format/table/paimon_predicate_converter.cpp
new file mode 100644
index 00000000000..6c8251ddb43
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_predicate_converter.cpp
@@ -0,0 +1,659 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/table/paimon_predicate_converter.h"
+
+#include <algorithm>
+#include <cctype>
+#include <utility>
+
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "runtime/decimalv2_value.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "util/timezone_utils.h"
+#include "vec/columns/column_const.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/exprs/vcompound_pred.h"
+#include "vec/exprs/vdirect_in_predicate.h"
+#include "vec/exprs/vectorized_fn_call.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vin_predicate.h"
+#include "vec/exprs/vliteral.h"
+#include "vec/exprs/vslot_ref.h"
+#include "vec/runtime/timestamptz_value.h"
+#include "vec/runtime/vdatetime_value.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+// Builds the lookup from lower-cased column name to position in `file_slot_descs` (the first
+// occurrence wins when names collide after lower-casing) and resolves the time zone used to
+// convert date/time literals.
+PaimonPredicateConverter::PaimonPredicateConverter(
+        const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state)
+        : _state(state) {
+    _field_index_by_name.reserve(file_slot_descs.size());
+    for (size_t pos = 0; pos < file_slot_descs.size(); ++pos) {
+        auto key = _normalize_name(file_slot_descs[pos]->col_name());
+        if (_field_index_by_name.contains(key)) {
+            continue;
+        }
+        _field_index_by_name.emplace(std::move(key), static_cast<int32_t>(pos));
+    }
+
+    // Prefer GMT; fall back to the process default time zone when GMT cannot be resolved.
+    const bool has_gmt = TimezoneUtils::find_cctz_time_zone("GMT", _gmt_tz);
+    if (!has_gmt) {
+        TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _gmt_tz);
+    }
+}
+
+// Converts the given conjuncts into a single paimon predicate.
+// Conjuncts that cannot be converted are skipped. Returns nullptr when nothing could be
+// converted or when the AND combination fails.
+std::shared_ptr<paimon::Predicate> PaimonPredicateConverter::build(
+        const VExprContextSPtrs& conjuncts) {
+    std::vector<std::shared_ptr<paimon::Predicate>> converted;
+    converted.reserve(conjuncts.size());
+    for (const auto& conjunct : conjuncts) {
+        if (!conjunct || !conjunct->root()) {
+            continue;
+        }
+        auto root = conjunct->root();
+        // Look through a runtime-filter wrapper when it exposes an inner expression.
+        if (root->is_rf_wrapper()) {
+            if (auto impl = root->get_impl()) {
+                root = impl;
+            }
+        }
+        if (auto predicate = _convert_expr(root)) {
+            converted.emplace_back(std::move(predicate));
+        }
+    }
+
+    switch (converted.size()) {
+    case 0:
+        return nullptr;
+    case 1:
+        return converted.front();
+    default:
+        break;
+    }
+    auto combined = paimon::PredicateBuilder::And(converted);
+    if (combined.ok()) {
+        return std::move(combined).value();
+    }
+    return nullptr;
+}
+
+// Dispatches one expression (with outer casts removed) to the matching converter.
+// Handles IN / NOT IN, AND / OR, binary comparisons, IS [NOT] NULL and LIKE. Everything else,
+// including NOT, yields nullptr.
+std::shared_ptr<paimon::Predicate> PaimonPredicateConverter::_convert_expr(const VExprSPtr& expr) {
+    if (!expr) {
+        return nullptr;
+    }
+
+    auto stripped = VExpr::expr_without_cast(expr);
+
+    // IN predicates come in two flavours; the "direct" one must first expose its slot form.
+    if (auto* direct_in = dynamic_cast<VDirectInPredicate*>(stripped.get())) {
+        VExprSPtr slot_in_expr;
+        if (!direct_in->get_slot_in_expr(slot_in_expr)) {
+            return nullptr;
+        }
+        return _convert_in(slot_in_expr);
+    }
+    if (dynamic_cast<VInPredicate*>(stripped.get()) != nullptr) {
+        return _convert_in(stripped);
+    }
+
+    switch (stripped->op()) {
+    case TExprOpcode::COMPOUND_NOT:
+        return nullptr;
+    case TExprOpcode::COMPOUND_AND:
+    case TExprOpcode::COMPOUND_OR:
+        return _convert_compound(stripped);
+    case TExprOpcode::EQ:
+    case TExprOpcode::EQ_FOR_NULL:
+    case TExprOpcode::NE:
+    case TExprOpcode::GE:
+    case TExprOpcode::GT:
+    case TExprOpcode::LE:
+    case TExprOpcode::LT:
+        return _convert_binary(stripped);
+    default:
+        break;
+    }
+
+    // Remaining candidates are function calls recognized by their lower-cased name.
+    auto* fn_call = dynamic_cast<VectorizedFnCall*>(stripped.get());
+    if (fn_call == nullptr) {
+        return nullptr;
+    }
+    auto fn_name = _normalize_name(fn_call->function_name());
+    if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
+        return _convert_is_null(stripped, fn_name);
+    }
+    if (fn_name == "like") {
+        return _convert_like_prefix(stripped);
+    }
+    return nullptr;
+}
+
+// Converts a two-child AND / OR expression. Both children must convert, otherwise nullptr is
+// returned; the right child is not visited when the left one fails.
+std::shared_ptr<paimon::Predicate> PaimonPredicateConverter::_convert_compound(
+        const VExprSPtr& expr) {
+    if (!expr || expr->get_num_children() != 2) {
+        return nullptr;
+    }
+    auto lhs = _convert_expr(expr->get_child(0));
+    if (!lhs) {
+        return nullptr;
+    }
+    auto rhs = _convert_expr(expr->get_child(1));
+    if (!rhs) {
+        return nullptr;
+    }
+
+    switch (expr->op()) {
+    case TExprOpcode::COMPOUND_AND: {
+        auto combined = paimon::PredicateBuilder::And({lhs, rhs});
+        return combined.ok() ? std::move(combined).value() : nullptr;
+    }
+    case TExprOpcode::COMPOUND_OR: {
+        auto combined = paimon::PredicateBuilder::Or({lhs, rhs});
+        return combined.ok() ? std::move(combined).value() : nullptr;
+    }
+    default:
+        return nullptr;
+    }
+}
+
+// Converts `col [NOT] IN (lit, ...)`. The first child must resolve to a known column and every
+// remaining child must convert to a literal; a single failure abandons the whole predicate.
+std::shared_ptr<paimon::Predicate> PaimonPredicateConverter::_convert_in(const VExprSPtr& expr) {
+    auto* in_pred = dynamic_cast<VInPredicate*>(expr.get());
+    if (!in_pred || expr->get_num_children() < 2) {
+        return nullptr;
+    }
+    auto field_meta = _resolve_field(expr->get_child(0));
+    if (!field_meta) {
+        return nullptr;
+    }
+
+    std::vector<paimon::Literal> values;
+    values.reserve(expr->get_num_children() - 1);
+    for (uint16_t child = 1; child < expr->get_num_children(); ++child) {
+        auto value = _convert_literal(expr->get_child(child), *field_meta->slot_desc,
+                                      field_meta->field_type);
+        if (!value) {
+            return nullptr;
+        }
+        values.emplace_back(std::move(*value));
+    }
+    if (values.empty()) {
+        return nullptr;
+    }
+
+    if (!in_pred->is_not_in()) {
+        return paimon::PredicateBuilder::In(field_meta->index, field_meta->slot_desc->col_name(),
+                                            field_meta->field_type, values);
+    }
+    return paimon::PredicateBuilder::NotIn(field_meta->index, field_meta->slot_desc->col_name(),
+                                           field_meta->field_type, values);
+}
+
+// Converts a binary comparison of the form `<column> <op> <literal>`.
+// The column must be the left child and the literal the right child; any other shape, an
+// unknown column or an unconvertible literal yields nullptr.
+//
+// Null-safe equality (`<=>`) is translated according to its right operand:
+//   col <=> NULL     ->  col IS NULL
+//   col <=> literal  ->  col = literal   (a NULL column value never equals a non-NULL literal)
+// Any other right operand is not converted.
+std::shared_ptr<paimon::Predicate> PaimonPredicateConverter::_convert_binary(
+        const VExprSPtr& expr) {
+    if (!expr || expr->get_num_children() != 2) {
+        return nullptr;
+    }
+    auto field_meta = _resolve_field(expr->get_child(0));
+    if (!field_meta) {
+        return nullptr;
+    }
+    const auto& col_name = field_meta->slot_desc->col_name();
+
+    if (expr->op() == TExprOpcode::EQ_FOR_NULL) {
+        // Only a NULL literal on the right makes `<=>` an IS NULL test; blindly emitting
+        // IS NULL for `col <=> 5` would filter out exactly the rows that match.
+        auto rhs_expr = VExpr::expr_without_cast(expr->get_child(1));
+        auto* rhs_literal = dynamic_cast<VLiteral*>(rhs_expr.get());
+        if (!rhs_literal) {
+            return nullptr;
+        }
+        ColumnPtr rhs_col = rhs_literal->get_column_ptr()->convert_to_full_column_if_const();
+        const auto* rhs_nullable = check_and_get_column<ColumnNullable>(*rhs_col);
+        if (rhs_nullable != nullptr && rhs_nullable->is_null_at(0)) {
+            return paimon::PredicateBuilder::IsNull(field_meta->index, col_name,
+                                                    field_meta->field_type);
+        }
+        // Non-NULL literal: fall through and treat it like a plain equality.
+    }
+
+    auto literal =
+            _convert_literal(expr->get_child(1), *field_meta->slot_desc, field_meta->field_type);
+    if (!literal) {
+        return nullptr;
+    }
+
+    switch (expr->op()) {
+    case TExprOpcode::EQ:
+    case TExprOpcode::EQ_FOR_NULL:
+        return paimon::PredicateBuilder::Equal(field_meta->index, col_name,
+                                               field_meta->field_type, *literal);
+    case TExprOpcode::NE:
+        return paimon::PredicateBuilder::NotEqual(field_meta->index, col_name,
+                                                  field_meta->field_type, *literal);
+    case TExprOpcode::GE:
+        return paimon::PredicateBuilder::GreaterOrEqual(field_meta->index, col_name,
+                                                        field_meta->field_type, *literal);
+    case TExprOpcode::GT:
+        return paimon::PredicateBuilder::GreaterThan(field_meta->index, col_name,
+                                                     field_meta->field_type, *literal);
+    case TExprOpcode::LE:
+        return paimon::PredicateBuilder::LessOrEqual(field_meta->index, col_name,
+                                                     field_meta->field_type, *literal);
+    case TExprOpcode::LT:
+        return paimon::PredicateBuilder::LessThan(field_meta->index, col_name,
+                                                  field_meta->field_type, *literal);
+    default:
+        break;
+    }
+    return nullptr;
+}
+
+// Converts `col IS NULL` / `col IS NOT NULL`. `fn_name` is the lower-cased function name;
+// "is_not_null_pred" selects IS NOT NULL and any other value selects IS NULL.
+std::shared_ptr<paimon::Predicate> PaimonPredicateConverter::_convert_is_null(
+        const VExprSPtr& expr, const std::string& fn_name) {
+    if (!expr || expr->get_num_children() != 1) {
+        return nullptr;
+    }
+    auto field_meta = _resolve_field(expr->get_child(0));
+    if (!field_meta) {
+        return nullptr;
+    }
+    const bool negated = (fn_name == "is_not_null_pred");
+    if (!negated) {
+        return paimon::PredicateBuilder::IsNull(field_meta->index,
+                                                field_meta->slot_desc->col_name(),
+                                                field_meta->field_type);
+    }
+    return paimon::PredicateBuilder::IsNotNull(field_meta->index,
+                                               field_meta->slot_desc->col_name(),
+                                               field_meta->field_type);
+}
+
+// Converts a pure prefix match `col LIKE 'prefix%'` on a STRING column into the byte range
+// `col >= prefix AND col < next(prefix)`, where next() is computed by _next_prefix().
+//
+// Only patterns consisting of a wildcard-free prefix followed by a single trailing '%' are
+// converted. Patterns that start with '%', do not end with '%', or whose prefix still
+// contains '%', '_' or a backslash escape are left unconverted: a range built from such a
+// prefix would exclude rows that the LIKE actually matches.
+std::shared_ptr<paimon::Predicate> PaimonPredicateConverter::_convert_like_prefix(
+        const VExprSPtr& expr) {
+    if (!expr || expr->get_num_children() != 2) {
+        return nullptr;
+    }
+    auto field_meta = _resolve_field(expr->get_child(0));
+    if (!field_meta || field_meta->field_type != paimon::FieldType::STRING) {
+        return nullptr;
+    }
+
+    auto pattern_opt = _extract_string_literal(expr->get_child(1));
+    if (!pattern_opt) {
+        return nullptr;
+    }
+    const std::string& pattern = *pattern_opt;
+    if (pattern.empty() || pattern.front() == '%' || pattern.back() != '%') {
+        return nullptr;
+    }
+
+    std::string prefix = pattern.substr(0, pattern.size() - 1);
+    // The prefix is compared byte-wise below, so it must be free of LIKE metacharacters.
+    for (char ch : prefix) {
+        if (ch == '%' || ch == '_' || ch == '\\') {
+            return nullptr;
+        }
+    }
+
+    paimon::Literal lower_literal(paimon::FieldType::STRING, prefix.data(), prefix.size());
+    auto lower_pred = paimon::PredicateBuilder::GreaterOrEqual(
+            field_meta->index, field_meta->slot_desc->col_name(), field_meta->field_type,
+            lower_literal);
+
+    // Without an exclusive upper bound the lower bound alone is still a valid (weaker) filter.
+    auto upper_prefix = _next_prefix(prefix);
+    if (!upper_prefix) {
+        return lower_pred;
+    }
+
+    paimon::Literal upper_literal(paimon::FieldType::STRING, upper_prefix->data(),
+                                  upper_prefix->size());
+    auto upper_pred =
+            paimon::PredicateBuilder::LessThan(field_meta->index, field_meta->slot_desc->col_name(),
+                                               field_meta->field_type, upper_literal);
+    auto and_result = paimon::PredicateBuilder::And({lower_pred, upper_pred});
+    return and_result.ok() ? std::move(and_result).value() : nullptr;
+}
+
+// Resolves an expression (casts removed) to the column it references.
+// Succeeds only when the expression is a slot reference whose descriptor is known to the
+// runtime state, whose lower-cased name is one of the file columns, and whose type maps to a
+// paimon field type. Returns the column index, the paimon type and the slot descriptor.
+std::optional<PaimonPredicateConverter::FieldMeta> PaimonPredicateConverter::_resolve_field(
+        const VExprSPtr& expr) const {
+    if (!_state || !expr) {
+        return std::nullopt;
+    }
+    auto stripped = VExpr::expr_without_cast(expr);
+    auto* slot_ref = dynamic_cast<VSlotRef*>(stripped.get());
+    if (slot_ref == nullptr) {
+        return std::nullopt;
+    }
+    auto* slot_desc = _state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
+    if (slot_desc == nullptr) {
+        return std::nullopt;
+    }
+
+    auto index_it = _field_index_by_name.find(_normalize_name(slot_desc->col_name()));
+    if (index_it == _field_index_by_name.end()) {
+        return std::nullopt;
+    }
+
+    auto slot_type = slot_desc->type();
+    auto paimon_type =
+            _to_paimon_field_type(slot_type->get_primitive_type(), slot_type->get_precision());
+    if (!paimon_type) {
+        return std::nullopt;
+    }
+    return FieldMeta {index_it->second, *paimon_type, slot_desc};
+}
+
+// Converts a literal expression (casts removed) into a paimon literal typed after the column
+// described by `slot_desc`.
+//
+// Returns std::nullopt, meaning "do not push this predicate down", when
+//   * the expression is not a literal or the literal is NULL,
+//   * the literal's type family does not match the column's,
+//   * an integer literal does not fit into the column's integer width,
+//   * a DATETIMEV2 literal has a sub-millisecond part (it would be truncated), or
+//   * the value is otherwise not representable (invalid date, unsupported decimal, ...).
+//
+// `field_type` is only used to tag string literals.
+std::optional<paimon::Literal> PaimonPredicateConverter::_convert_literal(
+        const VExprSPtr& expr, const SlotDescriptor& slot_desc,
+        paimon::FieldType field_type) const {
+    auto literal_expr = VExpr::expr_without_cast(expr);
+    auto* literal = dynamic_cast<VLiteral*>(literal_expr.get());
+    if (!literal) {
+        return std::nullopt;
+    }
+
+    auto literal_type = remove_nullable(literal->get_data_type());
+    PrimitiveType literal_primitive = literal_type->get_primitive_type();
+    PrimitiveType slot_primitive = slot_desc.type()->get_primitive_type();
+
+    ColumnPtr col = literal->get_column_ptr()->convert_to_full_column_if_const();
+    if (const auto* nullable = check_and_get_column<ColumnNullable>(*col)) {
+        if (nullable->is_null_at(0)) {
+            return std::nullopt;
+        }
+        col = nullable->get_nested_column_ptr();
+    }
+
+    Field field;
+    col->get(0, field);
+
+    switch (slot_primitive) {
+    case TYPE_BOOLEAN: {
+        if (literal_primitive != TYPE_BOOLEAN) {
+            return std::nullopt;
+        }
+        return paimon::Literal(static_cast<bool>(field.get<TYPE_BOOLEAN>()));
+    }
+    case TYPE_TINYINT:
+    case TYPE_SMALLINT:
+    case TYPE_INT:
+    case TYPE_BIGINT: {
+        if (!_is_integer_type(literal_primitive)) {
+            return std::nullopt;
+        }
+        int64_t value = 0;
+        switch (literal_primitive) {
+        case TYPE_TINYINT:
+            value = field.get<TYPE_TINYINT>();
+            break;
+        case TYPE_SMALLINT:
+            value = field.get<TYPE_SMALLINT>();
+            break;
+        case TYPE_INT:
+            value = field.get<TYPE_INT>();
+            break;
+        case TYPE_BIGINT:
+            value = field.get<TYPE_BIGINT>();
+            break;
+        default:
+            return std::nullopt;
+        }
+        // The literal may be wider than the column because the cast around the column was
+        // stripped. Narrowing an out-of-range value would wrap around and change the meaning
+        // of the comparison, so such literals are not pushed down.
+        if (slot_primitive == TYPE_TINYINT) {
+            if (value < INT8_MIN || value > INT8_MAX) {
+                return std::nullopt;
+            }
+            return paimon::Literal(static_cast<int8_t>(value));
+        }
+        if (slot_primitive == TYPE_SMALLINT) {
+            if (value < INT16_MIN || value > INT16_MAX) {
+                return std::nullopt;
+            }
+            return paimon::Literal(static_cast<int16_t>(value));
+        }
+        if (slot_primitive == TYPE_INT) {
+            if (value < INT32_MIN || value > INT32_MAX) {
+                return std::nullopt;
+            }
+            return paimon::Literal(static_cast<int32_t>(value));
+        }
+        return paimon::Literal(static_cast<int64_t>(value));
+    }
+    case TYPE_DOUBLE: {
+        if (literal_primitive != TYPE_DOUBLE && literal_primitive != TYPE_FLOAT) {
+            return std::nullopt;
+        }
+        double value = 0;
+        if (literal_primitive == TYPE_FLOAT) {
+            value = static_cast<double>(field.get<TYPE_FLOAT>());
+        } else {
+            value = field.get<TYPE_DOUBLE>();
+        }
+        return paimon::Literal(value);
+    }
+    case TYPE_DATE:
+    case TYPE_DATEV2: {
+        if (!_is_date_type(literal_primitive)) {
+            return std::nullopt;
+        }
+        // Dates are passed to paimon as whole days derived from the epoch seconds in _gmt_tz.
+        int64_t seconds = 0;
+        if (literal_primitive == TYPE_DATE) {
+            const auto& dt = field.get<TYPE_DATE>();
+            if (!dt.is_valid_date()) {
+                return std::nullopt;
+            }
+            dt.unix_timestamp(&seconds, _gmt_tz);
+        } else if (literal_primitive == TYPE_DATEV2) {
+            const auto& dt = field.get<TYPE_DATEV2>();
+            if (!dt.is_valid_date()) {
+                return std::nullopt;
+            }
+            dt.unix_timestamp(&seconds, _gmt_tz);
+        }
+        int32_t days = _seconds_to_days(seconds);
+        return paimon::Literal(paimon::FieldType::DATE, days);
+    }
+    case TYPE_DATETIME:
+    case TYPE_DATETIMEV2: {
+        if (!_is_datetime_type(literal_primitive)) {
+            return std::nullopt;
+        }
+        if (literal_primitive == TYPE_DATETIME) {
+            const auto& dt = field.get<TYPE_DATETIME>();
+            if (!dt.is_valid_date()) {
+                return std::nullopt;
+            }
+            int64_t seconds = 0;
+            dt.unix_timestamp(&seconds, _gmt_tz);
+            return paimon::Literal(paimon::Timestamp::FromEpochMillis(seconds * 1000));
+        }
+        std::pair<int64_t, int64_t> ts;
+        const auto& dt = field.get<TYPE_DATETIMEV2>();
+        if (!dt.is_valid_date()) {
+            return std::nullopt;
+        }
+        dt.unix_timestamp(&ts, _gmt_tz);
+        // NOTE(review): ts.second is presumably the microsecond part (it is divided by 1000
+        // to get milliseconds) - confirm. A literal finer than one millisecond cannot be
+        // expressed through FromEpochMillis without truncation, which would shift the
+        // comparison bound, so it is not pushed down.
+        if (ts.second % 1000 != 0) {
+            return std::nullopt;
+        }
+        int64_t millis = ts.first * 1000 + ts.second / 1000;
+        return paimon::Literal(paimon::Timestamp::FromEpochMillis(millis));
+    }
+    case TYPE_VARCHAR:
+    case TYPE_STRING: {
+        if (!_is_string_type(literal_primitive)) {
+            return std::nullopt;
+        }
+        const auto& value = field.get<TYPE_STRING>();
+        return paimon::Literal(field_type, value.data(), value.size());
+    }
+    case TYPE_DECIMALV2:
+    case TYPE_DECIMAL32:
+    case TYPE_DECIMAL64:
+    case TYPE_DECIMAL128I:
+    case TYPE_DECIMAL256: {
+        if (!_is_decimal_type(literal_primitive)) {
+            return std::nullopt;
+        }
+        // Precision and scale are taken from the literal's own type, not from the column.
+        int32_t precision = static_cast<int32_t>(literal_type->get_precision());
+        int32_t scale = static_cast<int32_t>(literal_type->get_scale());
+        if (precision <= 0 || precision > paimon::Decimal::MAX_PRECISION) {
+            return std::nullopt;
+        }
+
+        paimon::Decimal::int128_t value = 0;
+        switch (literal_primitive) {
+        case TYPE_DECIMALV2: {
+            const auto& dec = field.get<TYPE_DECIMALV2>();
+            value = dec.value();
+            break;
+        }
+        case TYPE_DECIMAL32: {
+            const auto& dec = field.get<TYPE_DECIMAL32>();
+            value = dec.value;
+            break;
+        }
+        case TYPE_DECIMAL64: {
+            const auto& dec = field.get<TYPE_DECIMAL64>();
+            value = dec.value;
+            break;
+        }
+        case TYPE_DECIMAL128I: {
+            const auto& dec = field.get<TYPE_DECIMAL128I>();
+            value = dec.value;
+            break;
+        }
+        default:
+            // TYPE_DECIMAL256 literals are not converted.
+            return std::nullopt;
+        }
+        return paimon::Literal(paimon::Decimal(precision, scale, value));
+    }
+    default:
+        break;
+    }
+    return std::nullopt;
+}
+
+// Returns the value of a non-NULL CHAR / VARCHAR / STRING literal (casts removed), or
+// std::nullopt when the expression is anything else.
+std::optional<std::string> PaimonPredicateConverter::_extract_string_literal(
+        const VExprSPtr& expr) const {
+    auto stripped = VExpr::expr_without_cast(expr);
+    auto* literal = dynamic_cast<VLiteral*>(stripped.get());
+    if (literal == nullptr) {
+        return std::nullopt;
+    }
+    PrimitiveType literal_primitive =
+            remove_nullable(literal->get_data_type())->get_primitive_type();
+    if (!_is_string_type(literal_primitive)) {
+        return std::nullopt;
+    }
+
+    ColumnPtr column = literal->get_column_ptr()->convert_to_full_column_if_const();
+    const auto* nullable = check_and_get_column<ColumnNullable>(*column);
+    if (nullable != nullptr) {
+        if (nullable->is_null_at(0)) {
+            return std::nullopt;
+        }
+        column = nullable->get_nested_column_ptr();
+    }
+
+    Field cell;
+    column->get(0, cell);
+    const auto& text = cell.get<TYPE_STRING>();
+    return text;
+}
+
+// Returns `name` with every byte lower-cased via std::tolower.
+std::string PaimonPredicateConverter::_normalize_name(std::string_view name) {
+    std::string lowered;
+    lowered.reserve(name.size());
+    for (char raw : name) {
+        // std::tolower requires a value representable as unsigned char.
+        const auto ch = static_cast<unsigned char>(raw);
+        lowered.push_back(static_cast<char>(std::tolower(ch)));
+    }
+    return lowered;
+}
+
+// Computes the smallest byte string that is greater than every string starting with `prefix`:
+// the last byte that is not 0xFF is incremented and everything after it is dropped.
+// Returns std::nullopt when `prefix` is empty or consists solely of 0xFF bytes.
+std::optional<std::string> PaimonPredicateConverter::_next_prefix(const std::string& prefix) {
+    std::string bound = prefix;
+    // Walk from the last byte towards the first.
+    for (size_t pos = bound.size(); pos-- > 0;) {
+        const auto byte = static_cast<unsigned char>(bound[pos]);
+        if (byte == 0xFF) {
+            continue;
+        }
+        bound[pos] = static_cast<char>(byte + 1);
+        bound.resize(pos + 1);
+        return bound;
+    }
+    return std::nullopt;
+}
+
+// Converts epoch seconds to whole days, rounding towards negative infinity so that instants
+// before the epoch map to the day they fall in.
+int32_t PaimonPredicateConverter::_seconds_to_days(int64_t seconds) {
+    static constexpr int64_t kSecondsPerDay = 24 * 60 * 60;
+    int64_t whole_days = seconds / kSecondsPerDay;
+    // Integer division truncates towards zero; a negative remainder means we rounded up.
+    const int64_t remainder = seconds % kSecondsPerDay;
+    if (remainder < 0) {
+        --whole_days;
+    }
+    return static_cast<int32_t>(whole_days);
+}
+
+// True for TINYINT, SMALLINT, INT and BIGINT.
+bool PaimonPredicateConverter::_is_integer_type(PrimitiveType type) {
+    return type == TYPE_TINYINT || type == TYPE_SMALLINT || type == TYPE_INT ||
+           type == TYPE_BIGINT;
+}
+
+// True for CHAR, VARCHAR and STRING.
+bool PaimonPredicateConverter::_is_string_type(PrimitiveType type) {
+    switch (type) {
+    case TYPE_CHAR:
+    case TYPE_VARCHAR:
+    case TYPE_STRING:
+        return true;
+    default:
+        return false;
+    }
+}
+
+// True for every decimal primitive type: DECIMALV2, DECIMAL32, DECIMAL64, DECIMAL128I and
+// DECIMAL256.
+bool PaimonPredicateConverter::_is_decimal_type(PrimitiveType type) {
+    return type == TYPE_DECIMALV2 || type == TYPE_DECIMAL32 || type == TYPE_DECIMAL64 ||
+           type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256;
+}
+
+// True for DATE and DATEV2.
+bool PaimonPredicateConverter::_is_date_type(PrimitiveType type) {
+    switch (type) {
+    case TYPE_DATE:
+    case TYPE_DATEV2:
+        return true;
+    default:
+        return false;
+    }
+}
+
+// True for DATETIME and DATETIMEV2.
+bool PaimonPredicateConverter::_is_datetime_type(PrimitiveType type) {
+    switch (type) {
+    case TYPE_DATETIME:
+    case TYPE_DATETIMEV2:
+        return true;
+    default:
+        return false;
+    }
+}
+
+// Maps a Doris primitive type to the paimon field type used in pushed-down predicates.
+// FLOAT, CHAR and every type not listed below are unsupported and yield std::nullopt, as do
+// decimals whose precision exceeds paimon's maximum. `precision` is only consulted for
+// decimal types.
+std::optional<paimon::FieldType> PaimonPredicateConverter::_to_paimon_field_type(
+        PrimitiveType type, uint32_t precision) {
+    switch (type) {
+    case TYPE_BOOLEAN:
+        return paimon::FieldType::BOOLEAN;
+    case TYPE_TINYINT:
+        return paimon::FieldType::TINYINT;
+    case TYPE_SMALLINT:
+        return paimon::FieldType::SMALLINT;
+    case TYPE_INT:
+        return paimon::FieldType::INT;
+    case TYPE_BIGINT:
+        return paimon::FieldType::BIGINT;
+    case TYPE_DOUBLE:
+        return paimon::FieldType::DOUBLE;
+    case TYPE_VARCHAR:
+    case TYPE_STRING:
+        return paimon::FieldType::STRING;
+    case TYPE_DATE:
+    case TYPE_DATEV2:
+        return paimon::FieldType::DATE;
+    case TYPE_DATETIME:
+    case TYPE_DATETIMEV2:
+        return paimon::FieldType::TIMESTAMP;
+    case TYPE_DECIMALV2:
+    case TYPE_DECIMAL32:
+    case TYPE_DECIMAL64:
+    case TYPE_DECIMAL128I:
+    case TYPE_DECIMAL256: {
+        const bool too_wide = precision > 0 && precision > paimon::Decimal::MAX_PRECISION;
+        if (too_wide) {
+            return std::nullopt;
+        }
+        return paimon::FieldType::DECIMAL;
+    }
+    case TYPE_FLOAT:
+    case TYPE_CHAR:
+    default:
+        return std::nullopt;
+    }
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_predicate_converter.h
b/be/src/vec/exec/format/table/paimon_predicate_converter.h
new file mode 100644
index 00000000000..a844b497d52
--- /dev/null
+++ b/be/src/vec/exec/format/table/paimon_predicate_converter.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <string_view>
+#include <unordered_map>
+#include <vector>
+
+#include "cctz/time_zone.h"
+#include "paimon/defs.h"
+#include "paimon/predicate/literal.h"
+#include "runtime/define_primitive_type.h"
+#include "vec/exprs/vexpr_fwd.h"
+
+namespace paimon {
+class Predicate;
+} // namespace paimon
+
+namespace doris {
+class RuntimeState;
+class SlotDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+// Converts Doris conjunct expressions into a paimon::Predicate so that filters can be
+// handed to the paimon-cpp reader.
+class PaimonPredicateConverter {
+public:
+ // NOTE(review): presumably indexes `file_slot_descs` by column name into
+ // _field_index_by_name and keeps `state` in _state -- confirm against the definition.
+ PaimonPredicateConverter(const std::vector<SlotDescriptor*>&
file_slot_descs,
+ RuntimeState* state);
+
+ // Builds a predicate from `conjuncts`. The caller in FileScanner checks the result
+ // for null before installing it on the reader, so a null result must be tolerated.
+ std::shared_ptr<paimon::Predicate> build(const VExprContextSPtrs&
conjuncts);
+
+private:
+ // Result of resolving an expression to a table field. The defaults (-1, UNKNOWN,
+ // nullptr) describe an unresolved field.
+ struct FieldMeta {
+ int32_t index = -1;
+ paimon::FieldType field_type = paimon::FieldType::UNKNOWN;
+ const SlotDescriptor* slot_desc = nullptr;
+ };
+
+ // Per-expression-kind converters; each yields a paimon::Predicate for one expression.
+ // NOTE(review): presumably they return nullptr when the expression cannot be
+ // converted -- confirm against the definitions.
+ std::shared_ptr<paimon::Predicate> _convert_expr(const VExprSPtr& expr);
+ std::shared_ptr<paimon::Predicate> _convert_compound(const VExprSPtr&
expr);
+ std::shared_ptr<paimon::Predicate> _convert_in(const VExprSPtr& expr);
+ std::shared_ptr<paimon::Predicate> _convert_binary(const VExprSPtr& expr);
+ std::shared_ptr<paimon::Predicate> _convert_is_null(const VExprSPtr& expr,
+ const std::string&
fn_name);
+ std::shared_ptr<paimon::Predicate> _convert_like_prefix(const VExprSPtr&
expr);
+
+ // Helpers that extract field metadata and literal values from an expression;
+ // std::nullopt signals that nothing could be produced.
+ std::optional<FieldMeta> _resolve_field(const VExprSPtr& expr) const;
+ std::optional<paimon::Literal> _convert_literal(const VExprSPtr& expr,
+ const SlotDescriptor&
slot_desc,
+ paimon::FieldType
field_type) const;
+ std::optional<std::string> _extract_string_literal(const VExprSPtr& expr)
const;
+
+ // Stateless utilities. The _is_*_type functions classify a Doris PrimitiveType;
+ // _to_paimon_field_type maps one to a Paimon field type and returns std::nullopt
+ // when there is no mapping.
+ static std::string _normalize_name(std::string_view name);
+ static std::optional<std::string> _next_prefix(const std::string& prefix);
+ static int32_t _seconds_to_days(int64_t seconds);
+ static bool _is_integer_type(PrimitiveType type);
+ static bool _is_string_type(PrimitiveType type);
+ static bool _is_decimal_type(PrimitiveType type);
+ static bool _is_date_type(PrimitiveType type);
+ static bool _is_datetime_type(PrimitiveType type);
+ static std::optional<paimon::FieldType>
_to_paimon_field_type(PrimitiveType type,
+ uint32_t
precision);
+
+ // Field name -> field index lookup.
+ // NOTE(review): keys are presumably produced by _normalize_name -- confirm.
+ std::unordered_map<std::string, int32_t> _field_index_by_name;
+ RuntimeState* _state = nullptr;
+ // NOTE(review): by its name this holds the GMT time zone used for time
+ // conversions -- confirm where it is initialised.
+ cctz::time_zone _gmt_tz;
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/file_scanner.cpp
b/be/src/vec/exec/scan/file_scanner.cpp
index 17170df68f7..97c622cedc2 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -67,7 +67,9 @@
#include "vec/exec/format/table/iceberg_reader.h"
#include "vec/exec/format/table/lakesoul_jni_reader.h"
#include "vec/exec/format/table/max_compute_jni_reader.h"
+#include "vec/exec/format/table/paimon_cpp_reader.h"
#include "vec/exec/format/table/paimon_jni_reader.h"
+#include "vec/exec/format/table/paimon_predicate_converter.h"
#include "vec/exec/format/table/paimon_reader.h"
#include "vec/exec/format/table/remote_doris_reader.h"
#include "vec/exec/format/table/transactional_hive_reader.h"
@@ -989,9 +991,25 @@ Status FileScanner::_get_next_reader() {
_cur_reader = std::move(mc_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type ==
"paimon") {
- _cur_reader = PaimonJniReader::create_unique(_file_slot_descs,
_state, _profile,
- range, _params);
- init_status =
((PaimonJniReader*)(_cur_reader.get()))->init_reader();
+ if (_state->query_options().__isset.enable_paimon_cpp_reader &&
+ _state->query_options().enable_paimon_cpp_reader) {
+ auto cpp_reader =
PaimonCppReader::create_unique(_file_slot_descs, _state,
+ _profile,
range, _params);
+
cpp_reader->set_push_down_agg_type(_get_push_down_agg_type());
+ if (!_is_load && !_push_down_conjuncts.empty()) {
+ PaimonPredicateConverter
predicate_converter(_file_slot_descs, _state);
+ auto predicate =
predicate_converter.build(_push_down_conjuncts);
+ if (predicate) {
+ cpp_reader->set_predicate(std::move(predicate));
+ }
+ }
+ init_status = cpp_reader->init_reader();
+ _cur_reader = std::move(cpp_reader);
+ } else {
+ _cur_reader =
PaimonJniReader::create_unique(_file_slot_descs, _state, _profile,
+ range,
_params);
+ init_status =
((PaimonJniReader*)(_cur_reader.get()))->init_reader();
+ }
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
_cur_reader = HudiJniReader::create_unique(*_params,
@@ -1012,8 +1030,9 @@ Status FileScanner::_get_next_reader() {
}
// Set col_name_to_block_idx for JNI readers to avoid repeated map
creation
if (_cur_reader) {
- static_cast<JniReader*>(_cur_reader.get())
- ->set_col_name_to_block_idx(&_src_block_name_to_idx);
+ if (auto* jni_reader =
dynamic_cast<JniReader*>(_cur_reader.get())) {
+
jni_reader->set_col_name_to_block_idx(&_src_block_name_to_idx);
+ }
}
break;
}
diff --git a/be/test/vec/exec/format/table/paimon_cpp_reader_test.cpp
b/be/test/vec/exec/format/table/paimon_cpp_reader_test.cpp
new file mode 100644
index 00000000000..95fb22d9b65
--- /dev/null
+++ b/be/test/vec/exec/format/table/paimon_cpp_reader_test.cpp
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/table/paimon_cpp_reader.h"
+
+#include <gtest/gtest.h>
+
+#include <string>
+#include <vector>
+
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+
+namespace doris::vectorized {
+
+// Fixture shared by the PaimonCppReader tests: a RuntimeState configured with
+// batch_size = 3, a runtime profile and an empty list of file slot descriptors.
+class PaimonCppReaderTest : public testing::Test {
+protected:
+    void SetUp() override {
+        _query_options.__set_batch_size(3);
+        _runtime_state = std::make_unique<RuntimeState>(_query_options, _query_globals);
+    }
+
+    // Creates a range descriptor whose table format params carry only a
+    // table-level row count of `row_count`.
+    TFileRangeDesc _build_range_with_table_level_row_count(int64_t row_count) {
+        TFileRangeDesc desc;
+        desc.__isset.table_format_params = true;
+        auto& format_params = desc.table_format_params;
+        format_params.__isset.table_level_row_count = true;
+        format_params.table_level_row_count = row_count;
+        return desc;
+    }
+
+    TQueryOptions _query_options;
+    TQueryGlobals _query_globals;
+    std::unique_ptr<RuntimeState> _runtime_state;
+    RuntimeProfile _profile {"paimon_cpp_reader_test"};
+    std::vector<SlotDescriptor*> _file_slot_descs;
+};
+
+// With COUNT pushed down and a table-level row count of 5, the reader must hand the
+// rows out in batches bounded by batch_size (3): 3 rows, then 2 rows with eof set,
+// and afterwards nothing more.
+TEST_F(PaimonCppReaderTest, CountPushDownUsesTableLevelRowCount) {
+    auto range = _build_range_with_table_level_row_count(5);
+    PaimonCppReader reader(_file_slot_descs, _runtime_state.get(), &_profile, range, nullptr);
+    reader.set_push_down_agg_type(TPushAggOp::type::COUNT);
+
+    auto st = reader.init_reader();
+    ASSERT_TRUE(st.ok()) << st;
+
+    Block block;
+    size_t rows = 0;
+    bool eof = false;
+
+    // First call: a full batch, more rows still pending.
+    st = reader.get_next_block(&block, &rows, &eof);
+    ASSERT_TRUE(st.ok()) << st;
+    EXPECT_EQ(3, rows);
+    EXPECT_FALSE(eof);
+
+    // Second call: the remaining rows, and the reader reports eof.
+    st = reader.get_next_block(&block, &rows, &eof);
+    ASSERT_TRUE(st.ok()) << st;
+    EXPECT_EQ(2, rows);
+    EXPECT_TRUE(eof);
+
+    // Third call: reading past the end stays at eof with zero rows.
+    st = reader.get_next_block(&block, &rows, &eof);
+    ASSERT_TRUE(st.ok()) << st;
+    EXPECT_EQ(0, rows);
+    EXPECT_TRUE(eof);
+}
+
+// A range that names a Paimon table but carries no serialized split must make
+// init_reader() fail with an error that mentions the missing paimon_split.
+TEST_F(PaimonCppReaderTest, InitReaderFailsWithoutPaimonSplit) {
+    TFileRangeDesc range;
+    range.__isset.table_format_params = true;
+    range.table_format_params.__isset.paimon_params = true;
+    auto& paimon_params = range.table_format_params.paimon_params;
+    paimon_params.__isset.paimon_table = true;
+    paimon_params.paimon_table = "s3://bucket/db.tbl";
+
+    PaimonCppReader reader(_file_slot_descs, _runtime_state.get(), &_profile, range, nullptr);
+    auto init_status = reader.init_reader();
+
+    ASSERT_FALSE(init_status.ok());
+    const std::string message = init_status.to_string();
+    EXPECT_NE(message.find("missing paimon_split"), std::string::npos);
+}
+
+} // namespace doris::vectorized
diff --git
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonUtils.java
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonUtils.java
index 44ffb298c98..d1d6c2f9bdb 100644
---
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonUtils.java
+++
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonUtils.java
@@ -26,7 +26,8 @@ import java.util.List;
import java.util.stream.Collectors;
public class PaimonUtils {
- private static final Base64.Decoder DECODER = Base64.getUrlDecoder();
+ private static final Base64.Decoder URL_DECODER = Base64.getUrlDecoder();
+ private static final Base64.Decoder STD_DECODER = Base64.getDecoder();
public static List<String> getFieldNames(RowType rowType) {
return rowType.getFields().stream()
@@ -37,9 +38,14 @@ public class PaimonUtils {
public static <T> T deserialize(String encodedStr) {
try {
- return InstantiationUtil.deserializeObject(
-
DECODER.decode(encodedStr.getBytes(java.nio.charset.StandardCharsets.UTF_8)),
- PaimonUtils.class.getClassLoader());
+ byte[] decoded;
+ try {
+ decoded =
URL_DECODER.decode(encodedStr.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ } catch (IllegalArgumentException e) {
+ // Fallback to standard Base64 for splits encoded by native
Paimon serialization.
+ decoded =
STD_DECODER.decode(encodedStr.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ }
+ return InstantiationUtil.deserializeObject(decoded,
PaimonUtils.class.getClassLoader());
} catch (Throwable e) {
throw new RuntimeException(e);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
index 4c343bd1266..d4c9ddf6c2f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
@@ -51,6 +51,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.predicate.Predicate;
@@ -60,6 +61,7 @@ import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.types.ArrayType;
@@ -78,6 +80,7 @@ import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.DateTimeException;
@@ -509,6 +512,23 @@ public class PaimonUtil {
}
}
+    /**
+     * Encodes a {@link DataSplit} with Paimon's own binary serialization
+     * ({@code DataSplit#serialize}) so that the paimon-cpp reader can decode it.
+     * The bytes are rendered with the standard Base64 alphabet rather than the
+     * URL-safe one, for BE compatibility.
+     *
+     * @param split the split to serialize
+     * @return the standard-Base64 text of the serialized split
+     * @throws RuntimeException if the split cannot be serialized
+     */
+    public static String encodeDataSplitToString(DataSplit split) {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        try {
+            split.serialize(new DataOutputViewStreamWrapper(buffer));
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to serialize DataSplit using Paimon native format", e);
+        }
+        return Base64.getEncoder().encodeToString(buffer.toByteArray());
+    }
+
public static Map<String, String> getPartitionInfoMap(Table table,
BinaryRow partitionValues, String timeZone) {
Map<String, String> partitionInfoMap = new HashMap<>();
List<String> partitionKeys = table.partitionKeys();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index f3f71fcfcc9..6323972dd16 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -228,9 +228,19 @@ public class PaimonScanNode extends FileQueryScanNode {
String fileFormat = getFileFormat(paimonSplit.getPathString());
if (split != null) {
- // use jni reader
+ // use jni reader or paimon-cpp reader
rangeDesc.setFormatType(TFileFormatType.FORMAT_JNI);
- fileDesc.setPaimonSplit(PaimonUtil.encodeObjectToString(split));
+ // Use Paimon native serialization for paimon-cpp reader
+ if (sessionVariable.isEnablePaimonCppReader() && split instanceof
DataSplit) {
+
fileDesc.setPaimonSplit(PaimonUtil.encodeDataSplitToString((DataSplit) split));
+ } else {
+
fileDesc.setPaimonSplit(PaimonUtil.encodeObjectToString(split));
+ }
+ // Set table location for paimon-cpp reader
+ String tableLocation = source.getTableLocation();
+ if (tableLocation != null) {
+ fileDesc.setPaimonTable(tableLocation);
+ }
rangeDesc.setSelfSplitWeight(paimonSplit.getSelfSplitWeight());
} else {
// use native reader
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
index 2dc87664f69..43c6ef41701 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
@@ -29,6 +29,7 @@ import
org.apache.doris.datasource.paimon.PaimonSysExternalTable;
import org.apache.doris.thrift.TFileAttributes;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import java.util.Optional;
@@ -90,4 +91,12 @@ public class PaimonSource {
public String getFileFormatFromTableProperties() {
return originTable.options().getOrDefault("file.format", "parquet");
}
+
+    /**
+     * Returns the storage location of the underlying Paimon table.
+     *
+     * @return the {@link FileStoreTable} location when the table is one; otherwise the
+     *         value of the table's {@code path} option, which callers check for null
+     */
+    public String getTableLocation() {
+        if (!(originTable instanceof FileStoreTable)) {
+            // Not a file store table: fall back to the "path" option.
+            return originTable.options().get("path");
+        }
+        FileStoreTable fileStoreTable = (FileStoreTable) originTable;
+        return fileStoreTable.location().toString();
+    }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 7aa82a3e0ee..3fc7d3a7634 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -740,6 +740,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
+ public static final String ENABLE_PAIMON_CPP_READER =
"enable_paimon_cpp_reader";
+
public static final String ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE =
"enable_count_push_down_for_external_table";
public static final String FETCH_ALL_FE_FOR_SYSTEM_TABLE =
"fetch_all_fe_for_system_table";
@@ -1314,13 +1316,14 @@ public class SessionVariable implements Serializable,
Writable {
public enum IgnoreSplitType {
NONE,
IGNORE_JNI,
- IGNORE_NATIVE
+ IGNORE_NATIVE,
+ IGNORE_PAIMON_CPP
}
public static final String IGNORE_SPLIT_TYPE = "ignore_split_type";
@VariableMgr.VarAttr(name = IGNORE_SPLIT_TYPE,
checker = "checkIgnoreSplitType",
- options = {"NONE", "IGNORE_JNI", "IGNORE_NATIVE"},
+ options = {"NONE", "IGNORE_JNI", "IGNORE_NATIVE",
"IGNORE_PAIMON_CPP"},
description = {"忽略指定类型的 split", "Ignore splits of the specified
type"})
public String ignoreSplitType = IgnoreSplitType.NONE.toString();
@@ -2766,6 +2769,11 @@ public class SessionVariable implements Serializable,
Writable {
description = {"强制使用 jni 方式读取外表", "Force the use of jni mode to
read external table"})
private boolean forceJniScanner = false;
+ @VariableMgr.VarAttr(name = ENABLE_PAIMON_CPP_READER,
+ fuzzy = true,
+ description = {"Paimon 非原生文件读取使用 paimon-cpp", "Use paimon-cpp for
non-native Paimon reads"})
+ private boolean enablePaimonCppReader = false;
+
@VariableMgr.VarAttr(name = ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE,
fuzzy = true,
description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown
optimization for external table"})
@@ -3620,6 +3628,7 @@ public class SessionVariable implements Serializable,
Writable {
// jni
this.forceJniScanner = random.nextBoolean();
+ this.enablePaimonCppReader = random.nextBoolean();
// statistics
this.fetchHiveRowCountSync = random.nextBoolean();
@@ -5183,6 +5192,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setEnableParquetFilePageCache(enableParquetFilePageCache);
tResult.setEnableOrcFilterByMinMax(enableOrcFilterByMinMax);
+ tResult.setEnablePaimonCppReader(enablePaimonCppReader);
tResult.setCheckOrcInitSargsSuccess(checkOrcInitSargsSuccess);
tResult.setTruncateCharOrVarcharColumns(truncateCharOrVarcharColumns);
@@ -5923,6 +5933,10 @@ public class SessionVariable implements Serializable,
Writable {
return forceJniScanner;
}
+    /** Returns the current value of the {@code enable_paimon_cpp_reader} session variable. */
+    public boolean isEnablePaimonCppReader() {
+        return this.enablePaimonCppReader;
+    }
+
public String getIgnoreSplitType() {
return ignoreSplitType;
}
@@ -5931,7 +5945,8 @@ public class SessionVariable implements Serializable,
Writable {
try {
IgnoreSplitType.valueOf(value);
} catch (Exception e) {
- throw new UnsupportedOperationException("We only support `NONE`,
`IGNORE_JNI` and `IGNORE_NATIVE`");
+ throw new UnsupportedOperationException(
+ "We only support `NONE`, `IGNORE_JNI`, `IGNORE_NATIVE` and
`IGNORE_PAIMON_CPP`");
}
}
@@ -5943,6 +5958,10 @@ public class SessionVariable implements Serializable,
Writable {
forceJniScanner = force;
}
+    /** Sets the {@code enable_paimon_cpp_reader} session variable. */
+    public void setEnablePaimonCppReader(boolean enable) {
+        this.enablePaimonCppReader = enable;
+    }
+
public boolean isEnableCountPushDownForExternalTable() {
return enableCountPushDownForExternalTable;
}
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index a1b9e237c6c..e495d529ef6 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -437,6 +437,8 @@ struct TQueryOptions {
195: optional bool enable_left_semi_direct_return_opt;
200: optional bool enable_adjust_conjunct_order_by_cost;
+ // Use paimon-cpp to read Paimon splits on BE
+ 201: optional bool enable_paimon_cpp_reader = false;
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]