This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new dc57135 feat(core): introduce read context (#63)
dc57135 is described below
commit dc57135795c9773e6e0a7b934022a05a7c1c9386
Author: Zhang Jiawei <[email protected]>
AuthorDate: Wed Jun 10 11:13:37 2026 +0800
feat(core): introduce read context (#63)
---
include/paimon/read_context.h | 353 +++++++++++++++++++++
src/paimon/core/operation/abstract_split_read.cpp | 263 +++++++++++++++
src/paimon/core/operation/abstract_split_read.h | 136 ++++++++
.../core/operation/abstract_split_read_test.cpp | 161 ++++++++++
.../core/operation/internal_read_context.cpp | 121 +++++++
src/paimon/core/operation/internal_read_context.h | 115 +++++++
.../core/operation/internal_read_context_test.cpp | 196 ++++++++++++
src/paimon/core/operation/read_context.cpp | 256 +++++++++++++++
src/paimon/core/operation/read_context_test.cpp | 126 ++++++++
src/paimon/core/operation/split_read.h | 44 +++
10 files changed, 1771 insertions(+)
diff --git a/include/paimon/read_context.h b/include/paimon/read_context.h
new file mode 100644
index 0000000..e4d79b2
--- /dev/null
+++ b/include/paimon/read_context.h
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "paimon/predicate/predicate.h"
+#include "paimon/result.h"
+#include "paimon/type_fwd.h"
+#include "paimon/utils/read_ahead_cache.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class Executor;
+class MemoryPool;
+class Predicate;
+class FileSystem;
+
+/// `ReadContext` holds the configuration for read operations.
+///
+/// Do not construct this class directly; use `ReadContextBuilder`, which validates its
+/// input, to build a `ReadContext`.
+/// @see ReadContextBuilder
+class PAIMON_EXPORT ReadContext {
+ public:
+    /// Takes every read setting explicitly.
+    ///
+    /// Prefer `ReadContextBuilder::Finish()`, which validates the input before building.
+    ReadContext(const std::string& path, const std::string& branch,
+                const std::vector<std::string>& read_schema,
+                const std::vector<int32_t>& read_field_ids,
+                const std::shared_ptr<Predicate>& predicate, bool enable_predicate_filter,
+                bool enable_prefetch, uint32_t prefetch_batch_count,
+                uint32_t prefetch_max_parallel_num, bool enable_multi_thread_row_to_batch,
+                uint32_t row_to_batch_thread_number,
+                const std::optional<std::string>& table_schema,
+                const std::shared_ptr<MemoryPool>& memory_pool,
+                const std::shared_ptr<Executor>& executor,
+                const std::shared_ptr<FileSystem>& specific_file_system,
+                const std::map<std::string, std::string>& fs_scheme_to_identifier_map,
+                const std::map<std::string, std::string>& options,
+                PrefetchCacheMode prefetch_cache_mode, const CacheConfig& cache_config);
+    ~ReadContext();
+
+    /// @return The table path this context was created with.
+    const std::string& GetPath() const {
+        return path_;
+    }
+
+    /// @return The branch to read from.
+    const std::string& GetBranch() const {
+        return branch_;
+    }
+
+    /// @return The mapping from URI scheme to file system identifier.
+    const std::map<std::string, std::string>& GetFileSystemSchemeToIdentifierMap() const {
+        return fs_scheme_to_identifier_map_;
+    }
+
+    /// @return The user supplied option entries.
+    const std::map<std::string, std::string>& GetOptions() const {
+        return options_;
+    }
+
+    /// @return The names of the fields to read.
+    const std::vector<std::string>& GetReadSchema() const {
+        return read_schema_;
+    }
+
+    /// @return The ids of the fields to read.
+    const std::vector<int32_t>& GetReadFieldIds() const {
+        return read_field_ids_;
+    }
+
+    /// @return The predicate used for filtering; may be null.
+    const std::shared_ptr<Predicate>& GetPredicate() const {
+        return predicate_;
+    }
+
+    /// @return Whether precise predicate filtering of the data read is enabled.
+    bool EnablePredicateFilter() const {
+        return enable_predicate_filter_;
+    }
+    /// @return Whether prefetching is enabled.
+    bool EnablePrefetch() const {
+        return enable_prefetch_;
+    }
+    /// @return The number of batches to prefetch.
+    uint32_t GetPrefetchBatchCount() const {
+        return prefetch_batch_count_;
+    }
+    /// @return The maximum number of parallel prefetch operations.
+    uint32_t GetPrefetchMaxParallelNum() const {
+        return prefetch_max_parallel_num_;
+    }
+    /// @return Whether multi-threaded row-to-batch conversion is enabled.
+    bool EnableMultiThreadRowToBatch() const {
+        return enable_multi_thread_row_to_batch_;
+    }
+    /// @return The number of threads used for row-to-batch conversion.
+    uint32_t GetRowToBatchThreadNumber() const {
+        return row_to_batch_thread_number_;
+    }
+    /// @return The user supplied table schema string, or `std::nullopt` if none was given.
+    /// @note Const-qualified like every other accessor so that it can be called through a
+    /// `const ReadContext&`.
+    const std::optional<std::string>& GetSpecificTableSchema() const {
+        return table_schema_;
+    }
+    /// @return The memory pool held by this context.
+    std::shared_ptr<MemoryPool> GetMemoryPool() const {
+        return memory_pool_;
+    }
+    /// @return The executor held by this context.
+    std::shared_ptr<Executor> GetExecutor() const {
+        return executor_;
+    }
+    /// @return The file system instance held by this context.
+    std::shared_ptr<FileSystem> GetSpecificFileSystem() const {
+        return specific_file_system_;
+    }
+
+    /// @return The prefetch cache mode.
+    PrefetchCacheMode GetPrefetchCacheMode() const {
+        return prefetch_cache_mode_;
+    }
+
+    /// @return The cache configuration used for prefetch reads.
+    const CacheConfig& GetCacheConfig() const {
+        return cache_config_;
+    }
+
+ private:
+    std::string path_;
+    std::string branch_;
+    std::vector<std::string> read_schema_;
+    std::vector<int32_t> read_field_ids_;
+    std::shared_ptr<Predicate> predicate_;
+    bool enable_predicate_filter_;
+    bool enable_prefetch_;
+    uint32_t prefetch_batch_count_;
+    uint32_t prefetch_max_parallel_num_;
+    bool enable_multi_thread_row_to_batch_;
+    uint32_t row_to_batch_thread_number_;
+    std::optional<std::string> table_schema_;
+    std::shared_ptr<MemoryPool> memory_pool_;
+    std::shared_ptr<Executor> executor_;
+    std::shared_ptr<FileSystem> specific_file_system_;
+    std::map<std::string, std::string> fs_scheme_to_identifier_map_;
+    std::map<std::string, std::string> options_;
+    PrefetchCacheMode prefetch_cache_mode_;
+    CacheConfig cache_config_;
+};
+
+/// `ReadContextBuilder` is used to build a `ReadContext` and validates its input.
+class PAIMON_EXPORT ReadContextBuilder {
+ public:
+    /// Creates a builder with the only mandatory parameter.
+    /// @param path The root path of the table.
+    explicit ReadContextBuilder(const std::string& path);
+
+    ~ReadContextBuilder();
+
+    // Only move operations are declared; the state lives behind `impl_`.
+    ReadContextBuilder(ReadContextBuilder&&) noexcept;
+    ReadContextBuilder& operator=(ReadContextBuilder&&) noexcept;
+
+    /// Select the fields to read by name.
+    ///
+    /// Without a selection every field of the table schema is read. Selecting only the
+    /// required columns (projection pushdown) reduces I/O and improves performance.
+    ///
+    /// @param read_field_names Names of the fields to read from the table.
+    /// @return Reference to this builder for method chaining.
+    /// @note Only top-level fields can be selected for now. Future versions may support
+    /// nested field selection using ArrowSchema for more granular projection.
+    ReadContextBuilder& SetReadSchema(const std::vector<std::string>& read_field_names);
+    /// Select the fields to read by field id.
+    ///
+    /// Without a selection every field of the table schema is read. Selecting only the
+    /// required columns (projection pushdown) reduces I/O and improves performance.
+    ///
+    /// @param read_field_ids Ids of the fields to read from the table.
+    /// @return Reference to this builder for method chaining.
+    /// @note Only top-level fields can be selected for now. Future versions may support
+    /// nested field selection using ArrowSchema for more granular projection.
+    /// @note SetReadFieldIds() and SetReadSchema() are mutually exclusive.
+    /// Calling both will ignore the read schema set by SetReadSchema().
+    ReadContextBuilder& SetReadFieldIds(const std::vector<int32_t>& read_field_ids);
+
+    /// Provide a whole map of option entries that are either missing from the table schema
+    /// or whose values should be overwritten.
+    /// @note Options added earlier through `AddOption()` are cleared by this call.
+    /// @param options The configuration options map.
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& SetOptions(const std::map<std::string, std::string>& options);
+
+    /// Add one option entry that is either missing from the table schema or whose value
+    /// should be overwritten.
+    ///
+    /// For several options, call `AddOption()` repeatedly or use `SetOptions()`.
+    /// @param key The option key.
+    /// @param value The option value.
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& AddOption(const std::string& key, const std::string& value);
+
+    /// Set the predicate used to filter data while reading.
+    ///
+    /// The predicate drives both partition pruning and data filtering, and can noticeably
+    /// improve performance by cutting down the data that has to be read and processed.
+    ///
+    /// @param predicate Shared pointer to the predicate for data filtering.
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& SetPredicate(const std::shared_ptr<Predicate>& predicate);
+
+    /// Choose whether data returned by the format reader is filtered precisely according
+    /// to the predicate.
+    /// @param enabled Whether to enable precise filtering (default: false)
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& EnablePredicateFilter(bool enabled);
+
+    /// Turn prefetching of data batches from individual files on or off.
+    ///
+    /// With prefetching, several batches are fetched in parallel so that I/O overlaps with
+    /// computation, which raises throughput. High-latency storage systems benefit the most.
+    ///
+    /// @param enabled Whether to enable prefetching (default: false)
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& EnablePrefetch(bool enabled);
+
+    /// Set prefetch cache mode for read operations.
+    ///
+    /// The prefetch cache prebuffers data ranges ahead of their use, which improves read
+    /// performance by avoiding redundant I/O operations.
+    /// @param mode (default: PrefetchCacheMode::ALWAYS)
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& SetPrefetchCacheMode(PrefetchCacheMode mode);
+
+    /// Set the cache configuration for prefetch read operations.
+    ///
+    /// @param config The cache configuration to use.
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& WithCacheConfig(const CacheConfig& config);
+
+    /// Set how many batches are prefetched in total across all files.
+    ///
+    /// The value governs both memory usage and parallelism of prefetching: larger values
+    /// can raise throughput at the price of more memory.
+    ///
+    /// @param batch_count Total number of batches to prefetch (default: 600)
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& SetPrefetchBatchCount(uint32_t batch_count);
+
+    /// Cap the number of prefetch operations that may run in parallel.
+    ///
+    /// The cap keeps concurrent I/O from overwhelming the storage system or consuming
+    /// excessive system resources.
+    ///
+    /// @param parallel_num Maximum parallel prefetch operations (default: 3)
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& SetPrefetchMaxParallelNum(uint32_t parallel_num);
+
+    /// Turn multi-threaded row-to-batch conversion in merge-on-read scenarios on or off.
+    ///
+    /// When on, several threads convert row data to batch format during merge operations,
+    /// which can help CPU-intensive merges.
+    ///
+    /// @param enabled Whether to enable multi-threaded conversion (default: false)
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& EnableMultiThreadRowToBatch(bool enabled);
+
+    /// Set how many threads perform row-to-batch conversion in merge-on-read scenarios.
+    ///
+    /// More threads mean more conversion parallelism during merges and possibly better
+    /// performance, but may affect result ordering.
+    ///
+    /// @param thread_number Number of conversion threads (default: 1)
+    /// @return Reference to this builder for method chaining.
+    /// @note If thread_number > 1, Arrow batches from the reader may not be in primary key
+    /// order.
+    ReadContextBuilder& SetRowToBatchThreadNumber(uint32_t thread_number);
+
+    /// Set custom memory pool for memory management.
+    /// @param memory_pool The memory pool to use.
+    /// @return Reference to this builder for method chaining.
+    /// @note If not set, the default system memory pool will be used.
+    ReadContextBuilder& WithMemoryPool(const std::shared_ptr<MemoryPool>& memory_pool);
+
+    /// Set custom executor for task execution.
+    /// @param executor The executor to use.
+    /// @return Reference to this builder for method chaining.
+    /// @note If not set, the default system executor will be used.
+    ReadContextBuilder& WithExecutor(const std::shared_ptr<Executor>& executor);
+
+    /// Supply the table schema as a string so that no schema loading I/O is needed.
+    ///
+    /// The reader then works with this pre-loaded schema instead of reading it from the
+    /// table metadata, which helps especially when there are many small read operations.
+    ///
+    /// @param table_schema String representation of the table schema.
+    /// @return Reference to this builder for method chaining.
+    /// @note The user must ensure that the schema string is valid and matches the table.
+    /// @note If not set, the schema will be loaded from the table path.
+    ReadContextBuilder& SetTableSchema(const std::string& table_schema);
+
+    /// Choose the branch of a versioned table to read from.
+    ///
+    /// Paimon supports branching for data versioning and time travel queries; this method
+    /// reads from the given branch rather than the main branch.
+    ///
+    /// @param branch Name of the branch to read from.
+    /// @return Reference to this builder for method chaining.
+    /// @note Default branch is "main" if not specified.
+    ReadContextBuilder& WithBranch(const std::string& branch);
+
+    /// Map URI schemes (e.g., "file", "oss") to registered file system identifiers, so that
+    /// a pre-registered file system implementation is picked by URI scheme at runtime.
+    ///
+    /// @param fs_scheme_to_identifier_map Map from URI scheme (like "oss") to the
+    /// corresponding file system identifier.
+    /// @return Reference to this builder for method chaining.
+    /// @note
+    /// - This method is intended for environments where multiple file systems are
+    ///   pre-registered.
+    /// - The specified identifiers must correspond to file systems that have been
+    ///   registered at compile time or initialization.
+    /// - Cannot be used together with `WithFileSystem()`.
+    /// - If not set, use default file system (configured in `Options::FILE_SYSTEM`).
+    /// Example:
+    ///     builder.WithFileSystemSchemeToIdentifierMap({{"oss", "jindo"}, {"file", "local"}});
+    ///
+    ReadContextBuilder& WithFileSystemSchemeToIdentifierMap(
+        const std::map<std::string, std::string>& fs_scheme_to_identifier_map);
+
+    /// Use the given file system instance for all file operations of this read context,
+    /// bypassing the global file system registry.
+    ///
+    /// @param file_system The file system to use.
+    /// @return Reference to this builder for method chaining.
+    /// @note If not set, use default file system (configured in `Options::FILE_SYSTEM`)
+    ReadContextBuilder& WithFileSystem(const std::shared_ptr<FileSystem>& file_system);
+
+    /// Validate the input and build the `ReadContext`.
+    /// @return Result containing the constructed `ReadContext` or an error status.
+    Result<std::unique_ptr<ReadContext>> Finish();
+
+ private:
+    class Impl;
+
+    std::unique_ptr<Impl> impl_;
+};
+} // namespace paimon
diff --git a/src/paimon/core/operation/abstract_split_read.cpp
b/src/paimon/core/operation/abstract_split_read.cpp
new file mode 100644
index 0000000..fb57c0e
--- /dev/null
+++ b/src/paimon/core/operation/abstract_split_read.cpp
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/operation/abstract_split_read.h"
+
+#include <cassert>
+#include <cstddef>
+#include <utility>
+
+#include "arrow/type.h"
+#include "paimon/common/reader/delegating_prefetch_reader.h"
+#include "paimon/common/reader/predicate_batch_reader.h"
+#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/object_utils.h"
+#include "paimon/core/io/complete_row_tracking_fields_reader.h"
+#include "paimon/core/io/data_file_meta.h"
+#include "paimon/core/io/data_file_path_factory.h"
+#include "paimon/core/io/field_mapping_reader.h"
+#include "paimon/core/operation/internal_read_context.h"
+#include "paimon/core/partition/partition_info.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/core/table/source/data_split_impl.h"
+#include "paimon/core/utils/field_mapping.h"
+#include "paimon/format/file_format.h"
+#include "paimon/format/file_format_factory.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class BinaryRow;
+class Executor;
+class FileStorePathFactory;
+class MemoryPool;
+class Predicate;
+
+// Stores the collaborators used by split reads. The core options and the read schema are
+// taken from `context`; ownership of `schema_manager` moves into this object.
+AbstractSplitRead::AbstractSplitRead(const std::shared_ptr<FileStorePathFactory>& path_factory,
+                                     const std::shared_ptr<InternalReadContext>& context,
+                                     std::unique_ptr<SchemaManager>&& schema_manager,
+                                     const std::shared_ptr<MemoryPool>& memory_pool,
+                                     const std::shared_ptr<Executor>& executor)
+    : pool_(memory_pool),
+      executor_(executor),
+      path_factory_(path_factory),
+      options_(context->GetCoreOptions()),
+      raw_read_schema_(context->GetReadSchema()),
+      context_(context),
+      schema_manager_(std::move(schema_manager)) {}
+
+// Creates a field-mapping reader for each data file, in input order. Files for which
+// CreateFieldMappingReader() yields a null reader are left out of the result. Any error
+// raised while preparing a file aborts the whole call.
+Result<std::vector<std::unique_ptr<FileBatchReader>>> AbstractSplitRead::CreateRawFileReaders(
+    const BinaryRow& partition, const std::vector<std::shared_ptr<DataFileMeta>>& data_files,
+    const std::shared_ptr<arrow::Schema>& read_schema, const std::shared_ptr<Predicate>& predicate,
+    DeletionVector::Factory dv_factory, const std::optional<std::vector<Range>>& row_ranges,
+    const std::shared_ptr<DataFilePathFactory>& data_file_path_factory) const {
+    std::vector<std::unique_ptr<FileBatchReader>> readers;
+    if (data_files.empty()) {
+        // nothing to read, so the field mapping builder is not even created
+        return std::move(readers);
+    }
+
+    // one mapping builder is shared by all files of this call
+    PAIMON_ASSIGN_OR_RAISE(
+        std::unique_ptr<FieldMappingBuilder> mapping_builder,
+        FieldMappingBuilder::Create(read_schema, context_->GetPartitionKeys(), predicate));
+
+    readers.reserve(data_files.size());
+    for (const auto& file_meta : data_files) {
+        auto file_path = data_file_path_factory->ToPath(file_meta);
+        PAIMON_ASSIGN_OR_RAISE(std::string format_identifier, file_meta->FileFormat());
+        // the reader builder depends on the format of each individual file
+        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<ReaderBuilder> reader_builder,
+                               PrepareReaderBuilder(format_identifier));
+        PAIMON_ASSIGN_OR_RAISE(
+            std::unique_ptr<FileBatchReader> reader,
+            CreateFieldMappingReader(file_path, file_meta, partition, reader_builder.get(),
+                                     mapping_builder.get(), dv_factory, row_ranges,
+                                     data_file_path_factory));
+        if (!reader) {
+            continue;
+        }
+        readers.push_back(std::move(reader));
+    }
+    return std::move(readers);
+}
+
+// Returns true only when row tracking is enabled and `read_schema` contains the row id
+// field or the sequence number field.
+bool AbstractSplitRead::NeedCompleteRowTrackingFields(
+    bool row_tracking_enabled, const std::shared_ptr<arrow::Schema>& read_schema) {
+    if (!row_tracking_enabled) {
+        return false;
+    }
+    // GetFieldIndex() yields -1 for a name that is not part of the schema
+    if (read_schema->GetFieldIndex(SpecialFields::RowId().Name()) != -1) {
+        return true;
+    }
+    return read_schema->GetFieldIndex(SpecialFields::SequenceNumber().Name()) != -1;
+}
+
+// Convenience overload: builds the map from the data files and deletion files of a split.
+std::unordered_map<std::string, DeletionFile> AbstractSplitRead::CreateDeletionFileMap(
+    const DataSplitImpl& data_split) {
+    return CreateDeletionFileMap(data_split.DataFiles(), data_split.DeletionFiles());
+}
+
+// Maps the file name of each data file to its deletion file. `deletion_files` is
+// positionally aligned with `data_files`; entries without a deletion file are skipped.
+// An empty `deletion_files` yields an empty map.
+std::unordered_map<std::string, DeletionFile> AbstractSplitRead::CreateDeletionFileMap(
+    const std::vector<std::shared_ptr<DataFileMeta>>& data_files,
+    const std::vector<std::optional<DeletionFile>>& deletion_files) {
+    std::unordered_map<std::string, DeletionFile> deletion_file_map;
+    if (deletion_files.empty()) {
+        return deletion_file_map;
+    }
+    assert(deletion_files.size() == data_files.size());
+    // The assert is compiled out under NDEBUG, so bound the loop by the shorter vector to
+    // avoid indexing `data_files` out of range if the sizes ever disagree.
+    const size_t file_count = deletion_files.size() < data_files.size() ? deletion_files.size()
+                                                                        : data_files.size();
+    for (size_t i = 0; i < file_count; i++) {
+        if (deletion_files[i].has_value()) {
+            deletion_file_map.emplace(data_files[i]->file_name, *deletion_files[i]);
+        }
+    }
+    return deletion_file_map;
+}
+
+// Wraps `reader` in a PredicateBatchReader when predicate filtering is enabled in the
+// context and a predicate is given; otherwise hands `reader` back unchanged.
+Result<std::unique_ptr<BatchReader>> AbstractSplitRead::ApplyPredicateFilterIfNeeded(
+    std::unique_ptr<BatchReader>&& reader, const std::shared_ptr<Predicate>& predicate) const {
+    if (context_->EnablePredicateFilter() && predicate != nullptr) {
+        return PredicateBatchReader::Create(std::move(reader), predicate, pool_);
+    }
+    return std::move(reader);
+}
+
+// Looks up the file format for `format_identifier` and creates a reader builder from it,
+// configured with the read batch size from the core options and with this object's
+// memory pool.
+Result<std::unique_ptr<ReaderBuilder>> AbstractSplitRead::PrepareReaderBuilder(
+    const std::string& format_identifier) const {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileFormat> format,
+                           FileFormatFactory::Get(format_identifier, options_.ToMap()));
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<ReaderBuilder> builder,
+                           format->CreateReaderBuilder(options_.GetReadBatchSize()));
+    builder->WithMemoryPool(pool_);
+    return builder;
+}
+
+// Creates the reader for a single data file. Three paths exist:
+//   - "lance" files are built directly from the file path;
+//   - with prefetch enabled (and a format other than "blob" / "avro") a prefetch reader is
+//     created and wrapped in a DelegatingPrefetchReader;
+//   - everything else is built from an input stream opened on the file system.
+Result<std::unique_ptr<FileBatchReader>> AbstractSplitRead::CreateFileBatchReader(
+    const std::shared_ptr<DataFileMeta>& file_meta, const std::string& data_file_path,
+    const ReaderBuilder* reader_builder) const {
+    PAIMON_ASSIGN_OR_RAISE(std::string format_id, file_meta->FileFormat());
+    if (format_id == "lance") {
+        // lance readers cannot be built from an input stream, so pass the path instead
+        return reader_builder->Build(data_file_path);
+    }
+
+    const bool use_prefetch =
+        context_->EnablePrefetch() && format_id != "blob" && format_id != "avro";
+    if (!use_prefetch) {
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> input_stream,
+                               options_.GetFileSystem()->Open(data_file_path));
+        return reader_builder->Build(input_stream);
+    }
+
+    PAIMON_ASSIGN_OR_RAISE(
+        std::unique_ptr<PrefetchFileBatchReaderImpl> prefetch_reader,
+        PrefetchFileBatchReaderImpl::Create(
+            data_file_path, reader_builder, options_.GetFileSystem(),
+            context_->GetPrefetchMaxParallelNum(), options_.GetReadBatchSize(),
+            context_->GetPrefetchBatchCount(), options_.EnableAdaptivePrefetchStrategy(),
+            executor_,
+            /*initialize_read_ranges=*/false, context_->GetPrefetchCacheMode(),
+            context_->GetCacheConfig(), pool_));
+    return std::make_unique<DelegatingPrefetchReader>(std::move(prefetch_reader));
+}
+
+// Builds the complete reader stack for one data file:
+//   1. resolve the schema the file was written with;
+//   2. create the field mapping between that schema and the requested read schema;
+//   3. create the raw file reader, completing row tracking fields if required;
+//   4. let the subclass apply index / deletion vector handling;
+//   5. wrap everything in a FieldMappingReader.
+// Returns a null reader when step 4 yields null, i.e. the file is skipped by index or dv.
+Result<std::unique_ptr<FileBatchReader>> AbstractSplitRead::CreateFieldMappingReader(
+    const std::string& data_file_path, const std::shared_ptr<DataFileMeta>& file_meta,
+    const BinaryRow& partition, const ReaderBuilder* reader_builder,
+    const FieldMappingBuilder* field_mapping_builder, DeletionVector::Factory dv_factory,
+    const std::optional<std::vector<Range>>& row_ranges,
+    const std::shared_ptr<DataFilePathFactory>& data_file_path_factory) const {
+    std::shared_ptr<TableSchema> data_schema;
+    if (file_meta->schema_id != context_->GetTableSchema()->Id()) {
+        // the file was written with another schema version, which has to be loaded
+        PAIMON_ASSIGN_OR_RAISE(data_schema, schema_manager_->ReadSchema(file_meta->schema_id));
+    } else {
+        data_schema = context_->GetTableSchema();
+    }
+
+    std::unique_ptr<FieldMapping> field_mapping;
+    if (data_schema->PrimaryKeys().empty()) {
+        PAIMON_ASSIGN_OR_RAISE(
+            std::vector<DataField> projected_data_fields,
+            ProjectFieldsForRowTrackingAndDataEvolution(data_schema, file_meta->write_cols));
+        PAIMON_ASSIGN_OR_RAISE(field_mapping,
+                               field_mapping_builder->CreateFieldMapping(projected_data_fields));
+    } else {
+        // pk table: the special fields come first in the file schema used for the mapping
+        const auto& schema_fields = data_schema->Fields();
+        std::vector<DataField> file_fields = {SpecialFields::SequenceNumber(),
+                                              SpecialFields::ValueKind()};
+        file_fields.insert(file_fields.end(), schema_fields.begin(), schema_fields.end());
+        PAIMON_ASSIGN_OR_RAISE(field_mapping,
+                               field_mapping_builder->CreateFieldMapping(file_fields));
+    }
+
+    auto read_schema = DataField::ConvertDataFieldsToArrowSchema(
+        field_mapping->non_partition_info.non_partition_data_schema);
+
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileBatchReader> file_reader,
+                           CreateFileBatchReader(file_meta, data_file_path, reader_builder));
+    if (NeedCompleteRowTrackingFields(options_.RowTrackingEnabled(), read_schema)) {
+        file_reader = std::make_unique<CompleteRowTrackingFieldsBatchReader>(
+            std::move(file_reader), file_meta->first_row_id, file_meta->max_sequence_number,
+            pool_);
+    }
+
+    // the filter is read from `field_mapping` before the mapping is moved away below
+    const auto& predicate = field_mapping->non_partition_info.non_partition_filter;
+    auto all_data_schema = DataField::ConvertDataFieldsToArrowSchema(data_schema->Fields());
+    PAIMON_ASSIGN_OR_RAISE(
+        std::unique_ptr<FileBatchReader> final_reader,
+        ApplyIndexAndDvReaderIfNeeded(std::move(file_reader), file_meta, all_data_schema,
+                                      read_schema, predicate, dv_factory, row_ranges,
+                                      data_file_path_factory));
+    if (!final_reader) {
+        // file is skipped by index or dv
+        return std::unique_ptr<FileBatchReader>();
+    }
+
+    return std::make_unique<FieldMappingReader>(field_mapping_builder->GetReadFieldCount(),
+                                                std::move(final_reader), partition,
+                                                std::move(field_mapping), pool_);
+}
+
+// Computes the field list of a data file for tables without primary keys:
+//   - without `write_cols`, all fields of `data_schema`;
+//   - with `write_cols`, those columns (row tracking columns excluded) followed by every
+//     partition key that `write_cols` does not already contain;
+// and in both cases the row id and sequence number fields appended at the end.
+// Fails with Status::Invalid for an empty `write_cols`, or with the error of
+// `data_schema->GetField()` when a column cannot be resolved.
+Result<std::vector<DataField>> AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+    const std::shared_ptr<TableSchema>& data_schema,
+    const std::optional<std::vector<std::string>>& write_cols) {
+    std::vector<DataField> projected_fields;
+    if (write_cols.has_value()) {
+        const std::vector<std::string>& cols = write_cols.value();
+        if (cols.empty()) {
+            return Status::Invalid("write cols cannot be empty");
+        }
+        for (const auto& col : cols) {
+            const bool is_row_tracking_col = col == SpecialFields::RowId().Name() ||
+                                             col == SpecialFields::SequenceNumber().Name();
+            if (is_row_tracking_col) {
+                // appended unconditionally at the end, so skip it here
+                continue;
+            }
+            PAIMON_ASSIGN_OR_RAISE(DataField field, data_schema->GetField(col));
+            projected_fields.push_back(field);
+        }
+        const std::vector<std::string>& partition_keys = data_schema->PartitionKeys();
+        for (const auto& partition_key : partition_keys) {
+            if (ObjectUtils::Contains(cols, partition_key)) {
+                continue;
+            }
+            PAIMON_ASSIGN_OR_RAISE(DataField partition_field,
+                                   data_schema->GetField(partition_key));
+            projected_fields.push_back(partition_field);
+        }
+    } else {
+        projected_fields = data_schema->Fields();
+    }
+
+    projected_fields.push_back(SpecialFields::RowId());
+    projected_fields.push_back(SpecialFields::SequenceNumber());
+    return projected_fields;
+}
+
+} // namespace paimon
diff --git a/src/paimon/core/operation/abstract_split_read.h
b/src/paimon/core/operation/abstract_split_read.h
new file mode 100644
index 0000000..ed5dda9
--- /dev/null
+++ b/src/paimon/core/operation/abstract_split_read.h
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "arrow/type_fwd.h"
+#include "paimon/core/core_options.h"
+#include "paimon/core/deletionvectors/deletion_vector.h"
+#include "paimon/core/io/field_mapping_reader.h"
+#include "paimon/core/operation/internal_read_context.h"
+#include "paimon/core/operation/split_read.h"
+#include "paimon/core/schema/schema_manager.h"
+#include "paimon/core/table/source/data_split_impl.h"
+#include "paimon/core/table/source/deletion_file.h"
+#include "paimon/core/utils/file_store_path_factory.h"
+#include "paimon/format/reader_builder.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/reader/file_batch_reader.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class Schema;
+} // namespace arrow
+
+namespace paimon {
+class BinaryRow;
+class DataField;
+class DataFilePathFactory;
+class DataSplitImpl;
+class Executor;
+class FieldMappingBuilder;
+class FileStorePathFactory;
+class InternalReadContext;
+class MemoryPool;
+class Predicate;
+struct DataFileMeta;
+class TableSchema;
+
+// Base class of split reads. It implements the creation of per-file readers (format
+// reader, optional prefetch, field mapping) and leaves index / deletion vector handling
+// to subclasses through ApplyIndexAndDvReaderIfNeeded().
+class AbstractSplitRead : public SplitRead {
+ public:
+    ~AbstractSplitRead() override = default;
+
+    // Creates one reader per data file; files skipped by index or dv produce no reader.
+    Result<std::vector<std::unique_ptr<FileBatchReader>>> CreateRawFileReaders(
+        const BinaryRow& partition, const std::vector<std::shared_ptr<DataFileMeta>>& data_files,
+        const std::shared_ptr<arrow::Schema>& read_schema,
+        const std::shared_ptr<Predicate>& predicate, DeletionVector::Factory dv_factory,
+        const std::optional<std::vector<Range>>& row_ranges,
+        const std::shared_ptr<DataFilePathFactory>& data_file_path_factory) const;
+
+ protected:
+    AbstractSplitRead(const std::shared_ptr<FileStorePathFactory>& path_factory,
+                      const std::shared_ptr<InternalReadContext>& context,
+                      std::unique_ptr<SchemaManager>&& schema_manager,
+                      const std::shared_ptr<MemoryPool>& memory_pool,
+                      const std::shared_ptr<Executor>& executor);
+
+    // data file name -> deletion file, for the files of `data_split`
+    static std::unordered_map<std::string, DeletionFile> CreateDeletionFileMap(
+        const DataSplitImpl& data_split);
+
+    // data file name -> deletion file; `deletion_files` is aligned with `data_files` by
+    // position and entries without a deletion file are left out
+    static std::unordered_map<std::string, DeletionFile> CreateDeletionFileMap(
+        const std::vector<std::shared_ptr<DataFileMeta>>& data_files,
+        const std::vector<std::optional<DeletionFile>>& deletion_files);
+
+    // wraps `reader` with a predicate filter if filtering is enabled in the context and
+    // `predicate` is not null, otherwise returns `reader` as is
+    Result<std::unique_ptr<BatchReader>> ApplyPredicateFilterIfNeeded(
+        std::unique_ptr<BatchReader>&& reader, const std::shared_ptr<Predicate>& predicate) const;
+
+ protected:
+    // return nullptr if file is skipped by index or dv
+    virtual Result<std::unique_ptr<FileBatchReader>> ApplyIndexAndDvReaderIfNeeded(
+        std::unique_ptr<FileBatchReader>&& file_reader, const std::shared_ptr<DataFileMeta>& file,
+        const std::shared_ptr<arrow::Schema>& data_schema,
+        const std::shared_ptr<arrow::Schema>& read_schema,
+        const std::shared_ptr<Predicate>& predicate, DeletionVector::Factory dv_factory,
+        const std::optional<std::vector<Range>>& row_ranges,
+        const std::shared_ptr<DataFilePathFactory>& data_file_path_factory) const = 0;
+
+    // 1. project write cols to data schema
+    // 2. add partition fields (if write cols do not contain them)
+    // 3. add row tracking fields
+    static Result<std::vector<DataField>> ProjectFieldsForRowTrackingAndDataEvolution(
+        const std::shared_ptr<TableSchema>& data_schema,
+        const std::optional<std::vector<std::string>>& write_cols);
+
+ private:
+    // creates a reader builder for the given file format, bound to `pool_`
+    Result<std::unique_ptr<ReaderBuilder>> PrepareReaderBuilder(
+        const std::string& format_identifier) const;
+
+    // creates the plain or prefetching reader for one data file
+    Result<std::unique_ptr<FileBatchReader>> CreateFileBatchReader(
+        const std::shared_ptr<DataFileMeta>& file_meta, const std::string& data_file_path,
+        const ReaderBuilder* reader_builder) const;
+
+    // return nullptr if data file is skipped by index or dv
+    Result<std::unique_ptr<FileBatchReader>> CreateFieldMappingReader(
+        const std::string& data_file_path, const std::shared_ptr<DataFileMeta>& file_meta,
+        const BinaryRow& partition, const ReaderBuilder* reader_builder,
+        const FieldMappingBuilder* field_mapping_builder, DeletionVector::Factory dv_factory,
+        const std::optional<std::vector<Range>>& row_ranges,
+        const std::shared_ptr<DataFilePathFactory>& data_file_path_factory) const;
+
+    // true if row tracking is enabled and `read_schema` has the row id or sequence number
+    // field
+    static bool NeedCompleteRowTrackingFields(bool row_tracking_enabled,
+                                              const std::shared_ptr<arrow::Schema>& read_schema);
+
+ protected:
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<Executor> executor_;
+    std::shared_ptr<FileStorePathFactory> path_factory_;
+    CoreOptions options_;
+    // read schema as returned by the read context (context->GetReadSchema())
+    std::shared_ptr<arrow::Schema> raw_read_schema_;
+    std::shared_ptr<InternalReadContext> context_;
+    std::unique_ptr<SchemaManager> schema_manager_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/operation/abstract_split_read_test.cpp
b/src/paimon/core/operation/abstract_split_read_test.cpp
new file mode 100644
index 0000000..6fdd7f2
--- /dev/null
+++ b/src/paimon/core/operation/abstract_split_read_test.cpp
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/operation/abstract_split_read.h"
+
+#include <cstdint>
+#include <limits>
+
+#include "arrow/type_fwd.h"
+#include "gtest/gtest.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Checks that NeedCompleteRowTrackingFields() is true only when row tracking
+// is enabled AND the read schema contains the row-id or sequence-number field.
+TEST(AbstractSplitReadTest, TestNeedCompleteRowTrackingFields) {
+ // fields[0..2] are regular columns, fields[3] is the row id and fields[4]
+ // the sequence number.
+ std::vector<DataField> data_fields = {DataField(0, arrow::field("name",
arrow::utf8())),
+ DataField(1, arrow::field("sex",
arrow::utf8())),
+ DataField(2, arrow::field("age",
arrow::int32())),
+ SpecialFields::RowId(),
SpecialFields::SequenceNumber()};
+ auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(data_fields);
+ auto fields = arrow_schema->fields();
+
+
ASSERT_TRUE(AbstractSplitRead::NeedCompleteRowTrackingFields(/*row_tracking_enabled=*/true,
+
arrow::schema(fields)));
+ // a single special field (row id or sequence number) is enough
+ ASSERT_TRUE(AbstractSplitRead::NeedCompleteRowTrackingFields(
+ /*row_tracking_enabled=*/true, arrow::schema({fields[0], fields[3]})));
+ ASSERT_TRUE(AbstractSplitRead::NeedCompleteRowTrackingFields(
+ /*row_tracking_enabled=*/true, arrow::schema({fields[0], fields[4]})));
+ // no special field in the read schema
+ ASSERT_FALSE(AbstractSplitRead::NeedCompleteRowTrackingFields(
+ /*row_tracking_enabled=*/true, arrow::schema({fields[0], fields[1]})));
+
ASSERT_FALSE(AbstractSplitRead::NeedCompleteRowTrackingFields(/*row_tracking_enabled=*/false,
+
arrow::schema(fields)));
+}
+
+// Checks ProjectFieldsForRowTrackingAndDataEvolution() for tables with and
+// without partition keys: the result is the projected write cols, followed by
+// any missing partition fields, followed by the row id and sequence number
+// fields. An empty write_cols list is rejected.
+TEST(AbstractSplitReadTest, TestProjectFieldsForRowTrackingAndDataEvolution) {
+ {
+ // test table without partition keys
+ std::vector<DataField> fields = {DataField(0, arrow::field("name",
arrow::utf8())),
+ DataField(1, arrow::field("sex",
arrow::utf8())),
+ DataField(2, arrow::field("age",
arrow::int32()))};
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0,
DataField::ConvertDataFieldsToArrowSchema(fields),
+ /*partition_keys=*/{},
+ /*primary_keys=*/{}, /*options=*/{}));
+
+ {
+ // write_cols is std::nullopt: all table fields plus row tracking fields
+ ASSERT_OK_AND_ASSIGN(auto result,
+
AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+ table_schema,
/*write_cols=*/std::nullopt));
+ std::vector<DataField> expected = fields;
+ expected.push_back(SpecialFields::RowId());
+ expected.push_back(SpecialFields::SequenceNumber());
+ ASSERT_EQ(result, expected);
+ }
+ {
+ // write_cols given: only those fields plus row tracking fields
+ std::vector<std::string> write_cols = {"name", "age"};
+ ASSERT_OK_AND_ASSIGN(auto result,
+
AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+ table_schema, write_cols));
+ std::vector<DataField> expected = {fields[0], fields[2],
SpecialFields::RowId(),
+
SpecialFields::SequenceNumber()};
+ ASSERT_EQ(result, expected);
+ }
+ {
+ // empty write_cols is an error
+ std::vector<std::string> write_cols = {};
+
ASSERT_NOK_WITH_MSG(AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+ table_schema, write_cols),
+ "write cols cannot be empty");
+ }
+ }
+ {
+ // test table with partition key "ds" (fields[1])
+ std::vector<DataField> fields = {DataField(0, arrow::field("name",
arrow::utf8())),
+ DataField(1, arrow::field("ds",
arrow::utf8())),
+ DataField(2, arrow::field("sex",
arrow::utf8())),
+ DataField(3, arrow::field("age",
arrow::int32()))};
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<TableSchema> table_schema,
+ TableSchema::Create(/*schema_id=*/0,
DataField::ConvertDataFieldsToArrowSchema(fields),
+ /*partition_keys=*/{"ds"},
+ /*primary_keys=*/{}, /*options=*/{}));
+
+ {
+ // write_cols is std::nullopt: all table fields plus row tracking fields
+ ASSERT_OK_AND_ASSIGN(auto result,
+
AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+ table_schema,
/*write_cols=*/std::nullopt));
+ std::vector<DataField> expected = fields;
+ expected.push_back(SpecialFields::RowId());
+ expected.push_back(SpecialFields::SequenceNumber());
+ ASSERT_EQ(result, expected);
+ }
+ {
+ // write_cols does not contain the partition field: it is appended
+ // after the write cols
+ std::vector<std::string> write_cols = {"name", "age"};
+ ASSERT_OK_AND_ASSIGN(auto result,
+
AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+ table_schema, write_cols));
+ std::vector<DataField> expected = {fields[0], fields[3], fields[1],
+ SpecialFields::RowId(),
+
SpecialFields::SequenceNumber()};
+ ASSERT_EQ(result, expected);
+ }
+ {
+ // write_cols already contains the partition field: write_cols order
+ // is kept and the partition field is not duplicated
+ std::vector<std::string> write_cols = {"age", "name", "ds"};
+ ASSERT_OK_AND_ASSIGN(auto result,
+
AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+ table_schema, write_cols));
+ std::vector<DataField> expected = {fields[3], fields[0], fields[1],
+ SpecialFields::RowId(),
+
SpecialFields::SequenceNumber()};
+ ASSERT_EQ(result, expected);
+ }
+ {
+ // write_cols contains the row tracking fields: they still appear
+ // exactly once, after the partition field
+ std::vector<std::string> write_cols = {
+ "age",
+ "name",
+ SpecialFields::RowId().Name(),
+ SpecialFields::SequenceNumber().Name(),
+ };
+ ASSERT_OK_AND_ASSIGN(auto result,
+
AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+ table_schema, write_cols));
+ std::vector<DataField> expected = {fields[3], fields[0], fields[1],
+ SpecialFields::RowId(),
+
SpecialFields::SequenceNumber()};
+ ASSERT_EQ(result, expected);
+ }
+ {
+ // empty write_cols is an error
+ std::vector<std::string> write_cols = {};
+
ASSERT_NOK_WITH_MSG(AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+ table_schema, write_cols),
+ "write cols cannot be empty");
+ }
+ }
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/core/operation/internal_read_context.cpp
b/src/paimon/core/operation/internal_read_context.cpp
new file mode 100644
index 0000000..fb0f36b
--- /dev/null
+++ b/src/paimon/core/operation/internal_read_context.cpp
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/operation/internal_read_context.h"
+
+#include <utility>
+
+#include "paimon/common/predicate/predicate_validator.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/core/schema/arrow_schema_validator.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class Schema;
+} // namespace arrow
+
+namespace paimon {
+// Builds an InternalReadContext from a user ReadContext, a table schema and
+// an option map.
+//
+// Steps:
+// 1. Parse `options` into CoreOptions (using the file system settings taken
+// from `context`).
+// 2. Resolve the read fields, in this order of precedence:
+// a) the field ids from `context`, if non-empty;
+// b) otherwise the field names from `context`, if non-empty;
+// c) otherwise all fields of `table_schema`.
+// Special fields (row id, sequence number, value kind, index score) are
+// taken from SpecialFields instead of the table schema; row id, sequence
+// number and index score are only recognised when the matching option is
+// enabled, otherwise the lookup falls through to the table schema.
+// 3. Validate the resulting arrow schema and, if set, the predicate.
+//
+// Returns the first error raised by any of these steps.
+// NOTE(review): `context` and `table_schema` are dereferenced without a null
+// check, so callers must pass non-null pointers.
+Result<std::unique_ptr<InternalReadContext>> InternalReadContext::Create(
+ const std::shared_ptr<ReadContext>& context, const
std::shared_ptr<TableSchema>& table_schema,
+ const std::map<std::string, std::string>& options) {
+ PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options,
+ CoreOptions::FromMap(options,
context->GetSpecificFileSystem(),
+
context->GetFileSystemSchemeToIdentifierMap()));
+ // prepare read schema
+ std::vector<DataField> read_data_fields;
+ if (!context->GetReadFieldIds().empty()) {
+ // field ids take precedence over field names
+ read_data_fields.reserve(context->GetReadFieldIds().size());
+ for (const auto& field_id : context->GetReadFieldIds()) {
+ // row id is a special field only when row tracking is enabled
+ if (core_options.RowTrackingEnabled() && field_id ==
SpecialFields::RowId().Id()) {
+ read_data_fields.push_back(SpecialFields::RowId());
+ continue;
+ }
+ // sequence number needs row tracking or key-value sequence numbers
+ if ((core_options.RowTrackingEnabled() ||
+ core_options.KeyValueSequenceNumberEnabled()) &&
+ field_id == SpecialFields::SequenceNumber().Id()) {
+ read_data_fields.push_back(SpecialFields::SequenceNumber());
+ continue;
+ }
+ // value kind is accepted regardless of options
+ if (field_id == SpecialFields::ValueKind().Id()) {
+ read_data_fields.push_back(SpecialFields::ValueKind());
+ continue;
+ }
+ // index score is a special field only when data evolution is enabled
+ if (core_options.DataEvolutionEnabled() &&
+ field_id == SpecialFields::IndexScore().Id()) {
+ read_data_fields.push_back(SpecialFields::IndexScore());
+ continue;
+ }
+ // regular field: look it up in the table schema by id
+ PAIMON_ASSIGN_OR_RAISE(DataField field,
table_schema->GetField(field_id));
+ read_data_fields.push_back(field);
+ }
+ } else if (!context->GetReadSchema().empty()) {
+ // same resolution as above, but matching on field names
+ read_data_fields.reserve(context->GetReadSchema().size());
+ for (const auto& name : context->GetReadSchema()) {
+ // row id is a special field only when row tracking is enabled
+ if (core_options.RowTrackingEnabled() && name ==
SpecialFields::RowId().Name()) {
+ read_data_fields.push_back(SpecialFields::RowId());
+ continue;
+ }
+ // sequence number needs row tracking or key-value sequence numbers
+ if ((core_options.RowTrackingEnabled() ||
+ core_options.KeyValueSequenceNumberEnabled()) &&
+ name == SpecialFields::SequenceNumber().Name()) {
+ read_data_fields.push_back(SpecialFields::SequenceNumber());
+ continue;
+ }
+ // value kind is accepted regardless of options
+ if (name == SpecialFields::ValueKind().Name()) {
+ read_data_fields.push_back(SpecialFields::ValueKind());
+ continue;
+ }
+ // index score is a special field only when data evolution is enabled
+ if (core_options.DataEvolutionEnabled() && name ==
SpecialFields::IndexScore().Name()) {
+ read_data_fields.push_back(SpecialFields::IndexScore());
+ continue;
+ }
+ // regular field: look it up in the table schema by name
+ PAIMON_ASSIGN_OR_RAISE(DataField field,
table_schema->GetField(name));
+ read_data_fields.push_back(field);
+ }
+ } else {
+ // neither field ids nor field names are set: read all fields
+ read_data_fields = table_schema->Fields();
+ }
+ auto read_schema =
DataField::ConvertDataFieldsToArrowSchema(read_data_fields);
+ // validate read schema to avoid redundant fields
+
PAIMON_RETURN_NOT_OK(ArrowSchemaValidator::ValidateSchemaWithFieldId(*read_schema));
+ // validate predicate (only when one is set)
+ if (context->GetPredicate()) {
+ PAIMON_RETURN_NOT_OK(PredicateValidator::ValidatePredicateWithSchema(
+ *read_schema, context->GetPredicate(),
/*validate_field_idx=*/true));
+ PAIMON_RETURN_NOT_OK(
+
PredicateValidator::ValidatePredicateWithLiterals(context->GetPredicate()));
+ }
+
+ // the constructor is private, so the object is created with `new` here
+ return std::unique_ptr<InternalReadContext>(
+ new InternalReadContext(context, table_schema, read_schema,
core_options));
+}
+
+// Stores copies of the given shared pointers and of the CoreOptions; performs
+// no validation itself (Create() does that before calling this constructor).
+InternalReadContext::InternalReadContext(const std::shared_ptr<ReadContext>&
read_context,
+ const std::shared_ptr<TableSchema>&
table_schema,
+ const std::shared_ptr<arrow::Schema>&
read_schema,
+ const CoreOptions& options)
+ : read_context_(read_context),
+ table_schema_(table_schema),
+ read_schema_(read_schema),
+ options_(options) {}
+
+} // namespace paimon
diff --git a/src/paimon/core/operation/internal_read_context.h
b/src/paimon/core/operation/internal_read_context.h
new file mode 100644
index 0000000..c199026
--- /dev/null
+++ b/src/paimon/core/operation/internal_read_context.h
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/core/core_options.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/read_context.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class Schema;
+} // namespace arrow
+
+namespace paimon {
+class Executor;
+class MemoryPool;
+class Predicate;
+
+// Internal read context: bundles the user-facing ReadContext with the
+// CoreOptions, the TableSchema and the resolved arrow read schema.
+// Instances are obtained through Create(); the constructor is private.
+// Most getters simply forward to the wrapped ReadContext or TableSchema.
+class InternalReadContext {
+ public:
+ // Factory: resolves and validates the read schema, then builds the context.
+ // Returns an error Result if any step fails.
+ static Result<std::unique_ptr<InternalReadContext>> Create(
+ const std::shared_ptr<ReadContext>& read_context,
+ const std::shared_ptr<TableSchema>& table_schema,
+ const std::map<std::string, std::string>& options);
+
+ // --- values owned by this object ---
+ const CoreOptions& GetCoreOptions() const {
+ return options_;
+ }
+ // Returns the resolved read schema (shared pointer returned by value).
+ std::shared_ptr<arrow::Schema> GetReadSchema() const {
+ return read_schema_;
+ }
+ const std::shared_ptr<TableSchema>& GetTableSchema() const {
+ return table_schema_;
+ }
+ // --- forwarded to ReadContext / TableSchema ---
+ const std::string& GetPath() const {
+ return read_context_->GetPath();
+ }
+ const std::vector<std::string>& GetPartitionKeys() const {
+ return table_schema_->PartitionKeys();
+ }
+ const std::vector<std::string>& GetPrimaryKeys() const {
+ return table_schema_->PrimaryKeys();
+ }
+ const std::shared_ptr<Predicate>& GetPredicate() const {
+ return read_context_->GetPredicate();
+ }
+ bool EnablePredicateFilter() const {
+ return read_context_->EnablePredicateFilter();
+ }
+ bool EnablePrefetch() const {
+ return read_context_->EnablePrefetch();
+ }
+ uint32_t GetPrefetchBatchCount() const {
+ return read_context_->GetPrefetchBatchCount();
+ }
+ uint32_t GetPrefetchMaxParallelNum() const {
+ return read_context_->GetPrefetchMaxParallelNum();
+ }
+ bool EnableMultiThreadRowToBatch() const {
+ return read_context_->EnableMultiThreadRowToBatch();
+ }
+ uint32_t GetRowToBatchThreadNumber() const {
+ return read_context_->GetRowToBatchThreadNumber();
+ }
+ std::shared_ptr<MemoryPool> GetMemoryPool() const {
+ return read_context_->GetMemoryPool();
+ }
+ std::shared_ptr<Executor> GetExecutor() const {
+ return read_context_->GetExecutor();
+ }
+
+ PrefetchCacheMode GetPrefetchCacheMode() const {
+ return read_context_->GetPrefetchCacheMode();
+ }
+
+ const CacheConfig& GetCacheConfig() const {
+ return read_context_->GetCacheConfig();
+ }
+
+ private:
+ // Private: use Create() so that the read schema and predicate are validated.
+ InternalReadContext(const std::shared_ptr<ReadContext>& read_context,
+ const std::shared_ptr<TableSchema>& table_schema,
+ const std::shared_ptr<arrow::Schema>& read_schema,
+ const CoreOptions& options);
+
+ std::shared_ptr<ReadContext> read_context_;
+ std::shared_ptr<TableSchema> table_schema_;
+ std::shared_ptr<arrow::Schema> read_schema_;
+ CoreOptions options_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/core/operation/internal_read_context_test.cpp
b/src/paimon/core/operation/internal_read_context_test.cpp
new file mode 100644
index 0000000..4d1467b
--- /dev/null
+++ b/src/paimon/core/operation/internal_read_context_test.cpp
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/operation/internal_read_context.h"
+
+#include <utility>
+
+#include "arrow/type.h"
+#include "gtest/gtest.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/core/schema/schema_manager.h"
+#include "paimon/defs.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// With neither field names nor field ids set, the read schema must equal the
+// full table schema (f0..f3 in table order).
+TEST(InternalReadContext, TestReadWithUnspecifiedSchema) {
+ // no read schema is specified, read all fields
+ std::string path = paimon::test::GetDataDir() +
"/orc/append_09.db/append_09";
+ ReadContextBuilder context_builder(path);
+ ASSERT_OK_AND_ASSIGN(auto read_context, context_builder.Finish());
+ SchemaManager schema_manager(std::make_shared<LocalFileSystem>(),
read_context->GetPath());
+ ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0));
+ ASSERT_OK_AND_ASSIGN(auto internal_context,
+ InternalReadContext::Create(std::move(read_context),
table_schema,
+ table_schema->Options()));
+ std::vector<DataField> read_fields = {DataField(0, arrow::field("f0",
arrow::utf8())),
+ DataField(1, arrow::field("f1",
arrow::int32())),
+ DataField(2, arrow::field("f2",
arrow::int32())),
+ DataField(3, arrow::field("f3",
arrow::float64()))};
+ auto expected_schema =
DataField::ConvertDataFieldsToArrowSchema(read_fields);
+ ASSERT_TRUE(internal_context->GetReadSchema()->Equals(expected_schema));
+}
+
+// With field names set, the read schema contains exactly those fields, in the
+// requested order (f3 before f0).
+TEST(InternalReadContext, TestReadWithSpecifiedSchema) {
+ std::string path = paimon::test::GetDataDir() +
"/orc/append_09.db/append_09";
+ ReadContextBuilder context_builder(path);
+ context_builder.SetReadSchema({"f3", "f0"});
+ ASSERT_OK_AND_ASSIGN(auto read_context, context_builder.Finish());
+ SchemaManager schema_manager(std::make_shared<LocalFileSystem>(),
read_context->GetPath());
+ ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0));
+ ASSERT_OK_AND_ASSIGN(auto internal_context,
+ InternalReadContext::Create(std::move(read_context),
table_schema,
+ table_schema->Options()));
+ std::vector<DataField> read_fields = {DataField(3, arrow::field("f3",
arrow::float64())),
+ DataField(0, arrow::field("f0",
arrow::utf8()))};
+ auto expected_schema =
DataField::ConvertDataFieldsToArrowSchema(read_fields);
+ ASSERT_TRUE(internal_context->GetReadSchema()->Equals(expected_schema));
+}
+
+// With field ids set, the read schema contains exactly the fields with those
+// ids, in the requested order (id 3 before id 0).
+TEST(InternalReadContext, TestReadWithSpecifiedFieldId) {
+ std::string path = paimon::test::GetDataDir() +
"/orc/append_09.db/append_09";
+ ReadContextBuilder context_builder(path);
+ context_builder.SetReadFieldIds({3, 0});
+ ASSERT_OK_AND_ASSIGN(auto read_context, context_builder.Finish());
+ SchemaManager schema_manager(std::make_shared<LocalFileSystem>(),
read_context->GetPath());
+ ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0));
+ ASSERT_OK_AND_ASSIGN(auto internal_context,
+ InternalReadContext::Create(std::move(read_context),
table_schema,
+ table_schema->Options()));
+ std::vector<DataField> read_fields = {DataField(3, arrow::field("f3",
arrow::float64())),
+ DataField(0, arrow::field("f0",
arrow::utf8()))};
+ auto expected_schema =
DataField::ConvertDataFieldsToArrowSchema(read_fields);
+ ASSERT_TRUE(internal_context->GetReadSchema()->Equals(expected_schema));
+}
+
+// When both field names and field ids are set, the field ids win.
+TEST(InternalReadContext, TestReadWithSpecifiedFieldIdAndSchema) {
+ std::string path = paimon::test::GetDataDir() +
"/orc/append_09.db/append_09";
+ ReadContextBuilder context_builder(path);
+ // both a read schema (field names) and field ids are specified:
+ // the field ids are used and the field names are ignored.
+ context_builder.SetReadSchema({"f0"});
+ context_builder.SetReadFieldIds({3, 0});
+ ASSERT_OK_AND_ASSIGN(auto read_context, context_builder.Finish());
+ SchemaManager schema_manager(std::make_shared<LocalFileSystem>(),
read_context->GetPath());
+ ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0));
+ ASSERT_OK_AND_ASSIGN(auto internal_context,
+ InternalReadContext::Create(std::move(read_context),
table_schema,
+ table_schema->Options()));
+ std::vector<DataField> read_fields = {DataField(3, arrow::field("f3",
arrow::float64())),
+ DataField(0, arrow::field("f0",
arrow::utf8()))};
+ auto expected_schema =
DataField::ConvertDataFieldsToArrowSchema(read_fields);
+ ASSERT_TRUE(internal_context->GetReadSchema()->Equals(expected_schema));
+}
+
+// Special fields requested by name (_ROW_ID, _SEQUENCE_NUMBER, _INDEX_SCORE)
+// are resolved only when the matching table option is enabled; otherwise
+// Create() fails because the name is not found in the table schema.
+TEST(InternalReadContext, TestReadWithRowTrackingAndScoreFields) {
+ {
+ // valid case: row tracking and data evolution are both enabled
+ std::string path = paimon::test::GetDataDir() +
"/orc/append_09.db/append_09";
+ ReadContextBuilder context_builder(path);
+ context_builder.SetReadSchema({"f3", "f0", "_ROW_ID",
"_SEQUENCE_NUMBER", "_INDEX_SCORE"});
+ ASSERT_OK_AND_ASSIGN(auto read_context, context_builder.Finish());
+ SchemaManager schema_manager(std::make_shared<LocalFileSystem>(),
read_context->GetPath());
+ ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0));
+ // enable the options on a copy of the table options
+ auto new_options = table_schema->Options();
+ new_options[Options::ROW_TRACKING_ENABLED] = "true";
+ new_options[Options::DATA_EVOLUTION_ENABLED] = "true";
+ ASSERT_OK_AND_ASSIGN(
+ auto internal_context,
+ InternalReadContext::Create(std::move(read_context), table_schema,
new_options));
+ std::vector<DataField> read_fields = {
+ DataField(3, arrow::field("f3", arrow::float64())),
+ DataField(0, arrow::field("f0", arrow::utf8())),
SpecialFields::RowId(),
+ SpecialFields::SequenceNumber(), SpecialFields::IndexScore()};
+ auto expected_schema =
DataField::ConvertDataFieldsToArrowSchema(read_fields);
+
ASSERT_TRUE(internal_context->GetReadSchema()->Equals(expected_schema));
+ }
+ {
+ // invalid case: row tracking fields are requested while row tracking is
+ // disabled
+ std::string path = paimon::test::GetDataDir() +
"/orc/append_09.db/append_09";
+ ReadContextBuilder context_builder(path);
+ context_builder.SetReadSchema({"f3", "f0", "_ROW_ID",
"_SEQUENCE_NUMBER"});
+ ASSERT_OK_AND_ASSIGN(auto read_context, context_builder.Finish());
+ SchemaManager schema_manager(std::make_shared<LocalFileSystem>(),
read_context->GetPath());
+ ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0));
+
ASSERT_NOK_WITH_MSG(InternalReadContext::Create(std::move(read_context),
table_schema,
+
table_schema->Options()),
+ "Get field _ROW_ID failed: not exist in table
schema");
+ }
+ {
+ // invalid case: the index score field is requested while data evolution
+ // is disabled
+ std::string path = paimon::test::GetDataDir() +
"/orc/append_09.db/append_09";
+ ReadContextBuilder context_builder(path);
+ context_builder.SetReadSchema({"f3", "f0", "_INDEX_SCORE"});
+ ASSERT_OK_AND_ASSIGN(auto read_context, context_builder.Finish());
+ SchemaManager schema_manager(std::make_shared<LocalFileSystem>(),
read_context->GetPath());
+ ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0));
+
ASSERT_NOK_WITH_MSG(InternalReadContext::Create(std::move(read_context),
table_schema,
+
table_schema->Options()),
+ "Get field _INDEX_SCORE failed: not exist in table
schema");
+ }
+}
+
+// _VALUE_KIND is accepted with the table's unmodified options and keeps its
+// requested position in the read schema.
+TEST(InternalReadContext, TestReadWithValueKindField) {
+ std::string path = paimon::test::GetDataDir() +
"/orc/append_09.db/append_09";
+ ReadContextBuilder context_builder(path);
+ context_builder.SetReadSchema({"f3", "_VALUE_KIND", "f0"});
+ ASSERT_OK_AND_ASSIGN(auto read_context, context_builder.Finish());
+ SchemaManager schema_manager(std::make_shared<LocalFileSystem>(),
read_context->GetPath());
+ ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0));
+ ASSERT_OK_AND_ASSIGN(auto internal_context,
+ InternalReadContext::Create(std::move(read_context),
table_schema,
+ table_schema->Options()));
+ std::vector<DataField> read_fields = {DataField(3, arrow::field("f3",
arrow::float64())),
+ SpecialFields::ValueKind(),
+ DataField(0, arrow::field("f0",
arrow::utf8()))};
+ auto expected_schema =
DataField::ConvertDataFieldsToArrowSchema(read_fields);
+ ASSERT_TRUE(internal_context->GetReadSchema()->Equals(expected_schema));
+}
+
+// Special fields can also be requested by field id (SpecialFieldIds::*) when
+// row tracking and data evolution are enabled.
+TEST(InternalReadContext, TestReadWithFieldIdsAndSpecialFields) {
+ {
+ // valid case: row tracking and data evolution are both enabled
+ std::string path = paimon::test::GetDataDir() +
"/orc/append_09.db/append_09";
+ ReadContextBuilder context_builder(path);
+ // here we use field ids instead of field names, and specify the special
+ // ids for row id, sequence number and index score.
+ context_builder.SetReadFieldIds({3, 0, SpecialFieldIds::ROW_ID,
+ SpecialFieldIds::SEQUENCE_NUMBER,
+ SpecialFieldIds::INDEX_SCORE});
+ ASSERT_OK_AND_ASSIGN(auto read_context, context_builder.Finish());
+ SchemaManager schema_manager(std::make_shared<LocalFileSystem>(),
read_context->GetPath());
+ ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0));
+ // enable the options on a copy of the table options
+ auto new_options = table_schema->Options();
+ new_options[Options::ROW_TRACKING_ENABLED] = "true";
+ new_options[Options::DATA_EVOLUTION_ENABLED] = "true";
+ ASSERT_OK_AND_ASSIGN(
+ auto internal_context,
+ InternalReadContext::Create(std::move(read_context), table_schema,
new_options));
+ std::vector<DataField> read_fields = {
+ DataField(3, arrow::field("f3", arrow::float64())),
+ DataField(0, arrow::field("f0", arrow::utf8())),
SpecialFields::RowId(),
+ SpecialFields::SequenceNumber(), SpecialFields::IndexScore()};
+ auto expected_schema =
DataField::ConvertDataFieldsToArrowSchema(read_fields);
+
ASSERT_TRUE(internal_context->GetReadSchema()->Equals(expected_schema));
+ }
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/core/operation/read_context.cpp
b/src/paimon/core/operation/read_context.cpp
new file mode 100644
index 0000000..81fe236
--- /dev/null
+++ b/src/paimon/core/operation/read_context.cpp
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/read_context.h"
+
+#include <utility>
+
+#include "paimon/common/utils/path_util.h"
+#include "paimon/core/utils/branch_manager.h"
+#include "paimon/executor.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class Predicate;
+
+// Constructs a ReadContext by copying each argument into the member of the same name.
+// Nothing is validated here. Within this file the only caller is ReadContextBuilder::Finish(),
+// which checks the path and the prefetch / row-to-batch settings before constructing.
+// `read_schema` carries the field names given to ReadContextBuilder::SetReadSchema().
+ReadContext::ReadContext(
+ const std::string& path, const std::string& branch, const
std::vector<std::string>& read_schema,
+ const std::vector<int32_t>& read_field_ids, const
std::shared_ptr<Predicate>& predicate,
+ bool enable_predicate_filter, bool enable_prefetch, uint32_t
prefetch_batch_count,
+ uint32_t prefetch_max_parallel_num, bool enable_multi_thread_row_to_batch,
+ uint32_t row_to_batch_thread_number, const std::optional<std::string>&
table_schema,
+ const std::shared_ptr<MemoryPool>& memory_pool, const
std::shared_ptr<Executor>& executor,
+ const std::shared_ptr<FileSystem>& specific_file_system,
+ const std::map<std::string, std::string>& fs_scheme_to_identifier_map,
+ const std::map<std::string, std::string>& options, PrefetchCacheMode
prefetch_cache_mode,
+ const CacheConfig& cache_config)
+ : path_(path),
+ branch_(branch),
+ read_schema_(read_schema),
+ read_field_ids_(read_field_ids),
+ predicate_(predicate),
+ enable_predicate_filter_(enable_predicate_filter),
+ enable_prefetch_(enable_prefetch),
+ prefetch_batch_count_(prefetch_batch_count),
+ prefetch_max_parallel_num_(prefetch_max_parallel_num),
+ enable_multi_thread_row_to_batch_(enable_multi_thread_row_to_batch),
+ row_to_batch_thread_number_(row_to_batch_thread_number),
+ table_schema_(table_schema),
+ memory_pool_(memory_pool),
+ executor_(executor),
+ specific_file_system_(specific_file_system),
+ fs_scheme_to_identifier_map_(fs_scheme_to_identifier_map),
+ options_(options),
+ prefetch_cache_mode_(prefetch_cache_mode),
+ cache_config_(cache_config) {}
+
+// Destructor is defaulted out of line.
+ReadContext::~ReadContext() = default;
+
+// Mutable state behind ReadContextBuilder. The in-class initializers below are the builder's
+// defaults; Reset() restores exactly those values.
+class ReadContextBuilder::Impl {
+ public:
+    friend class ReadContextBuilder;
+
+    // Restores every setting to its default while keeping the table path. Finish() calls this
+    // after it has built a context. The defaults come from a freshly constructed Impl, so they
+    // are spelled out only once (in the member initializers).
+    void Reset() {
+        Impl defaults;
+        defaults.path_ = std::move(path_);
+        *this = std::move(defaults);
+    }
+
+ private:
+    // Table location and branch.
+    std::string path_;
+    std::string branch_ = BranchManager::DEFAULT_MAIN_BRANCH;
+
+    // Projection: by field name and/or by field id.
+    std::vector<std::string> read_field_names_;
+    std::vector<int32_t> read_field_ids_;
+
+    // File-system scheme mapping and free-form options.
+    std::map<std::string, std::string> fs_scheme_to_identifier_map_;
+    std::map<std::string, std::string> options_;
+
+    // Filtering.
+    std::shared_ptr<Predicate> predicate_;
+    bool enable_predicate_filter_ = false;
+
+    // Prefetch and row-to-batch tuning.
+    bool enable_prefetch_ = false;
+    uint32_t prefetch_batch_count_ = 600;
+    uint32_t prefetch_max_parallel_num_ = 3;
+    bool enable_multi_thread_row_to_batch_ = false;
+    uint32_t row_to_batch_thread_number_ = 1;
+
+    // Optional caller-supplied schema and runtime collaborators.
+    std::optional<std::string> table_schema_;
+    std::shared_ptr<MemoryPool> memory_pool_ = GetDefaultPool();
+    std::shared_ptr<Executor> executor_;
+    std::shared_ptr<FileSystem> specific_file_system_;
+
+    // Cache behaviour.
+    PrefetchCacheMode prefetch_cache_mode_ = PrefetchCacheMode::ALWAYS;
+    CacheConfig cache_config_;
+};
+
+// Creates a builder for the table at `path`; every other setting starts at the defaults declared
+// in Impl. The path is stored as given here and is only normalized and checked in Finish().
+ReadContextBuilder::ReadContextBuilder(const std::string& path)
+ : impl_(std::make_unique<ReadContextBuilder::Impl>()) {
+ impl_->path_ = path;
+}
+
+// Destructor and move operations are defaulted in this file, where Impl is defined.
+// NOTE(review): assuming impl_ is a std::unique_ptr, a moved-from builder holds a null impl_ and
+// must not be used again without being reassigned - confirm against the header.
+ReadContextBuilder::~ReadContextBuilder() = default;
+
+ReadContextBuilder::ReadContextBuilder(ReadContextBuilder&&) noexcept =
default;
+ReadContextBuilder& ReadContextBuilder::operator=(ReadContextBuilder&&)
noexcept = default;
+
+// Sets a single option, overwriting any value already stored under the same key.
+// Returns *this so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::AddOption(const std::string& key,
+ const std::string& value) {
+ impl_->options_[key] = value;
+ return *this;
+}
+
+// Replaces the whole option map; options added earlier through AddOption() are dropped.
+// Returns *this so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::SetOptions(const std::map<std::string,
std::string>& opts) {
+ impl_->options_ = opts;
+ return *this;
+}
+
+// Selects the fields to read by name (empty by default). The names are stored separately from
+// the ids given to SetReadFieldIds(); this file does not reconcile the two lists.
+ReadContextBuilder& ReadContextBuilder::SetReadSchema(
+ const std::vector<std::string>& read_field_names) {
+ impl_->read_field_names_ = read_field_names;
+ return *this;
+}
+
+// Selects the fields to read by field id (empty by default). The ids are stored separately from
+// the names given to SetReadSchema(); this file does not reconcile the two lists.
+ReadContextBuilder& ReadContextBuilder::SetReadFieldIds(
+ const std::vector<int32_t>& read_field_ids) {
+ impl_->read_field_ids_ = read_field_ids;
+ return *this;
+}
+
+// Stores the predicate; the shared_ptr is shared, the predicate itself is not copied.
+// A null predicate is accepted (it is also the default).
+ReadContextBuilder& ReadContextBuilder::SetPredicate(const
std::shared_ptr<Predicate>& predicate) {
+ impl_->predicate_ = predicate;
+ return *this;
+}
+
+// Stores the predicate-filter flag (false by default).
+ReadContextBuilder& ReadContextBuilder::EnablePredicateFilter(bool enabled) {
+ impl_->enable_predicate_filter_ = enabled;
+ return *this;
+}
+
+// Turns prefetching on or off (off by default). When it is on, Finish() validates the prefetch
+// batch count against the prefetch max parallel num.
+ReadContextBuilder& ReadContextBuilder::EnablePrefetch(bool enabled) {
+ impl_->enable_prefetch_ = enabled;
+ return *this;
+}
+
+// Sets the prefetch batch count (default 600). With prefetch enabled, Finish() rejects 0 and any
+// value smaller than the prefetch max parallel num.
+ReadContextBuilder& ReadContextBuilder::SetPrefetchBatchCount(uint32_t
batch_count) {
+ impl_->prefetch_batch_count_ = batch_count;
+ return *this;
+}
+
+// Sets the prefetch max parallel num (default 3). With prefetch enabled and no executor supplied,
+// Finish() uses this value as the thread count of the default executor.
+ReadContextBuilder& ReadContextBuilder::SetPrefetchMaxParallelNum(uint32_t
max_parallel_num) {
+ impl_->prefetch_max_parallel_num_ = max_parallel_num;
+ return *this;
+}
+
+// Stores the multi-thread row-to-batch flag (false by default). When it is true, Finish()
+// requires a non-zero row-to-batch thread number.
+ReadContextBuilder& ReadContextBuilder::EnableMultiThreadRowToBatch(bool
enabled) {
+ impl_->enable_multi_thread_row_to_batch_ = enabled;
+ return *this;
+}
+
+// Sets the row-to-batch thread number (default 1). Finish() only validates it when
+// multi-thread row-to-batch is enabled.
+ReadContextBuilder& ReadContextBuilder::SetRowToBatchThreadNumber(uint32_t
thread_number) {
+ impl_->row_to_batch_thread_number_ = thread_number;
+ return *this;
+}
+
+// Overrides the memory pool (default: GetDefaultPool()).
+// NOTE(review): a null pool is stored as-is and is not rejected by Finish() - confirm that
+// callers never pass null.
+ReadContextBuilder& ReadContextBuilder::WithMemoryPool(
+ const std::shared_ptr<MemoryPool>& memory_pool) {
+ impl_->memory_pool_ = memory_pool;
+ return *this;
+}
+
+// Supplies the executor. If none (or a null pointer) is set, Finish() creates one with
+// CreateDefaultExecutor().
+ReadContextBuilder& ReadContextBuilder::WithExecutor(const
std::shared_ptr<Executor>& executor) {
+ impl_->executor_ = executor;
+ return *this;
+}
+
+// Stores a caller-supplied table schema string (unset by default). The string is not parsed or
+// checked in this file.
+ReadContextBuilder& ReadContextBuilder::SetTableSchema(const std::string&
table_schema) {
+ impl_->table_schema_ = table_schema;
+ return *this;
+}
+
+// Sets the branch to read (default: BranchManager::DEFAULT_MAIN_BRANCH).
+ReadContextBuilder& ReadContextBuilder::WithBranch(const std::string& branch) {
+ impl_->branch_ = branch;
+ return *this;
+}
+
+// Replaces the file-system scheme -> identifier map (empty by default).
+ReadContextBuilder& ReadContextBuilder::WithFileSystemSchemeToIdentifierMap(
+ const std::map<std::string, std::string>& fs_scheme_to_identifier_map) {
+ impl_->fs_scheme_to_identifier_map_ = fs_scheme_to_identifier_map;
+ return *this;
+}
+
+// Supplies a specific FileSystem instance (none by default); the pointer is shared, not cloned.
+ReadContextBuilder& ReadContextBuilder::WithFileSystem(
+ const std::shared_ptr<FileSystem>& file_system) {
+ impl_->specific_file_system_ = file_system;
+ return *this;
+}
+
+// Sets the prefetch cache mode (default: PrefetchCacheMode::ALWAYS).
+ReadContextBuilder& ReadContextBuilder::SetPrefetchCacheMode(PrefetchCacheMode
mode) {
+ impl_->prefetch_cache_mode_ = mode;
+ return *this;
+}
+
+// Replaces the cache configuration (default: a default-constructed CacheConfig).
+ReadContextBuilder& ReadContextBuilder::WithCacheConfig(const CacheConfig&
cache_config) {
+ impl_->cache_config_ = cache_config;
+ return *this;
+}
+
+// Validates the accumulated settings and builds the ReadContext.
+//
+// Returns Status::Invalid when
+//   - the normalized table path is empty,
+//   - prefetch is enabled and the prefetch batch count is 0 or smaller than the prefetch max
+//     parallel num,
+//   - multi-thread row-to-batch is enabled and the row-to-batch thread number is 0.
+// A failure of PathUtil::NormalizePath is returned through PAIMON_ASSIGN_OR_RAISE.
+//
+// On success the builder is reset to its defaults (the table path is kept). On failure the
+// builder keeps the caller's settings; only the path may have been replaced by its normalized
+// form. In particular no default executor is created or cached on a failed call, so a retry
+// after fixing the settings sizes the default executor from the corrected values.
+Result<std::unique_ptr<ReadContext>> ReadContextBuilder::Finish() {
+    PAIMON_ASSIGN_OR_RAISE(impl_->path_, PathUtil::NormalizePath(impl_->path_));
+    if (impl_->path_.empty()) {
+        return Status::Invalid("cannot read with empty table path");
+    }
+    // The counters are unsigned, so "not greater than 0" can only mean "equal to 0".
+    if (impl_->enable_prefetch_ && impl_->prefetch_batch_count_ == 0) {
+        return Status::Invalid("prefetch batch count should be greater than 0");
+    }
+    if (impl_->enable_prefetch_ &&
+        impl_->prefetch_batch_count_ < impl_->prefetch_max_parallel_num_) {
+        return Status::Invalid(
+            "prefetch batch count should be greater than or equal to prefetch max parallel num");
+    }
+    if (impl_->enable_multi_thread_row_to_batch_ && impl_->row_to_batch_thread_number_ == 0) {
+        return Status::Invalid("row to batch thread number should be greater than 0");
+    }
+
+    // Every check has passed; only now pick the executor. It is kept in a local variable so the
+    // builder state is never touched before the context is actually built.
+    std::shared_ptr<Executor> executor = impl_->executor_;
+    if (!executor) {
+        // No executor supplied by the user: create the default one, sized by the prefetch max
+        // parallel num when prefetch is enabled and single-threaded otherwise.
+        // NOTE(review): a prefetch max parallel num of 0 passes the checks above and is handed to
+        // CreateDefaultExecutor(0) - confirm that 0 is a supported thread count.
+        const uint32_t thread_count =
+            impl_->enable_prefetch_ ? impl_->prefetch_max_parallel_num_ : 1;
+        executor = CreateDefaultExecutor(thread_count);
+    }
+
+    auto ctx = std::make_unique<ReadContext>(
+        impl_->path_, impl_->branch_, impl_->read_field_names_, impl_->read_field_ids_,
+        impl_->predicate_, impl_->enable_predicate_filter_, impl_->enable_prefetch_,
+        impl_->prefetch_batch_count_, impl_->prefetch_max_parallel_num_,
+        impl_->enable_multi_thread_row_to_batch_, impl_->row_to_batch_thread_number_,
+        impl_->table_schema_, impl_->memory_pool_, executor, impl_->specific_file_system_,
+        impl_->fs_scheme_to_identifier_map_, impl_->options_, impl_->prefetch_cache_mode_,
+        impl_->cache_config_);
+    impl_->Reset();
+    return ctx;
+}
+
+} // namespace paimon
diff --git a/src/paimon/core/operation/read_context_test.cpp b/src/paimon/core/operation/read_context_test.cpp
new file mode 100644
index 0000000..dc79b69
--- /dev/null
+++ b/src/paimon/core/operation/read_context_test.cpp
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/read_context.h"
+
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/defs.h"
+#include "paimon/executor.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/status.h"
+#include "paimon/testing/mock/mock_file_system.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// A builder that only received a table path must produce a context carrying the builder defaults.
+TEST(ReadContextTest, TestDefaultValue) {
+    ReadContextBuilder builder("table_root_path");
+    ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish());
+
+    // Location.
+    ASSERT_EQ(ctx->GetPath(), "table_root_path");
+    ASSERT_EQ("main", ctx->GetBranch());
+
+    // Collaborators: pool and executor are always provided, no specific file system is.
+    ASSERT_TRUE(ctx->GetMemoryPool());
+    ASSERT_TRUE(ctx->GetExecutor());
+    ASSERT_FALSE(ctx->GetSpecificFileSystem());
+    ASSERT_TRUE(ctx->GetFileSystemSchemeToIdentifierMap().empty());
+
+    // Projection, options and filtering start out empty / disabled.
+    ASSERT_TRUE(ctx->GetReadSchema().empty());
+    ASSERT_TRUE(ctx->GetReadFieldIds().empty());
+    ASSERT_TRUE(ctx->GetOptions().empty());
+    ASSERT_FALSE(ctx->GetPredicate());
+    ASSERT_FALSE(ctx->EnablePredicateFilter());
+
+    // The table schema is unset unless SetTableSchema() was called (previously not asserted).
+    ASSERT_FALSE(ctx->GetSpecificTableSchema().has_value());
+
+    // Prefetch and row-to-batch tuning.
+    ASSERT_FALSE(ctx->EnablePrefetch());
+    ASSERT_EQ(PrefetchCacheMode::ALWAYS, ctx->GetPrefetchCacheMode());
+    ASSERT_EQ(600, ctx->GetPrefetchBatchCount());
+    ASSERT_EQ(3, ctx->GetPrefetchMaxParallelNum());
+    ASSERT_FALSE(ctx->EnableMultiThreadRowToBatch());
+    ASSERT_EQ(1, ctx->GetRowToBatchThreadNumber());
+}
+
+// Every builder setter must be reflected by the matching ReadContext getter.
+TEST(ReadContextTest, TestSetContent) {
+    // Collaborators are created up front so the getters can be compared against them afterwards.
+    std::shared_ptr<MemoryPool> memory_pool = GetDefaultPool();
+    std::shared_ptr<Executor> executor = CreateDefaultExecutor();
+    auto file_system = std::make_shared<MockFileSystem>();
+    auto predicate =
+        PredicateBuilder::IsNull(/*field_index=*/0, /*field_name=*/"f1", FieldType::INT);
+    CacheConfig cache_config(/*buffer_size_limit=*/1024, /*range_size_limit=*/512,
+                             /*hole_size_limit=*/128, /*pre_buffer_limit=*/2048);
+
+    // Each setter returns the builder, so the whole configuration is written as one chain.
+    ReadContextBuilder builder("table_root_path");
+    builder.AddOption("key", "value")
+        .SetReadSchema({"f1", "f2"})
+        .SetReadFieldIds({0, 1})
+        .SetPredicate(predicate)
+        .EnablePredicateFilter(true)
+        .EnablePrefetch(true)
+        .SetPrefetchCacheMode(PrefetchCacheMode::NEVER)
+        .SetPrefetchBatchCount(1200)
+        .SetPrefetchMaxParallelNum(6)
+        .EnableMultiThreadRowToBatch(true)
+        .SetRowToBatchThreadNumber(9)
+        .WithMemoryPool(memory_pool)
+        .WithExecutor(executor)
+        .SetTableSchema("table-schema-json")
+        .WithBranch("rt")
+        .WithCacheConfig(cache_config)
+        .WithFileSystemSchemeToIdentifierMap({{"file", "local"}})
+        .WithFileSystem(file_system);
+    ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish());
+
+    // Location and caller-supplied schema.
+    ASSERT_EQ(ctx->GetPath(), "table_root_path");
+    ASSERT_EQ("rt", ctx->GetBranch());
+    ASSERT_TRUE(ctx->GetSpecificTableSchema().has_value());
+    ASSERT_EQ("table-schema-json", ctx->GetSpecificTableSchema().value());
+
+    // Collaborators are passed through by identity.
+    ASSERT_TRUE(ctx->GetMemoryPool());
+    ASSERT_TRUE(ctx->GetExecutor());
+    ASSERT_EQ(memory_pool, ctx->GetMemoryPool());
+    ASSERT_EQ(executor, ctx->GetExecutor());
+    ASSERT_EQ(ctx->GetSpecificFileSystem(), file_system);
+
+    // Projection and filtering.
+    ASSERT_EQ(ctx->GetReadSchema(), std::vector<std::string>({"f1", "f2"}));
+    ASSERT_EQ(ctx->GetReadFieldIds(), std::vector<int32_t>({0, 1}));
+    ASSERT_EQ(*predicate, *(ctx->GetPredicate()));
+    ASSERT_TRUE(ctx->EnablePredicateFilter());
+
+    // Prefetch and row-to-batch tuning.
+    ASSERT_TRUE(ctx->EnablePrefetch());
+    ASSERT_EQ(PrefetchCacheMode::NEVER, ctx->GetPrefetchCacheMode());
+    ASSERT_EQ(1200, ctx->GetPrefetchBatchCount());
+    ASSERT_EQ(6, ctx->GetPrefetchMaxParallelNum());
+    ASSERT_TRUE(ctx->EnableMultiThreadRowToBatch());
+    ASSERT_EQ(9, ctx->GetRowToBatchThreadNumber());
+
+    // Cache configuration.
+    ASSERT_EQ(1024U, ctx->GetCacheConfig().GetBufferSizeLimit());
+    ASSERT_EQ(512U, ctx->GetCacheConfig().GetRangeSizeLimit());
+    ASSERT_EQ(128U, ctx->GetCacheConfig().GetHoleSizeLimit());
+    ASSERT_EQ(2048U, ctx->GetCacheConfig().GetPreBufferLimit());
+
+    // Maps.
+    std::map<std::string, std::string> expected_fs_map = {{"file", "local"}};
+    ASSERT_EQ(expected_fs_map, ctx->GetFileSystemSchemeToIdentifierMap());
+    std::map<std::string, std::string> expected_options = {{"key", "value"}};
+    ASSERT_EQ(expected_options, ctx->GetOptions());
+}
+
+// SetOptions() replaces the whole option map, so an option added before it must not survive.
+TEST(ReadContextTest, TestSetOptionsOverridesAddedOptions) {
+    const std::map<std::string, std::string> replacement = {{"key1", "value1"},
+                                                            {"key2", "value2"}};
+
+    ReadContextBuilder context_builder("table_root_path");
+    context_builder.AddOption("old", "value").SetOptions(replacement);
+    ASSERT_OK_AND_ASSIGN(auto context, context_builder.Finish());
+
+    ASSERT_EQ(replacement, context->GetOptions());
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/core/operation/split_read.h b/src/paimon/core/operation/split_read.h
new file mode 100644
index 0000000..ffde90a
--- /dev/null
+++ b/src/paimon/core/operation/split_read.h
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "paimon/executor.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/read_context.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+#include "paimon/table/source/data_split.h"
+
+namespace paimon {
+/// Given a DataSplit or a list of DataSplit, generate a reader for batch reading.
+///
+/// Pure interface: both operations are pure virtual and the destructor is virtual.
+class SplitRead {
+ public:
+ virtual ~SplitRead() = default;
+
+    /// Creates a BatchReader for `split`; a failure is reported through the returned Result.
+ virtual Result<std::unique_ptr<BatchReader>> CreateReader(
+ const std::shared_ptr<Split>& split) = 0;
+
+    /// NOTE(review): presumably reports whether this SplitRead implementation is the one that
+    /// should handle `split`; the meaning of `force_keep_delete` is not visible in this header -
+    /// confirm against the implementations.
+ virtual Result<bool> Match(const std::shared_ptr<Split>& split,
+ bool force_keep_delete) const = 0;
+};
+
+} // namespace paimon