This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new dc57135  feat(core): introduce read context (#63)
dc57135 is described below

commit dc57135795c9773e6e0a7b934022a05a7c1c9386
Author: Zhang Jiawei <[email protected]>
AuthorDate: Wed Jun 10 11:13:37 2026 +0800

    feat(core): introduce read context (#63)
---
 include/paimon/read_context.h                      | 353 +++++++++++++++++++++
 src/paimon/core/operation/abstract_split_read.cpp  | 263 +++++++++++++++
 src/paimon/core/operation/abstract_split_read.h    | 136 ++++++++
 .../core/operation/abstract_split_read_test.cpp    | 161 ++++++++++
 .../core/operation/internal_read_context.cpp       | 121 +++++++
 src/paimon/core/operation/internal_read_context.h  | 115 +++++++
 .../core/operation/internal_read_context_test.cpp  | 196 ++++++++++++
 src/paimon/core/operation/read_context.cpp         | 256 +++++++++++++++
 src/paimon/core/operation/read_context_test.cpp    | 126 ++++++++
 src/paimon/core/operation/split_read.h             |  44 +++
 10 files changed, 1771 insertions(+)

diff --git a/include/paimon/read_context.h b/include/paimon/read_context.h
new file mode 100644
index 0000000..e4d79b2
--- /dev/null
+++ b/include/paimon/read_context.h
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "paimon/predicate/predicate.h"
+#include "paimon/result.h"
+#include "paimon/type_fwd.h"
+#include "paimon/utils/read_ahead_cache.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class Executor;
+class MemoryPool;
+class Predicate;
+class FileSystem;
+
+/// `ReadContext` holds the configuration for read operations.
+///
+/// Do not construct this class directly; use `ReadContextBuilder`, which validates its
+/// input, to build a `ReadContext`.
+/// @see ReadContextBuilder
+class PAIMON_EXPORT ReadContext {
+ public:
+    ReadContext(const std::string& path, const std::string& branch,
+                const std::vector<std::string>& read_schema,
+                const std::vector<int32_t>& read_field_ids,
+                const std::shared_ptr<Predicate>& predicate, bool 
enable_predicate_filter,
+                bool enable_prefetch, uint32_t prefetch_batch_count,
+                uint32_t prefetch_max_parallel_num, bool 
enable_multi_thread_row_to_batch,
+                uint32_t row_to_batch_thread_number, const 
std::optional<std::string>& table_schema,
+                const std::shared_ptr<MemoryPool>& memory_pool,
+                const std::shared_ptr<Executor>& executor,
+                const std::shared_ptr<FileSystem>& specific_file_system,
+                const std::map<std::string, std::string>& 
fs_scheme_to_identifier_map,
+                const std::map<std::string, std::string>& options,
+                PrefetchCacheMode prefetch_cache_mode, const CacheConfig& 
cache_config);
+    ~ReadContext();
+
+    const std::string& GetPath() const {
+        return path_;
+    }
+
+    const std::string& GetBranch() const {
+        return branch_;
+    }
+
+    const std::map<std::string, std::string>& 
GetFileSystemSchemeToIdentifierMap() const {
+        return fs_scheme_to_identifier_map_;
+    }
+
+    const std::map<std::string, std::string>& GetOptions() const {
+        return options_;
+    }
+
+    const std::vector<std::string>& GetReadSchema() const {
+        return read_schema_;
+    }
+
+    const std::vector<int32_t>& GetReadFieldIds() const {
+        return read_field_ids_;
+    }
+
+    const std::shared_ptr<Predicate>& GetPredicate() const {
+        return predicate_;
+    }
+
+    bool EnablePredicateFilter() const {
+        return enable_predicate_filter_;
+    }
+    bool EnablePrefetch() const {
+        return enable_prefetch_;
+    }
+    uint32_t GetPrefetchBatchCount() const {
+        return prefetch_batch_count_;
+    }
+    uint32_t GetPrefetchMaxParallelNum() const {
+        return prefetch_max_parallel_num_;
+    }
+    bool EnableMultiThreadRowToBatch() const {
+        return enable_multi_thread_row_to_batch_;
+    }
+    uint32_t GetRowToBatchThreadNumber() const {
+        return row_to_batch_thread_number_;
+    }
+    /// Returns the optional table schema string held by this context.
+    /// Const-qualified, like every other accessor of this class, so that it can be called
+    /// through a `const ReadContext&` or a pointer to const.
+    const std::optional<std::string>& GetSpecificTableSchema() const {
+        return table_schema_;
+    }
+    std::shared_ptr<MemoryPool> GetMemoryPool() const {
+        return memory_pool_;
+    }
+    std::shared_ptr<Executor> GetExecutor() const {
+        return executor_;
+    }
+    std::shared_ptr<FileSystem> GetSpecificFileSystem() const {
+        return specific_file_system_;
+    }
+
+    PrefetchCacheMode GetPrefetchCacheMode() const {
+        return prefetch_cache_mode_;
+    }
+
+    const CacheConfig& GetCacheConfig() const {
+        return cache_config_;
+    }
+
+ private:
+    std::string path_;
+    std::string branch_;
+    std::vector<std::string> read_schema_;
+    std::vector<int32_t> read_field_ids_;
+    std::shared_ptr<Predicate> predicate_;
+    bool enable_predicate_filter_;
+    bool enable_prefetch_;
+    uint32_t prefetch_batch_count_;
+    uint32_t prefetch_max_parallel_num_;
+    bool enable_multi_thread_row_to_batch_;
+    uint32_t row_to_batch_thread_number_;
+    std::optional<std::string> table_schema_;
+    std::shared_ptr<MemoryPool> memory_pool_;
+    std::shared_ptr<Executor> executor_;
+    std::shared_ptr<FileSystem> specific_file_system_;
+    std::map<std::string, std::string> fs_scheme_to_identifier_map_;
+    std::map<std::string, std::string> options_;
+    PrefetchCacheMode prefetch_cache_mode_;
+    CacheConfig cache_config_;
+};
+
+/// `ReadContextBuilder` is used to build a `ReadContext` and validates its input.
+class PAIMON_EXPORT ReadContextBuilder {
+ public:
+    /// Constructs a `ReadContextBuilder` with required parameters.
+    /// @param path The root path of the table.
+    explicit ReadContextBuilder(const std::string& path);
+
+    ~ReadContextBuilder();
+
+    ReadContextBuilder(ReadContextBuilder&&) noexcept;
+    ReadContextBuilder& operator=(ReadContextBuilder&&) noexcept;
+
+    /// Set the schema fields to read from the table.
+    ///
+    /// If not set, all fields from the table schema will be read. This is 
useful for
+    /// projection pushdown to reduce I/O and improve performance by reading 
only
+    /// the required columns.
+    ///
+    /// @param read_field_names Vector of field names to read from the table.
+    /// @return Reference to this builder for method chaining.
+    /// @note Currently supports top-level field selection. Future versions 
may support
+    ///       nested field selection using ArrowSchema for more granular 
projection
+    ReadContextBuilder& SetReadSchema(const std::vector<std::string>& 
read_field_names);
+    /// Set the schema fields to read from the table.
+    ///
+    /// If not set, all fields from the table schema will be read. This is 
useful for
+    /// projection pushdown to reduce I/O and improve performance by reading 
only
+    /// the required columns.
+    ///
+    /// @param read_field_ids Vector of field ids to read from the table.
+    /// @return Reference to this builder for method chaining.
+    /// @note Currently supports top-level field selection. Future versions 
may support
+    ///       nested field selection using ArrowSchema for more granular 
projection.
+    /// @note SetReadFieldIds() and SetReadSchema() are mutually exclusive.
+    ///       Calling both will ignore the read schema set by SetReadSchema().
+    ReadContextBuilder& SetReadFieldIds(const std::vector<int32_t>& 
read_field_ids);
+
+    /// Set a configuration options map to set some option entries which are 
not defined in the
+    /// table schema or whose values you want to overwrite.
+    /// @note The options map will clear the options added by `AddOption()` 
before.
+    /// @param options The configuration options map.
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& SetOptions(const std::map<std::string, std::string>& 
options);
+
+    /// Add a single configuration option which is not defined in the table 
schema or whose value
+    /// you want to overwrite.
+    ///
+    /// If you want to add multiple options, call `AddOption()` multiple times 
or use `SetOptions()`
+    /// instead.
+    /// @param key The option key.
+    /// @param value The option value.
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& AddOption(const std::string& key, const std::string& 
value);
+
+    /// Set a predicate for filtering data during reading.
+    ///
+    /// The predicate is used for both partition pruning and data filtering.
+    /// It can significantly improve performance by reducing the amount of data
+    /// that needs to be read and processed.
+    ///
+    /// @param predicate Shared pointer to the predicate for data filtering.
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& SetPredicate(const std::shared_ptr<Predicate>& 
predicate);
+
+    /// Whether to perform precise filtering according to predicates for data 
read from format
+    /// reader.
+    /// @param enabled Whether to enable precise filtering (default: false)
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& EnablePredicateFilter(bool enabled);
+
+    /// Enable or disable prefetching of data batches from individual files.
+    ///
+    /// When enabled, the reader will prefetch multiple batches in parallel to
+    /// improve throughput by overlapping I/O with computation. This is 
particularly
+    /// beneficial for high-latency storage systems.
+    ///
+    /// @param enabled Whether to enable prefetching (default: false)
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& EnablePrefetch(bool enabled);
+
+    /// Set prefetch cache mode for read operations.
+    ///
+    /// A prefetch cache is used to prebuffer data ranges before they are 
needed,
+    /// which can improve read performance by reducing redundant I/O 
operations.
+    /// @param mode The prefetch cache mode to use (default: PrefetchCacheMode::ALWAYS)
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& SetPrefetchCacheMode(PrefetchCacheMode mode);
+
+    /// Set the cache configuration for prefetch read operations.
+    ///
+    /// @param config The cache configuration to use.
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& WithCacheConfig(const CacheConfig& config);
+
+    /// Set the total number of batches to prefetch across all files.
+    ///
+    /// This controls the memory usage and parallelism of the prefetching 
mechanism.
+    /// Higher values can improve throughput but consume more memory.
+    ///
+    /// @param batch_count Total number of batches to prefetch (default: 600)
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& SetPrefetchBatchCount(uint32_t batch_count);
+
+    /// Set the maximum number of parallel prefetch operations.
+    ///
+    /// This limits the number of concurrent I/O operations to prevent 
overwhelming
+    /// the storage system or consuming excessive system resources.
+    ///
+    /// @param parallel_num Maximum parallel prefetch operations (default: 3)
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& SetPrefetchMaxParallelNum(uint32_t parallel_num);
+
+    /// Enable or disable multi-threaded row-to-batch conversion in 
merge-on-read scenarios.
+    ///
+    /// When enabled, multiple threads are used to convert row data to batch 
format
+    /// during merge operations, which can improve performance for 
CPU-intensive
+    /// merge operations.
+    ///
+    /// @param enabled Whether to enable multi-threaded conversion (default: 
false)
+    /// @return Reference to this builder for method chaining.
+    ReadContextBuilder& EnableMultiThreadRowToBatch(bool enabled);
+
+    /// Set the number of threads for row-to-batch conversion in merge-on-read 
scenarios.
+    ///
+    /// This controls the parallelism of row-to-batch conversion during merge 
operations.
+    /// Higher values can improve performance but may affect result ordering.
+    ///
+    /// @param thread_number Number of conversion threads (default: 1)
+    /// @return Reference to this builder for method chaining.
+    /// @note If thread_number > 1, Arrow batches from the reader may not be 
in primary key order.
+    ReadContextBuilder& SetRowToBatchThreadNumber(uint32_t thread_number);
+
+    /// Set custom memory pool for memory management.
+    /// @param memory_pool The memory pool to use.
+    /// @return Reference to this builder for method chaining.
+    /// @note If not set, the default system memory pool will be used.
+    ReadContextBuilder& WithMemoryPool(const std::shared_ptr<MemoryPool>& 
memory_pool);
+
+    /// Set custom executor for task execution.
+    /// @param executor The executor to use.
+    /// @return Reference to this builder for method chaining.
+    /// @note If not set, the default system executor will be used.
+    ReadContextBuilder& WithExecutor(const std::shared_ptr<Executor>& 
executor);
+
+    /// Set the table schema as a string to avoid schema loading I/O 
operations.
+    ///
+    /// This optimization allows the reader to use a pre-loaded schema instead 
of
+    /// reading it from the table metadata, which can improve performance 
especially
+    /// in scenarios with many small read operations.
+    ///
+    /// @param table_schema String representation of the table schema.
+    /// @return Reference to this builder for method chaining.
+    /// @note The user must ensure that the schema string is valid and matches 
the table.
+    /// @note If not set, the schema will be loaded from the table path.
+    ReadContextBuilder& SetTableSchema(const std::string& table_schema);
+
+    /// Set the specific branch to read from in a versioned table.
+    ///
+    /// Paimon supports branching for data versioning and time travel queries.
+    /// This method allows reading from a specific branch instead of the main 
branch.
+    ///
+    /// @param branch Name of the branch to read from.
+    /// @return Reference to this builder for method chaining.
+    /// @note Default branch is "main" if not specified.
+    ReadContextBuilder& WithBranch(const std::string& branch);
+
+    /// Sets a mapping from URI schemes (e.g., "file", "oss") to registered 
file system
+    /// identifiers. This allows selecting different pre-registered file 
system implementations
+    /// based on the URI scheme at runtime.
+    ///
+    /// @param fs_scheme_to_identifier_map Map from URI scheme (like "oss") to 
the corresponding
+    /// file system identifier.
+    /// @return Reference to this builder for method chaining.
+    /// @note
+    ///   - This method is intended for environments where multiple file 
systems are pre-registered.
+    ///   - The specified identifiers must correspond to file systems that 
have been registered at
+    ///   compile time or initialization.
+    ///   - Cannot be used together with `WithFileSystem()`.
+    ///   - If not set, use default file system (configured in 
`Options::FILE_SYSTEM`).
+    /// Example:
+    ///   builder.WithFileSystemSchemeToIdentifierMap({{"oss", "jindo"}, 
{"file", "local"}});
+    ///
+    ReadContextBuilder& WithFileSystemSchemeToIdentifierMap(
+        const std::map<std::string, std::string>& fs_scheme_to_identifier_map);
+
+    /// Sets a custom file system instance to be used for all file operations 
in this read context.
+    /// This bypasses the global file system registry and uses the provided 
implementation directly.
+    ///
+    /// @param file_system The file system to use.
+    /// @return Reference to this builder for method chaining.
+    /// @note If not set, use default file system (configured in 
`Options::FILE_SYSTEM`)
+    ReadContextBuilder& WithFileSystem(const std::shared_ptr<FileSystem>& 
file_system);
+
+    /// Build and return a `ReadContext` instance with input validation.
+    /// @return Result containing the constructed `ReadContext` or an error 
status.
+    Result<std::unique_ptr<ReadContext>> Finish();
+
+ private:
+    class Impl;
+
+    std::unique_ptr<Impl> impl_;
+};
+}  // namespace paimon
diff --git a/src/paimon/core/operation/abstract_split_read.cpp 
b/src/paimon/core/operation/abstract_split_read.cpp
new file mode 100644
index 0000000..fb57c0e
--- /dev/null
+++ b/src/paimon/core/operation/abstract_split_read.cpp
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/operation/abstract_split_read.h"
+
+#include <cassert>
+#include <cstddef>
+#include <utility>
+
+#include "arrow/type.h"
+#include "paimon/common/reader/delegating_prefetch_reader.h"
+#include "paimon/common/reader/predicate_batch_reader.h"
+#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/object_utils.h"
+#include "paimon/core/io/complete_row_tracking_fields_reader.h"
+#include "paimon/core/io/data_file_meta.h"
+#include "paimon/core/io/data_file_path_factory.h"
+#include "paimon/core/io/field_mapping_reader.h"
+#include "paimon/core/operation/internal_read_context.h"
+#include "paimon/core/partition/partition_info.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/core/table/source/data_split_impl.h"
+#include "paimon/core/utils/field_mapping.h"
+#include "paimon/format/file_format.h"
+#include "paimon/format/file_format_factory.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class BinaryRow;
+class Executor;
+class FileStorePathFactory;
+class MemoryPool;
+class Predicate;
+
+AbstractSplitRead::AbstractSplitRead(const 
std::shared_ptr<FileStorePathFactory>& path_factory,
+                                     const 
std::shared_ptr<InternalReadContext>& context,  // dereferenced below, so it must not be null
+                                     std::unique_ptr<SchemaManager>&& 
schema_manager,
+                                     const std::shared_ptr<MemoryPool>& 
memory_pool,
+                                     const std::shared_ptr<Executor>& executor)
+    : pool_(memory_pool),
+      executor_(executor),
+      path_factory_(path_factory),
+      options_(context->GetCoreOptions()),         // core options are taken from the read context
+      raw_read_schema_(context->GetReadSchema()),  // read schema as reported by the read context
+      context_(context),
+      schema_manager_(std::move(schema_manager)) {}  // ownership of the schema manager moves here
+
+// Creates one field-mapping reader per entry of `data_files`, in input order. Files for which
+// CreateFieldMappingReader() yields a null reader are left out of the result. The first
+// failing step aborts the whole call and its status is returned.
+Result<std::vector<std::unique_ptr<FileBatchReader>>> AbstractSplitRead::CreateRawFileReaders(
+    const BinaryRow& partition, const std::vector<std::shared_ptr<DataFileMeta>>& data_files,
+    const std::shared_ptr<arrow::Schema>& read_schema, const std::shared_ptr<Predicate>& predicate,
+    DeletionVector::Factory dv_factory, const std::optional<std::vector<Range>>& row_ranges,
+    const std::shared_ptr<DataFilePathFactory>& data_file_path_factory) const {
+    std::vector<std::unique_ptr<FileBatchReader>> readers;
+    if (data_files.empty()) {
+        // Nothing to read: skip building the field mapping entirely.
+        return std::move(readers);
+    }
+
+    // The mapping builder depends only on the read schema, partition keys and predicate, so a
+    // single instance is shared by every file of this call.
+    PAIMON_ASSIGN_OR_RAISE(
+        std::unique_ptr<FieldMappingBuilder> mapping_builder,
+        FieldMappingBuilder::Create(read_schema, context_->GetPartitionKeys(), predicate));
+
+    readers.reserve(data_files.size());
+    for (const auto& file_meta : data_files) {
+        auto file_path = data_file_path_factory->ToPath(file_meta);
+        PAIMON_ASSIGN_OR_RAISE(std::string format_identifier, file_meta->FileFormat());
+        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<ReaderBuilder> format_reader_builder,
+                               PrepareReaderBuilder(format_identifier));
+        PAIMON_ASSIGN_OR_RAISE(
+            std::unique_ptr<FileBatchReader> reader,
+            CreateFieldMappingReader(file_path, file_meta, partition, format_reader_builder.get(),
+                                     mapping_builder.get(), dv_factory, row_ranges,
+                                     data_file_path_factory));
+        if (!reader) {
+            // A null reader means the file was skipped; it contributes nothing.
+            continue;
+        }
+        readers.push_back(std::move(reader));
+    }
+    return std::move(readers);
+}
+
+// Returns true only when row tracking is enabled and `read_schema` contains the row-id field
+// or the sequence-number field. `read_schema` is not touched when row tracking is disabled.
+bool AbstractSplitRead::NeedCompleteRowTrackingFields(
+    bool row_tracking_enabled, const std::shared_ptr<arrow::Schema>& read_schema) {
+    if (!row_tracking_enabled) {
+        return false;
+    }
+    if (read_schema->GetFieldIndex(SpecialFields::RowId().Name()) != -1) {
+        return true;
+    }
+    return read_schema->GetFieldIndex(SpecialFields::SequenceNumber().Name()) != -1;
+}
+
+// Convenience overload: forwards the split's data files and deletion files to the
+// vector-based CreateDeletionFileMap().
+std::unordered_map<std::string, DeletionFile> AbstractSplitRead::CreateDeletionFileMap(
+    const DataSplitImpl& data_split) {
+    const auto& split_data_files = data_split.DataFiles();
+    const auto& split_deletion_files = data_split.DeletionFiles();
+    return CreateDeletionFileMap(split_data_files, split_deletion_files);
+}
+
+// Builds a map from data file name to its deletion file.
+//
+// `deletion_files` is either empty (no entry is produced) or parallel to `data_files`:
+// element i, when engaged, is the deletion file of data_files[i].
+//
+// @param data_files      Data file metas; element i is dereferenced when deletion_files[i]
+//                        holds a value.
+// @param deletion_files  Optional deletion file per data file, or empty.
+// @return Map keyed by `file_name` holding only the engaged deletion files.
+std::unordered_map<std::string, DeletionFile> AbstractSplitRead::CreateDeletionFileMap(
+    const std::vector<std::shared_ptr<DataFileMeta>>& data_files,
+    const std::vector<std::optional<DeletionFile>>& deletion_files) {
+    std::unordered_map<std::string, DeletionFile> deletion_file_map;
+    if (deletion_files.empty()) {
+        return deletion_file_map;
+    }
+    assert(deletion_files.size() == data_files.size());
+    // assert() is compiled out under NDEBUG, so also clamp the loop bound to the shorter
+    // vector; otherwise a size mismatch would index past the end of `data_files`.
+    const size_t file_count = deletion_files.size() < data_files.size() ? deletion_files.size()
+                                                                        : data_files.size();
+    for (size_t i = 0; i < file_count; i++) {
+        if (deletion_files[i].has_value()) {
+            deletion_file_map.emplace(data_files[i]->file_name, *deletion_files[i]);
+        }
+    }
+    return deletion_file_map;
+}
+
+// Wraps `reader` in a PredicateBatchReader when predicate filtering is enabled on the read
+// context and a predicate is present; otherwise hands `reader` back unchanged.
+Result<std::unique_ptr<BatchReader>> AbstractSplitRead::ApplyPredicateFilterIfNeeded(
+    std::unique_ptr<BatchReader>&& reader, const std::shared_ptr<Predicate>& predicate) const {
+    const bool filter_requested = context_->EnablePredicateFilter() && predicate != nullptr;
+    if (filter_requested) {
+        return PredicateBatchReader::Create(std::move(reader), predicate, pool_);
+    }
+    return std::move(reader);
+}
+
+// Looks up the file format registered under `format_identifier`, asks it for a reader builder
+// sized with the configured read batch size, and attaches this object's memory pool to it.
+Result<std::unique_ptr<ReaderBuilder>> AbstractSplitRead::PrepareReaderBuilder(
+    const std::string& format_identifier) const {
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileFormat> format,
+                           FileFormatFactory::Get(format_identifier, options_.ToMap()));
+    const auto read_batch_size = options_.GetReadBatchSize();
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<ReaderBuilder> builder,
+                           format->CreateReaderBuilder(read_batch_size));
+    builder->WithMemoryPool(pool_);
+    return builder;
+}
+
+// Opens a format-level reader for a single data file, choosing one of three strategies:
+//   - "lance" files are built straight from the path;
+//   - with prefetch enabled (and a format other than "blob"/"avro") a prefetching reader is
+//     created and wrapped in a DelegatingPrefetchReader;
+//   - everything else is built from an input stream opened on the configured file system.
+Result<std::unique_ptr<FileBatchReader>> AbstractSplitRead::CreateFileBatchReader(
+    const std::shared_ptr<DataFileMeta>& file_meta, const std::string& data_file_path,
+    const ReaderBuilder* reader_builder) const {
+    PAIMON_ASSIGN_OR_RAISE(std::string format_name, file_meta->FileFormat());
+
+    if (format_name == "lance") {
+        // lance do not support stream build with input stream
+        return reader_builder->Build(data_file_path);
+    }
+
+    const bool use_prefetch =
+        context_->EnablePrefetch() && format_name != "blob" && format_name != "avro";
+    if (!use_prefetch) {
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> stream,
+                               options_.GetFileSystem()->Open(data_file_path));
+        return reader_builder->Build(stream);
+    }
+
+    PAIMON_ASSIGN_OR_RAISE(
+        std::unique_ptr<PrefetchFileBatchReaderImpl> prefetching_reader,
+        PrefetchFileBatchReaderImpl::Create(
+            data_file_path, reader_builder, options_.GetFileSystem(),
+            context_->GetPrefetchMaxParallelNum(), options_.GetReadBatchSize(),
+            context_->GetPrefetchBatchCount(), options_.EnableAdaptivePrefetchStrategy(),
+            executor_,
+            /*initialize_read_ranges=*/false, context_->GetPrefetchCacheMode(),
+            context_->GetCacheConfig(), pool_));
+    return std::make_unique<DelegatingPrefetchReader>(std::move(prefetching_reader));
+}
+
+Result<std::unique_ptr<FileBatchReader>> 
AbstractSplitRead::CreateFieldMappingReader(  // builds the full reader stack for one data file
+    const std::string& data_file_path, const std::shared_ptr<DataFileMeta>& 
file_meta,
+    const BinaryRow& partition, const ReaderBuilder* reader_builder,
+    const FieldMappingBuilder* field_mapping_builder, DeletionVector::Factory 
dv_factory,
+    const std::optional<std::vector<Range>>& row_ranges,
+    const std::shared_ptr<DataFilePathFactory>& data_file_path_factory) const {
+    std::shared_ptr<TableSchema> data_schema;  // schema selected by the file's schema_id
+    if (file_meta->schema_id == context_->GetTableSchema()->Id()) {
+        data_schema = context_->GetTableSchema();  // same id: reuse the context's table schema
+    } else {
+        // the file carries a different schema id: load that schema through the schema manager
+        PAIMON_ASSIGN_OR_RAISE(data_schema, 
schema_manager_->ReadSchema(file_meta->schema_id));
+    }
+    std::unique_ptr<FieldMapping> field_mapping;
+    if (!data_schema->PrimaryKeys().empty()) {
+        // for pk table: prepend the sequence-number and value-kind special fields to the file fields
+        std::vector<DataField> file_fields = {SpecialFields::SequenceNumber(),
+                                              SpecialFields::ValueKind()};
+        file_fields.insert(file_fields.end(), data_schema->Fields().begin(),
+                           data_schema->Fields().end());  // NOTE(review): assumes Fields() returns a reference, so both iterators belong to one vector - confirm
+        PAIMON_ASSIGN_OR_RAISE(field_mapping,
+                               
field_mapping_builder->CreateFieldMapping(file_fields));
+    } else {
+        PAIMON_ASSIGN_OR_RAISE(
+            std::vector<DataField> projected_data_fields,
+            ProjectFieldsForRowTrackingAndDataEvolution(data_schema, 
file_meta->write_cols));  // no primary keys: project by write_cols, with row-tracking fields appended
+        PAIMON_ASSIGN_OR_RAISE(field_mapping,
+                               
field_mapping_builder->CreateFieldMapping(projected_data_fields));
+    }
+
+    auto read_schema = DataField::ConvertDataFieldsToArrowSchema(
+        field_mapping->non_partition_info.non_partition_data_schema);  // arrow schema of the non-partition fields
+
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileBatchReader> file_reader,
+                           CreateFileBatchReader(file_meta, data_file_path, 
reader_builder));
+    if (NeedCompleteRowTrackingFields(options_.RowTrackingEnabled(), 
read_schema)) {
+        file_reader = std::make_unique<CompleteRowTrackingFieldsBatchReader>(
+            std::move(file_reader), file_meta->first_row_id, 
file_meta->max_sequence_number, pool_);  // wrap the reader, passing the file's first_row_id / max_sequence_number
+    }
+
+    const auto& predicate = 
field_mapping->non_partition_info.non_partition_filter;  // reference into field_mapping: use it before field_mapping is moved below
+    auto all_data_schema = 
DataField::ConvertDataFieldsToArrowSchema(data_schema->Fields());
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileBatchReader> final_reader,
+                           ApplyIndexAndDvReaderIfNeeded(
+                               std::move(file_reader), file_meta, 
all_data_schema, read_schema,
+                               predicate, dv_factory, row_ranges, 
data_file_path_factory));
+    if (!final_reader) {
+        // file is skipped by index or dv: report that to the caller with a null reader
+        return std::unique_ptr<FileBatchReader>();
+    }
+
+    return 
std::make_unique<FieldMappingReader>(field_mapping_builder->GetReadFieldCount(),
+                                                std::move(final_reader), 
partition,
+                                                std::move(field_mapping), 
pool_);  // the mapping reader takes ownership of both the reader and the field mapping
+}
+
+// Computes the field list used to map a data file of a table without primary keys.
+//
+// With no `write_cols`, every field of `data_schema` is taken. Otherwise the result holds the
+// written columns (row-id / sequence-number names are skipped here) followed by any partition
+// key that is not among the written columns. In both cases the row-id and sequence-number
+// special fields are appended last.
+//
+// Returns Status::Invalid for an engaged but empty `write_cols`, and propagates any
+// GetField() failure.
+Result<std::vector<DataField>> AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+    const std::shared_ptr<TableSchema>& data_schema,
+    const std::optional<std::vector<std::string>>& write_cols) {
+    std::vector<DataField> result;
+    const std::vector<std::string>& partition_keys = data_schema->PartitionKeys();
+    if (!write_cols.has_value()) {
+        result = data_schema->Fields();
+    } else {
+        const std::vector<std::string>& written = *write_cols;
+        if (written.empty()) {
+            return Status::Invalid("write cols cannot be empty");
+        }
+        for (const std::string& column : written) {
+            const bool is_row_tracking_column =
+                column == SpecialFields::RowId().Name() ||
+                column == SpecialFields::SequenceNumber().Name();
+            if (!is_row_tracking_column) {
+                PAIMON_ASSIGN_OR_RAISE(DataField written_field, data_schema->GetField(column));
+                result.push_back(written_field);
+            }
+        }
+        for (const std::string& key : partition_keys) {
+            if (ObjectUtils::Contains(written, key)) {
+                continue;
+            }
+            PAIMON_ASSIGN_OR_RAISE(DataField key_field, data_schema->GetField(key));
+            result.push_back(key_field);
+        }
+    }
+
+    result.push_back(SpecialFields::RowId());
+    result.push_back(SpecialFields::SequenceNumber());
+    return result;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/operation/abstract_split_read.h 
b/src/paimon/core/operation/abstract_split_read.h
new file mode 100644
index 0000000..ed5dda9
--- /dev/null
+++ b/src/paimon/core/operation/abstract_split_read.h
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "arrow/type_fwd.h"
+#include "paimon/core/core_options.h"
+#include "paimon/core/deletionvectors/deletion_vector.h"
+#include "paimon/core/io/field_mapping_reader.h"
+#include "paimon/core/operation/internal_read_context.h"
+#include "paimon/core/operation/split_read.h"
+#include "paimon/core/schema/schema_manager.h"
+#include "paimon/core/table/source/data_split_impl.h"
+#include "paimon/core/table/source/deletion_file.h"
+#include "paimon/core/utils/file_store_path_factory.h"
+#include "paimon/format/reader_builder.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/reader/file_batch_reader.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+class BinaryRow;
+class DataField;
+class DataFilePathFactory;
+class DataSplitImpl;
+class Executor;
+class FieldMappingBuilder;
+class FileStorePathFactory;
+class InternalReadContext;
+class MemoryPool;
+class Predicate;
+struct DataFileMeta;
+class TableSchema;
+
+// Common base of the split readers. It derives from SplitRead and leaves
+// ApplyIndexAndDvReaderIfNeeded to be implemented by concrete subclasses.
+class AbstractSplitRead : public SplitRead {
+ public:
+    ~AbstractSplitRead() override = default;
+
+    // Creates file batch readers for `data_files` of the given `partition`.
+    // NOTE(review): the implementation is not part of this header; confirm there how
+    // files skipped by index or deletion vector are reflected in the returned vector.
+    Result<std::vector<std::unique_ptr<FileBatchReader>>> CreateRawFileReaders(
+        const BinaryRow& partition, const std::vector<std::shared_ptr<DataFileMeta>>& data_files,
+        const std::shared_ptr<arrow::Schema>& read_schema,
+        const std::shared_ptr<Predicate>& predicate, DeletionVector::Factory dv_factory,
+        const std::optional<std::vector<Range>>& row_ranges,
+        const std::shared_ptr<DataFilePathFactory>& data_file_path_factory) const;
+
+ protected:
+    // Protected: instances are created through subclasses only.
+    AbstractSplitRead(const std::shared_ptr<FileStorePathFactory>& path_factory,
+                      const std::shared_ptr<InternalReadContext>& context,
+                      std::unique_ptr<SchemaManager>&& schema_manager,
+                      const std::shared_ptr<MemoryPool>& memory_pool,
+                      const std::shared_ptr<Executor>& executor);
+
+    // Collects the deletion files of a split into a string-keyed map.
+    // NOTE(review): the key is presumably the data file name - confirm in the .cpp.
+    static std::unordered_map<std::string, DeletionFile> CreateDeletionFileMap(
+        const DataSplitImpl& data_split);
+
+    // Overload taking the data files and their optional deletion files directly.
+    static std::unordered_map<std::string, DeletionFile> CreateDeletionFileMap(
+        const std::vector<std::shared_ptr<DataFileMeta>>& data_files,
+        const std::vector<std::optional<DeletionFile>>& deletion_files);
+
+    // Takes ownership of `reader` and returns a reader for the caller to use.
+    // NOTE(review): judging by the name, a filter is added only under some condition;
+    // the condition is defined in the .cpp.
+    Result<std::unique_ptr<BatchReader>> ApplyPredicateFilterIfNeeded(
+        std::unique_ptr<BatchReader>&& reader, const std::shared_ptr<Predicate>& predicate) const;
+
+    // Hook implemented by subclasses.
+    // return nullptr if file is skipped by index or dv
+    virtual Result<std::unique_ptr<FileBatchReader>> ApplyIndexAndDvReaderIfNeeded(
+        std::unique_ptr<FileBatchReader>&& file_reader, const std::shared_ptr<DataFileMeta>& file,
+        const std::shared_ptr<arrow::Schema>& data_schema,
+        const std::shared_ptr<arrow::Schema>& read_schema,
+        const std::shared_ptr<Predicate>& predicate, DeletionVector::Factory dv_factory,
+        const std::optional<std::vector<Range>>& row_ranges,
+        const std::shared_ptr<DataFilePathFactory>& data_file_path_factory) const = 0;
+
+    // 1. project write cols to data schema
+    // 2. add partition fields (if write cols not contain)
+    // 3. add row tracking fields
+    // Fails with an invalid status when `write_cols` is set but empty.
+    static Result<std::vector<DataField>> ProjectFieldsForRowTrackingAndDataEvolution(
+        const std::shared_ptr<TableSchema>& data_schema,
+        const std::optional<std::vector<std::string>>& write_cols);
+
+ private:
+    Result<std::unique_ptr<ReaderBuilder>> PrepareReaderBuilder(
+        const std::string& format_identifier) const;
+
+    Result<std::unique_ptr<FileBatchReader>> CreateFileBatchReader(
+        const std::shared_ptr<DataFileMeta>& file_meta, const std::string& data_file_path,
+        const ReaderBuilder* reader_builder) const;
+
+    // return nullptr if data file is skipped by index or dv
+    Result<std::unique_ptr<FileBatchReader>> CreateFieldMappingReader(
+        const std::string& data_file_path, const std::shared_ptr<DataFileMeta>& file_meta,
+        const BinaryRow& partition, const ReaderBuilder* reader_builder,
+        const FieldMappingBuilder* field_mapping_builder, DeletionVector::Factory dv_factory,
+        const std::optional<std::vector<Range>>& row_ranges,
+        const std::shared_ptr<DataFilePathFactory>& data_file_path_factory) const;
+
+    static bool NeedCompleteRowTrackingFields(bool row_tracking_enabled,
+                                              const std::shared_ptr<arrow::Schema>& read_schema);
+
+ protected:
+    // State shared with subclasses. Declaration order is kept as is because it is also
+    // the member initialization order.
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<Executor> executor_;
+    std::shared_ptr<FileStorePathFactory> path_factory_;
+    CoreOptions options_;
+    // user recall schema
+    // NOTE(review): presumably the schema as requested by the user - confirm.
+    std::shared_ptr<arrow::Schema> raw_read_schema_;
+    std::shared_ptr<InternalReadContext> context_;
+    std::unique_ptr<SchemaManager> schema_manager_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/operation/abstract_split_read_test.cpp 
b/src/paimon/core/operation/abstract_split_read_test.cpp
new file mode 100644
index 0000000..6fdd7f2
--- /dev/null
+++ b/src/paimon/core/operation/abstract_split_read_test.cpp
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/operation/abstract_split_read.h"
+
+#include <cstdint>
+#include <limits>
+
+#include "arrow/type_fwd.h"
+#include "gtest/gtest.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Row tracking fields need completing only when row tracking is enabled and the read
+// schema asks for at least one of them.
+TEST(AbstractSplitReadTest, TestNeedCompleteRowTrackingFields) {
+    // Three user columns followed by the two row tracking fields.
+    std::vector<DataField> table_fields = {DataField(0, arrow::field("name", arrow::utf8())),
+                                           DataField(1, arrow::field("sex", arrow::utf8())),
+                                           DataField(2, arrow::field("age", arrow::int32())),
+                                           SpecialFields::RowId(),
+                                           SpecialFields::SequenceNumber()};
+    auto full_schema = DataField::ConvertDataFieldsToArrowSchema(table_fields);
+    auto arrow_fields = full_schema->fields();
+    const auto& name_field = arrow_fields[0];
+    const auto& sex_field = arrow_fields[1];
+    const auto& row_id_field = arrow_fields[3];
+    const auto& sequence_number_field = arrow_fields[4];
+
+    // Enabled, and the read schema contains both / only one of the row tracking fields.
+    ASSERT_TRUE(AbstractSplitRead::NeedCompleteRowTrackingFields(/*row_tracking_enabled=*/true,
+                                                                 arrow::schema(arrow_fields)));
+    ASSERT_TRUE(AbstractSplitRead::NeedCompleteRowTrackingFields(
+        /*row_tracking_enabled=*/true, arrow::schema({name_field, row_id_field})));
+    ASSERT_TRUE(AbstractSplitRead::NeedCompleteRowTrackingFields(
+        /*row_tracking_enabled=*/true, arrow::schema({name_field, sequence_number_field})));
+    // Enabled, but no row tracking field is requested.
+    ASSERT_FALSE(AbstractSplitRead::NeedCompleteRowTrackingFields(
+        /*row_tracking_enabled=*/true, arrow::schema({name_field, sex_field})));
+    // Disabled, even though the fields are requested.
+    ASSERT_FALSE(AbstractSplitRead::NeedCompleteRowTrackingFields(/*row_tracking_enabled=*/false,
+                                                                  arrow::schema(arrow_fields)));
+}
+
+// Checks the projection for tables with and without partition keys, for unset, regular
+// and empty write columns.
+TEST(AbstractSplitReadTest, TestProjectFieldsForRowTrackingAndDataEvolution) {
+    // Every successful projection ends with the row id and sequence number fields.
+    const auto with_row_tracking_fields = [](std::vector<DataField> projected) {
+        projected.push_back(SpecialFields::RowId());
+        projected.push_back(SpecialFields::SequenceNumber());
+        return projected;
+    };
+
+    {
+        // test no partition
+        std::vector<DataField> schema_fields = {DataField(0, arrow::field("name", arrow::utf8())),
+                                                DataField(1, arrow::field("sex", arrow::utf8())),
+                                                DataField(2, arrow::field("age", arrow::int32()))};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(
+                                 /*schema_id=*/0,
+                                 DataField::ConvertDataFieldsToArrowSchema(schema_fields),
+                                 /*partition_keys=*/{},
+                                 /*primary_keys=*/{}, /*options=*/{}));
+
+        {
+            // write_cols unset: all schema fields are projected
+            ASSERT_OK_AND_ASSIGN(auto projected,
+                                 AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+                                     table_schema, /*write_cols=*/std::nullopt));
+            std::vector<DataField> expected = with_row_tracking_fields(schema_fields);
+            ASSERT_EQ(projected, expected);
+        }
+        {
+            // write_cols set: only the named columns are projected
+            std::vector<std::string> cols = {"name", "age"};
+            ASSERT_OK_AND_ASSIGN(auto projected,
+                                 AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+                                     table_schema, cols));
+            std::vector<DataField> expected =
+                with_row_tracking_fields({schema_fields[0], schema_fields[2]});
+            ASSERT_EQ(projected, expected);
+        }
+        {
+            // write_cols set but empty: rejected
+            std::vector<std::string> empty_cols;
+            ASSERT_NOK_WITH_MSG(AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+                                    table_schema, empty_cols),
+                                "write cols cannot be empty");
+        }
+    }
+    {
+        // test with partition
+        std::vector<DataField> schema_fields = {DataField(0, arrow::field("name", arrow::utf8())),
+                                                DataField(1, arrow::field("ds", arrow::utf8())),
+                                                DataField(2, arrow::field("sex", arrow::utf8())),
+                                                DataField(3, arrow::field("age", arrow::int32()))};
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<TableSchema> table_schema,
+                             TableSchema::Create(
+                                 /*schema_id=*/0,
+                                 DataField::ConvertDataFieldsToArrowSchema(schema_fields),
+                                 /*partition_keys=*/{"ds"},
+                                 /*primary_keys=*/{}, /*options=*/{}));
+
+        {
+            // write_cols unset: all schema fields are projected
+            ASSERT_OK_AND_ASSIGN(auto projected,
+                                 AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+                                     table_schema, /*write_cols=*/std::nullopt));
+            std::vector<DataField> expected = with_row_tracking_fields(schema_fields);
+            ASSERT_EQ(projected, expected);
+        }
+        {
+            // write_cols without the partition key: the key is appended after them
+            std::vector<std::string> cols = {"name", "age"};
+            ASSERT_OK_AND_ASSIGN(auto projected,
+                                 AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+                                     table_schema, cols));
+            std::vector<DataField> expected = with_row_tracking_fields(
+                {schema_fields[0], schema_fields[3], schema_fields[1]});
+            ASSERT_EQ(projected, expected);
+        }
+        {
+            // write_cols already naming the partition key: it is not added twice
+            std::vector<std::string> cols = {"age", "name", "ds"};
+            ASSERT_OK_AND_ASSIGN(auto projected,
+                                 AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+                                     table_schema, cols));
+            std::vector<DataField> expected = with_row_tracking_fields(
+                {schema_fields[3], schema_fields[0], schema_fields[1]});
+            ASSERT_EQ(projected, expected);
+        }
+        {
+            // write_cols naming the row tracking fields: they still appear once, at the end
+            std::vector<std::string> cols = {
+                "age",
+                "name",
+                SpecialFields::RowId().Name(),
+                SpecialFields::SequenceNumber().Name(),
+            };
+            ASSERT_OK_AND_ASSIGN(auto projected,
+                                 AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+                                     table_schema, cols));
+            std::vector<DataField> expected = with_row_tracking_fields(
+                {schema_fields[3], schema_fields[0], schema_fields[1]});
+            ASSERT_EQ(projected, expected);
+        }
+        {
+            // write_cols set but empty: rejected
+            std::vector<std::string> empty_cols;
+            ASSERT_NOK_WITH_MSG(AbstractSplitRead::ProjectFieldsForRowTrackingAndDataEvolution(
+                                    table_schema, empty_cols),
+                                "write cols cannot be empty");
+        }
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/operation/internal_read_context.cpp 
b/src/paimon/core/operation/internal_read_context.cpp
new file mode 100644
index 0000000..fb0f36b
--- /dev/null
+++ b/src/paimon/core/operation/internal_read_context.cpp
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/operation/internal_read_context.h"
+
+#include <utility>
+
+#include "paimon/common/predicate/predicate_validator.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/core/schema/arrow_schema_validator.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+// Builds an InternalReadContext from the user-facing read context, the table schema and
+// the table options.
+//
+// The read schema is resolved in this order of precedence:
+//   1. the field ids of the read context, when any are set;
+//   2. otherwise the field names of the read context, when any are set;
+//   3. otherwise every field of `table_schema`.
+// Special fields (row id, sequence number, value kind, index score) may be requested by id
+// or by name; the ones gated by an option are resolved only when that option is enabled,
+// and are otherwise looked up in the table schema like any other field.
+//
+// Returns an error status when an argument is null, the options cannot be parsed, a
+// requested field cannot be resolved, or the read schema / predicate fails validation.
+Result<std::unique_ptr<InternalReadContext>> InternalReadContext::Create(
+    const std::shared_ptr<ReadContext>& context, const std::shared_ptr<TableSchema>& table_schema,
+    const std::map<std::string, std::string>& options) {
+    // Both pointers are dereferenced below; report a status instead of crashing.
+    if (context == nullptr) {
+        return Status::Invalid("read context is null");
+    }
+    if (table_schema == nullptr) {
+        return Status::Invalid("table schema is null");
+    }
+    PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options,
+                           CoreOptions::FromMap(options, context->GetSpecificFileSystem(),
+                                                context->GetFileSystemSchemeToIdentifierMap()));
+
+    // Special fields that may be requested under the current options. The option checks
+    // are evaluated once here instead of once per requested field.
+    std::vector<DataField> special_fields;
+    if (core_options.RowTrackingEnabled()) {
+        special_fields.push_back(SpecialFields::RowId());
+    }
+    if (core_options.RowTrackingEnabled() || core_options.KeyValueSequenceNumberEnabled()) {
+        special_fields.push_back(SpecialFields::SequenceNumber());
+    }
+    special_fields.push_back(SpecialFields::ValueKind());
+    if (core_options.DataEvolutionEnabled()) {
+        special_fields.push_back(SpecialFields::IndexScore());
+    }
+    // Returns the allowed special field accepted by `matches`, or nullptr when none is.
+    const auto find_special_field = [&special_fields](const auto& matches) -> const DataField* {
+        for (const auto& special_field : special_fields) {
+            if (matches(special_field)) {
+                return &special_field;
+            }
+        }
+        return nullptr;
+    };
+
+    // prepare read schema
+    std::vector<DataField> read_data_fields;
+    if (!context->GetReadFieldIds().empty()) {
+        read_data_fields.reserve(context->GetReadFieldIds().size());
+        for (const auto& field_id : context->GetReadFieldIds()) {
+            const DataField* special_field = find_special_field(
+                [&field_id](const DataField& candidate) { return candidate.Id() == field_id; });
+            if (special_field != nullptr) {
+                read_data_fields.push_back(*special_field);
+                continue;
+            }
+            PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema->GetField(field_id));
+            read_data_fields.push_back(field);
+        }
+    } else if (!context->GetReadSchema().empty()) {
+        read_data_fields.reserve(context->GetReadSchema().size());
+        for (const auto& name : context->GetReadSchema()) {
+            const DataField* special_field = find_special_field(
+                [&name](const DataField& candidate) { return candidate.Name() == name; });
+            if (special_field != nullptr) {
+                read_data_fields.push_back(*special_field);
+                continue;
+            }
+            PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema->GetField(name));
+            read_data_fields.push_back(field);
+        }
+    } else {
+        // if field names not set, read all fields
+        read_data_fields = table_schema->Fields();
+    }
+    auto read_schema = DataField::ConvertDataFieldsToArrowSchema(read_data_fields);
+    // validate read schema to avoid redundant fields
+    PAIMON_RETURN_NOT_OK(ArrowSchemaValidator::ValidateSchemaWithFieldId(*read_schema));
+    // validate predicate
+    if (context->GetPredicate()) {
+        PAIMON_RETURN_NOT_OK(PredicateValidator::ValidatePredicateWithSchema(
+            *read_schema, context->GetPredicate(), /*validate_field_idx=*/true));
+        PAIMON_RETURN_NOT_OK(
+            PredicateValidator::ValidatePredicateWithLiterals(context->GetPredicate()));
+    }
+
+    // The constructor is private, so std::make_unique cannot be used here.
+    return std::unique_ptr<InternalReadContext>(
+        new InternalReadContext(context, table_schema, read_schema, core_options));
+}
+
+// Keeps shared ownership of the read context, the table schema and the resolved read
+// schema, and stores a copy of the core options. Private: instances come from Create().
+InternalReadContext::InternalReadContext(const std::shared_ptr<ReadContext>& read_context,
+                                         const std::shared_ptr<TableSchema>& table_schema,
+                                         const std::shared_ptr<arrow::Schema>& read_schema,
+                                         const CoreOptions& options)
+    : read_context_(read_context),
+      table_schema_(table_schema),
+      read_schema_(read_schema),
+      options_(options) {
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/operation/internal_read_context.h 
b/src/paimon/core/operation/internal_read_context.h
new file mode 100644
index 0000000..c199026
--- /dev/null
+++ b/src/paimon/core/operation/internal_read_context.h
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/core/core_options.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/read_context.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+class Executor;
+class MemoryPool;
+class Predicate;
+
+// Internal read context: bundles the user-facing ReadContext with the CoreOptions, the
+// TableSchema and the arrow read schema resolved by Create(). Accessors that are not
+// backed by a member of this class forward to the wrapped ReadContext or TableSchema.
+class InternalReadContext {
+ public:
+    // Factory; the constructor is private. Returns an error status when the options,
+    // the requested fields or the predicate cannot be resolved or validated.
+    static Result<std::unique_ptr<InternalReadContext>> Create(
+        const std::shared_ptr<ReadContext>& read_context,
+        const std::shared_ptr<TableSchema>& table_schema,
+        const std::map<std::string, std::string>& options);
+
+    // ---- state owned by this context ----
+    const CoreOptions& GetCoreOptions() const {
+        return options_;
+    }
+    std::shared_ptr<arrow::Schema> GetReadSchema() const {
+        return read_schema_;
+    }
+    const std::shared_ptr<TableSchema>& GetTableSchema() const {
+        return table_schema_;
+    }
+
+    // ---- forwarded to the ReadContext / TableSchema ----
+    const std::string& GetPath() const {
+        return read_context_->GetPath();
+    }
+    const std::vector<std::string>& GetPartitionKeys() const {
+        return table_schema_->PartitionKeys();
+    }
+    const std::vector<std::string>& GetPrimaryKeys() const {
+        return table_schema_->PrimaryKeys();
+    }
+    const std::shared_ptr<Predicate>& GetPredicate() const {
+        return read_context_->GetPredicate();
+    }
+    bool EnablePredicateFilter() const {
+        return read_context_->EnablePredicateFilter();
+    }
+
+    // ---- prefetch settings ----
+    bool EnablePrefetch() const {
+        return read_context_->EnablePrefetch();
+    }
+    uint32_t GetPrefetchBatchCount() const {
+        return read_context_->GetPrefetchBatchCount();
+    }
+    uint32_t GetPrefetchMaxParallelNum() const {
+        return read_context_->GetPrefetchMaxParallelNum();
+    }
+
+    // ---- row-to-batch threading settings ----
+    bool EnableMultiThreadRowToBatch() const {
+        return read_context_->EnableMultiThreadRowToBatch();
+    }
+    uint32_t GetRowToBatchThreadNumber() const {
+        return read_context_->GetRowToBatchThreadNumber();
+    }
+
+    // ---- runtime resources ----
+    std::shared_ptr<MemoryPool> GetMemoryPool() const {
+        return read_context_->GetMemoryPool();
+    }
+    std::shared_ptr<Executor> GetExecutor() const {
+        return read_context_->GetExecutor();
+    }
+
+    // ---- cache settings ----
+    PrefetchCacheMode GetPrefetchCacheMode() const {
+        return read_context_->GetPrefetchCacheMode();
+    }
+
+    const CacheConfig& GetCacheConfig() const {
+        return read_context_->GetCacheConfig();
+    }
+
+ private:
+    InternalReadContext(const std::shared_ptr<ReadContext>& read_context,
+                        const std::shared_ptr<TableSchema>& table_schema,
+                        const std::shared_ptr<arrow::Schema>& read_schema,
+                        const CoreOptions& options);
+
+    // Declaration order is also the initialization order used by the constructor.
+    std::shared_ptr<ReadContext> read_context_;
+    std::shared_ptr<TableSchema> table_schema_;
+    std::shared_ptr<arrow::Schema> read_schema_;
+    CoreOptions options_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/operation/internal_read_context_test.cpp 
b/src/paimon/core/operation/internal_read_context_test.cpp
new file mode 100644
index 0000000..4d1467b
--- /dev/null
+++ b/src/paimon/core/operation/internal_read_context_test.cpp
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/operation/internal_read_context.h"
+
+#include <utility>
+
+#include "arrow/type.h"
+#include "gtest/gtest.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/core/schema/schema_manager.h"
+#include "paimon/defs.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Neither field names nor field ids are set: the read schema is the whole table schema.
+TEST(InternalReadContext, TestReadWithUnspecifiedSchema) {
+    // no read schema is specified, read all fields
+    std::string table_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09";
+    ReadContextBuilder builder(table_path);
+    ASSERT_OK_AND_ASSIGN(auto read_context, builder.Finish());
+
+    SchemaManager schema_manager(std::make_shared<LocalFileSystem>(), read_context->GetPath());
+    ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0));
+    ASSERT_OK_AND_ASSIGN(auto internal_context,
+                         InternalReadContext::Create(std::move(read_context), table_schema,
+                                                     table_schema->Options()));
+
+    std::vector<DataField> expected_fields = {DataField(0, arrow::field("f0", arrow::utf8())),
+                                              DataField(1, arrow::field("f1", arrow::int32())),
+                                              DataField(2, arrow::field("f2", arrow::int32())),
+                                              DataField(3, arrow::field("f3", arrow::float64()))};
+    auto expected_schema = DataField::ConvertDataFieldsToArrowSchema(expected_fields);
+    ASSERT_TRUE(internal_context->GetReadSchema()->Equals(expected_schema));
+}
+
+// Field names are set: the read schema follows the requested names, in request order.
+TEST(InternalReadContext, TestReadWithSpecifiedSchema) {
+    std::string table_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09";
+    ReadContextBuilder builder(table_path);
+    builder.SetReadSchema({"f3", "f0"});
+    ASSERT_OK_AND_ASSIGN(auto read_context, builder.Finish());
+
+    SchemaManager schema_manager(std::make_shared<LocalFileSystem>(), read_context->GetPath());
+    ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0));
+    ASSERT_OK_AND_ASSIGN(auto internal_context,
+                         InternalReadContext::Create(std::move(read_context), table_schema,
+                                                     table_schema->Options()));
+
+    std::vector<DataField> expected_fields = {DataField(3, arrow::field("f3", arrow::float64())),
+                                              DataField(0, arrow::field("f0", arrow::utf8()))};
+    auto expected_schema = DataField::ConvertDataFieldsToArrowSchema(expected_fields);
+    ASSERT_TRUE(internal_context->GetReadSchema()->Equals(expected_schema));
+}
+
+// Field ids are set: the read schema follows the requested ids, in request order.
+TEST(InternalReadContext, TestReadWithSpecifiedFieldId) {
+    std::string table_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09";
+    ReadContextBuilder builder(table_path);
+    builder.SetReadFieldIds({3, 0});
+    ASSERT_OK_AND_ASSIGN(auto read_context, builder.Finish());
+
+    SchemaManager schema_manager(std::make_shared<LocalFileSystem>(), read_context->GetPath());
+    ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0));
+    ASSERT_OK_AND_ASSIGN(auto internal_context,
+                         InternalReadContext::Create(std::move(read_context), table_schema,
+                                                     table_schema->Options()));
+
+    std::vector<DataField> expected_fields = {DataField(3, arrow::field("f3", arrow::float64())),
+                                              DataField(0, arrow::field("f0", arrow::utf8()))};
+    auto expected_schema = DataField::ConvertDataFieldsToArrowSchema(expected_fields);
+    ASSERT_TRUE(internal_context->GetReadSchema()->Equals(expected_schema));
+}
+
+// Both field names and field ids are set: the field ids take precedence.
+TEST(InternalReadContext, TestReadWithSpecifiedFieldIdAndSchema) {
+    std::string table_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09";
+    ReadContextBuilder builder(table_path);
+    // Names request only "f0" while ids request {3, 0}; the expected schema below
+    // matches the ids, so the names are ignored.
+    builder.SetReadSchema({"f0"});
+    builder.SetReadFieldIds({3, 0});
+    ASSERT_OK_AND_ASSIGN(auto read_context, builder.Finish());
+
+    SchemaManager schema_manager(std::make_shared<LocalFileSystem>(), read_context->GetPath());
+    ASSERT_OK_AND_ASSIGN(auto table_schema, schema_manager.ReadSchema(0));
+    ASSERT_OK_AND_ASSIGN(auto internal_context,
+                         InternalReadContext::Create(std::move(read_context), table_schema,
+                                                     table_schema->Options()));
+
+    std::vector<DataField> expected_fields = {DataField(3, arrow::field("f3", arrow::float64())),
+                                              DataField(0, arrow::field("f0", arrow::utf8()))};
+    auto expected_schema = DataField::ConvertDataFieldsToArrowSchema(expected_fields);
+    ASSERT_TRUE(internal_context->GetReadSchema()->Equals(expected_schema));
+}
+
+// Verifies how InternalReadContext resolves the special columns _ROW_ID, _SEQUENCE_NUMBER and
+// _INDEX_SCORE, both when the relevant table options are enabled and when they are not.
+TEST(InternalReadContext, TestReadWithRowTrackingAndScoreFields) {
+    // Every scenario below reads the same ORC append-table fixture.
+    const std::string table_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09";
+    {
+        // Valid case: row tracking and data evolution are switched on in the options, so the
+        // read schema is expected to contain all requested fields in the requested order.
+        ReadContextBuilder builder(table_path);
+        builder.SetReadSchema({"f3", "f0", "_ROW_ID", "_SEQUENCE_NUMBER", "_INDEX_SCORE"});
+        ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish());
+        SchemaManager schemas(std::make_shared<LocalFileSystem>(), ctx->GetPath());
+        ASSERT_OK_AND_ASSIGN(auto schema, schemas.ReadSchema(0));
+        auto enabled_options = schema->Options();
+        enabled_options[Options::ROW_TRACKING_ENABLED] = "true";
+        enabled_options[Options::DATA_EVOLUTION_ENABLED] = "true";
+        ASSERT_OK_AND_ASSIGN(
+            auto internal_ctx,
+            InternalReadContext::Create(std::move(ctx), schema, enabled_options));
+        std::vector<DataField> expected_fields = {
+            DataField(3, arrow::field("f3", arrow::float64())),
+            DataField(0, arrow::field("f0", arrow::utf8())), SpecialFields::RowId(),
+            SpecialFields::SequenceNumber(), SpecialFields::IndexScore()};
+        auto expected = DataField::ConvertDataFieldsToArrowSchema(expected_fields);
+        ASSERT_TRUE(internal_ctx->GetReadSchema()->Equals(expected));
+    }
+    {
+        // Invalid case: row tracking fields are requested while the table options are used
+        // unmodified (row tracking not enabled), so creation must fail on _ROW_ID.
+        ReadContextBuilder builder(table_path);
+        builder.SetReadSchema({"f3", "f0", "_ROW_ID", "_SEQUENCE_NUMBER"});
+        ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish());
+        SchemaManager schemas(std::make_shared<LocalFileSystem>(), ctx->GetPath());
+        ASSERT_OK_AND_ASSIGN(auto schema, schemas.ReadSchema(0));
+        ASSERT_NOK_WITH_MSG(InternalReadContext::Create(std::move(ctx), schema, schema->Options()),
+                            "Get field _ROW_ID failed: not exist in table schema");
+    }
+    {
+        // Invalid case: the score field is requested while the table options are used
+        // unmodified (data evolution not enabled), so creation must fail on _INDEX_SCORE.
+        ReadContextBuilder builder(table_path);
+        builder.SetReadSchema({"f3", "f0", "_INDEX_SCORE"});
+        ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish());
+        SchemaManager schemas(std::make_shared<LocalFileSystem>(), ctx->GetPath());
+        ASSERT_OK_AND_ASSIGN(auto schema, schemas.ReadSchema(0));
+        ASSERT_NOK_WITH_MSG(InternalReadContext::Create(std::move(ctx), schema, schema->Options()),
+                            "Get field _INDEX_SCORE failed: not exist in table schema");
+    }
+}
+
+// Verifies that the special _VALUE_KIND column can be requested between regular fields and
+// shows up at the same position in the resolved read schema.
+TEST(InternalReadContext, TestReadWithValueKindField) {
+    const std::string table_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09";
+    ReadContextBuilder builder(table_path);
+    builder.SetReadSchema({"f3", "_VALUE_KIND", "f0"});
+    ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish());
+
+    // Load schema version 0 of the fixture table and resolve the context against it.
+    SchemaManager schemas(std::make_shared<LocalFileSystem>(), ctx->GetPath());
+    ASSERT_OK_AND_ASSIGN(auto schema, schemas.ReadSchema(0));
+    ASSERT_OK_AND_ASSIGN(
+        auto internal_ctx,
+        InternalReadContext::Create(std::move(ctx), schema, schema->Options()));
+
+    std::vector<DataField> expected_fields = {
+        DataField(3, arrow::field("f3", arrow::float64())), SpecialFields::ValueKind(),
+        DataField(0, arrow::field("f0", arrow::utf8()))};
+    auto expected = DataField::ConvertDataFieldsToArrowSchema(expected_fields);
+    ASSERT_TRUE(internal_ctx->GetReadSchema()->Equals(expected));
+}
+
+// Verifies that the projection can be given purely by field ids, including the reserved ids
+// for row id, sequence number and index score.
+TEST(InternalReadContext, TestReadWithFieldIdsAndSpecialFields) {
+    const std::string table_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09";
+    ReadContextBuilder builder(table_path);
+    // Field ids are used instead of field names; the special columns are addressed through
+    // their dedicated ids.
+    builder.SetReadFieldIds({3, 0, SpecialFieldIds::ROW_ID, SpecialFieldIds::SEQUENCE_NUMBER,
+                             SpecialFieldIds::INDEX_SCORE});
+    ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish());
+
+    SchemaManager schemas(std::make_shared<LocalFileSystem>(), ctx->GetPath());
+    ASSERT_OK_AND_ASSIGN(auto schema, schemas.ReadSchema(0));
+    // Enable the table features the special columns depend on.
+    auto enabled_options = schema->Options();
+    enabled_options[Options::ROW_TRACKING_ENABLED] = "true";
+    enabled_options[Options::DATA_EVOLUTION_ENABLED] = "true";
+    ASSERT_OK_AND_ASSIGN(auto internal_ctx,
+                         InternalReadContext::Create(std::move(ctx), schema, enabled_options));
+
+    std::vector<DataField> expected_fields = {
+        DataField(3, arrow::field("f3", arrow::float64())),
+        DataField(0, arrow::field("f0", arrow::utf8())), SpecialFields::RowId(),
+        SpecialFields::SequenceNumber(), SpecialFields::IndexScore()};
+    auto expected = DataField::ConvertDataFieldsToArrowSchema(expected_fields);
+    ASSERT_TRUE(internal_ctx->GetReadSchema()->Equals(expected));
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/operation/read_context.cpp 
b/src/paimon/core/operation/read_context.cpp
new file mode 100644
index 0000000..81fe236
--- /dev/null
+++ b/src/paimon/core/operation/read_context.cpp
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/read_context.h"
+
+#include <utility>
+
+#include "paimon/common/utils/path_util.h"
+#include "paimon/core/utils/branch_manager.h"
+#include "paimon/executor.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class Predicate;
+
+/// Stores every read setting verbatim; no validation happens here. Instances are normally
+/// produced by ReadContextBuilder::Finish(), which performs the checks beforehand.
+ReadContext::ReadContext(
+    const std::string& path,
+    const std::string& branch,
+    const std::vector<std::string>& read_schema,
+    const std::vector<int32_t>& read_field_ids,
+    const std::shared_ptr<Predicate>& predicate,
+    bool enable_predicate_filter,
+    bool enable_prefetch,
+    uint32_t prefetch_batch_count,
+    uint32_t prefetch_max_parallel_num,
+    bool enable_multi_thread_row_to_batch,
+    uint32_t row_to_batch_thread_number,
+    const std::optional<std::string>& table_schema,
+    const std::shared_ptr<MemoryPool>& memory_pool,
+    const std::shared_ptr<Executor>& executor,
+    const std::shared_ptr<FileSystem>& specific_file_system,
+    const std::map<std::string, std::string>& fs_scheme_to_identifier_map,
+    const std::map<std::string, std::string>& options,
+    PrefetchCacheMode prefetch_cache_mode,
+    const CacheConfig& cache_config)
+    // Table location and projection.
+    : path_(path),
+      branch_(branch),
+      read_schema_(read_schema),
+      read_field_ids_(read_field_ids),
+      // Filtering.
+      predicate_(predicate),
+      enable_predicate_filter_(enable_predicate_filter),
+      // Prefetch and row-to-batch threading.
+      enable_prefetch_(enable_prefetch),
+      prefetch_batch_count_(prefetch_batch_count),
+      prefetch_max_parallel_num_(prefetch_max_parallel_num),
+      enable_multi_thread_row_to_batch_(enable_multi_thread_row_to_batch),
+      row_to_batch_thread_number_(row_to_batch_thread_number),
+      // Optional schema override and shared runtime resources.
+      table_schema_(table_schema),
+      memory_pool_(memory_pool),
+      executor_(executor),
+      specific_file_system_(specific_file_system),
+      fs_scheme_to_identifier_map_(fs_scheme_to_identifier_map),
+      options_(options),
+      prefetch_cache_mode_(prefetch_cache_mode),
+      cache_config_(cache_config) {}
+
+// Defined out of line, in the translation unit that includes the full collaborator headers.
+ReadContext::~ReadContext() = default;
+
+/// Mutable state behind ReadContextBuilder. The builder is a friend and writes the fields
+/// directly; Finish() reads them to construct the ReadContext.
+class ReadContextBuilder::Impl {
+ public:
+    friend class ReadContextBuilder;
+
+    /// Puts every setting back to its default value. The table path is deliberately left
+    /// untouched, so the builder keeps pointing at the same table after Finish().
+    void Reset() {
+        // Table branch and projection.
+        branch_ = BranchManager::DEFAULT_MAIN_BRANCH;
+        read_field_names_.clear();
+        read_field_ids_.clear();
+        table_schema_.reset();
+
+        // Filtering.
+        predicate_.reset();
+        enable_predicate_filter_ = false;
+
+        // Prefetch and row-to-batch threading.
+        enable_prefetch_ = false;
+        prefetch_batch_count_ = 600;
+        prefetch_max_parallel_num_ = 3;
+        prefetch_cache_mode_ = PrefetchCacheMode::ALWAYS;
+        cache_config_ = CacheConfig();
+        enable_multi_thread_row_to_batch_ = false;
+        row_to_batch_thread_number_ = 1;
+
+        // Runtime resources, file system selection and free-form options.
+        memory_pool_ = GetDefaultPool();
+        executor_.reset();
+        specific_file_system_.reset();
+        fs_scheme_to_identifier_map_.clear();
+        options_.clear();
+    }
+
+ private:
+    // Defaults below must stay in sync with Reset().
+    std::string path_;
+    std::string branch_ = BranchManager::DEFAULT_MAIN_BRANCH;
+    std::vector<std::string> read_field_names_;
+    std::vector<int32_t> read_field_ids_;
+    std::map<std::string, std::string> fs_scheme_to_identifier_map_;
+    std::map<std::string, std::string> options_;
+    std::shared_ptr<Predicate> predicate_;
+    bool enable_predicate_filter_{false};
+    bool enable_prefetch_{false};
+    uint32_t prefetch_batch_count_{600};
+    uint32_t prefetch_max_parallel_num_{3};
+    bool enable_multi_thread_row_to_batch_{false};
+    uint32_t row_to_batch_thread_number_{1};
+    std::optional<std::string> table_schema_;
+    std::shared_ptr<MemoryPool> memory_pool_ = GetDefaultPool();
+    std::shared_ptr<Executor> executor_;
+    std::shared_ptr<FileSystem> specific_file_system_;
+    PrefetchCacheMode prefetch_cache_mode_{PrefetchCacheMode::ALWAYS};
+    CacheConfig cache_config_;
+};
+
+/// Creates a builder for the table at `path`; every other setting starts at its default.
+/// The path is only normalized and checked later, in Finish().
+ReadContextBuilder::ReadContextBuilder(const std::string& path) : impl_(std::make_unique<Impl>()) {
+    impl_->path_ = path;
+}
+
+// The destructor and move operations are defaulted here, where Impl is a complete type.
+ReadContextBuilder::~ReadContextBuilder() = default;
+
+ReadContextBuilder::ReadContextBuilder(ReadContextBuilder&&) noexcept = default;
+ReadContextBuilder& ReadContextBuilder::operator=(ReadContextBuilder&&) noexcept = default;
+
+/// Adds a single option, overwriting the value of an already present key.
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::AddOption(const std::string& key,
+                                                  const std::string& value) {
+    impl_->options_.insert_or_assign(key, value);
+    return *this;
+}
+
+/// Replaces the whole option map; entries added earlier via AddOption() are discarded.
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::SetOptions(
+    const std::map<std::string, std::string>& opts) {
+    auto& settings = *impl_;
+    settings.options_ = opts;
+    return *this;
+}
+
+/// Sets the projection as a list of field names, replacing any previously set names.
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::SetReadSchema(
+    const std::vector<std::string>& read_field_names) {
+    auto& settings = *impl_;
+    settings.read_field_names_ = read_field_names;
+    return *this;
+}
+
+/// Sets the projection as a list of field ids, replacing any previously set ids.
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::SetReadFieldIds(
+    const std::vector<int32_t>& read_field_ids) {
+    auto& settings = *impl_;
+    settings.read_field_ids_ = read_field_ids;
+    return *this;
+}
+
+/// Stores the predicate to attach to the read context; the pointer is shared, not cloned.
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::SetPredicate(
+    const std::shared_ptr<Predicate>& predicate) {
+    auto& settings = *impl_;
+    settings.predicate_ = predicate;
+    return *this;
+}
+
+/// Turns the predicate-filter flag on or off (off by default).
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::EnablePredicateFilter(bool enabled) {
+    auto& settings = *impl_;
+    settings.enable_predicate_filter_ = enabled;
+    return *this;
+}
+
+/// Turns prefetching on or off (off by default). When on, Finish() validates the prefetch
+/// batch count and max parallel num.
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::EnablePrefetch(bool enabled) {
+    auto& settings = *impl_;
+    settings.enable_prefetch_ = enabled;
+    return *this;
+}
+
+/// Sets the prefetch batch count (default 600). The value is only checked in Finish(),
+/// and only when prefetching is enabled.
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::SetPrefetchBatchCount(uint32_t batch_count) {
+    auto& settings = *impl_;
+    settings.prefetch_batch_count_ = batch_count;
+    return *this;
+}
+
+/// Sets the maximum prefetch parallelism (default 3). When prefetching is enabled and no
+/// executor was supplied, Finish() sizes the default executor with this value.
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::SetPrefetchMaxParallelNum(uint32_t max_parallel_num) {
+    auto& settings = *impl_;
+    settings.prefetch_max_parallel_num_ = max_parallel_num;
+    return *this;
+}
+
+/// Turns multi-threaded row-to-batch conversion on or off (off by default). When on,
+/// Finish() requires a non-zero row-to-batch thread number.
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::EnableMultiThreadRowToBatch(bool enabled) {
+    auto& settings = *impl_;
+    settings.enable_multi_thread_row_to_batch_ = enabled;
+    return *this;
+}
+
+/// Sets the row-to-batch thread number (default 1). The value is only checked in Finish(),
+/// and only when multi-threaded row-to-batch conversion is enabled.
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::SetRowToBatchThreadNumber(uint32_t thread_number) {
+    auto& settings = *impl_;
+    settings.row_to_batch_thread_number_ = thread_number;
+    return *this;
+}
+
+/// Replaces the memory pool handed to the read context (defaults to GetDefaultPool()).
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::WithMemoryPool(
+    const std::shared_ptr<MemoryPool>& memory_pool) {
+    auto& settings = *impl_;
+    settings.memory_pool_ = memory_pool;
+    return *this;
+}
+
+/// Supplies the executor handed to the read context. If none is supplied, Finish() creates
+/// a default one.
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::WithExecutor(
+    const std::shared_ptr<Executor>& executor) {
+    auto& settings = *impl_;
+    settings.executor_ = executor;
+    return *this;
+}
+
+/// Stores an explicit table schema string; unset by default.
+/// NOTE(review): presumably the serialized (JSON) table schema - confirm against the header.
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::SetTableSchema(const std::string& table_schema) {
+    auto& settings = *impl_;
+    settings.table_schema_ = table_schema;
+    return *this;
+}
+
+/// Selects the table branch to read (defaults to BranchManager::DEFAULT_MAIN_BRANCH).
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::WithBranch(const std::string& branch) {
+    auto& settings = *impl_;
+    settings.branch_ = branch;
+    return *this;
+}
+
+/// Replaces the map from file system scheme to file system identifier (empty by default).
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::WithFileSystemSchemeToIdentifierMap(
+    const std::map<std::string, std::string>& fs_scheme_to_identifier_map) {
+    auto& settings = *impl_;
+    settings.fs_scheme_to_identifier_map_ = fs_scheme_to_identifier_map;
+    return *this;
+}
+
+/// Supplies a specific file system instance for the read context; unset by default.
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::WithFileSystem(
+    const std::shared_ptr<FileSystem>& file_system) {
+    auto& settings = *impl_;
+    settings.specific_file_system_ = file_system;
+    return *this;
+}
+
+/// Sets the prefetch cache mode (defaults to PrefetchCacheMode::ALWAYS).
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::SetPrefetchCacheMode(PrefetchCacheMode mode) {
+    auto& settings = *impl_;
+    settings.prefetch_cache_mode_ = mode;
+    return *this;
+}
+
+/// Replaces the cache configuration (defaults to a default-constructed CacheConfig).
+/// Returns the builder itself so calls can be chained.
+ReadContextBuilder& ReadContextBuilder::WithCacheConfig(const CacheConfig& cache_config) {
+    auto& settings = *impl_;
+    settings.cache_config_ = cache_config;
+    return *this;
+}
+
+/// Validates the accumulated settings and builds the ReadContext.
+///
+/// Checks, in order: the normalized table path is not empty; when prefetching is enabled, the
+/// prefetch batch count is non-zero and not smaller than the prefetch max parallel num; when
+/// multi-threaded row-to-batch conversion is enabled, the thread number is non-zero.
+///
+/// @return The new context on success. The builder is then reset to its defaults (the table
+///         path is kept) and can be reused. On failure an invalid status is returned and the
+///         builder keeps its settings; only the path may have been replaced by its
+///         normalized form.
+Result<std::unique_ptr<ReadContext>> ReadContextBuilder::Finish() {
+    PAIMON_ASSIGN_OR_RAISE(impl_->path_, PathUtil::NormalizePath(impl_->path_));
+    if (impl_->path_.empty()) {
+        return Status::Invalid("cannot read with empty table path");
+    }
+    if (impl_->enable_prefetch_) {
+        // The counters are unsigned, so "not greater than 0" can only mean exactly zero.
+        if (impl_->prefetch_batch_count_ == 0) {
+            return Status::Invalid("prefetch batch count should be greater than 0");
+        }
+        if (impl_->prefetch_batch_count_ < impl_->prefetch_max_parallel_num_) {
+            return Status::Invalid(
+                "prefetch batch count should be greater than or equal to prefetch max parallel num");
+        }
+    }
+    if (impl_->enable_multi_thread_row_to_batch_ && impl_->row_to_batch_thread_number_ == 0) {
+        return Status::Invalid("row to batch thread number should be greater than 0");
+    }
+
+    // Resolve the executor only after every check has passed, and keep it in a local instead
+    // of writing it back into the builder. Otherwise a failed Finish() would leave behind a
+    // default executor sized for the old settings, and a later Finish() would silently reuse
+    // it as if the user had supplied it.
+    std::shared_ptr<Executor> executor = impl_->executor_;
+    if (!executor) {
+        // If the user does not set an executor, create a default one sized by the prefetch
+        // max parallel num (a single thread when prefetching is disabled).
+        const uint32_t thread_count =
+            impl_->enable_prefetch_ ? impl_->prefetch_max_parallel_num_ : 1;
+        executor = CreateDefaultExecutor(thread_count);
+    }
+
+    auto ctx = std::make_unique<ReadContext>(
+        impl_->path_, impl_->branch_, impl_->read_field_names_, impl_->read_field_ids_,
+        impl_->predicate_, impl_->enable_predicate_filter_, impl_->enable_prefetch_,
+        impl_->prefetch_batch_count_, impl_->prefetch_max_parallel_num_,
+        impl_->enable_multi_thread_row_to_batch_, impl_->row_to_batch_thread_number_,
+        impl_->table_schema_, impl_->memory_pool_, executor, impl_->specific_file_system_,
+        impl_->fs_scheme_to_identifier_map_, impl_->options_, impl_->prefetch_cache_mode_,
+        impl_->cache_config_);
+    // Hand out a clean builder so settings do not leak into the next context.
+    impl_->Reset();
+    return ctx;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/operation/read_context_test.cpp 
b/src/paimon/core/operation/read_context_test.cpp
new file mode 100644
index 0000000..dc79b69
--- /dev/null
+++ b/src/paimon/core/operation/read_context_test.cpp
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/read_context.h"
+
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/defs.h"
+#include "paimon/executor.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/status.h"
+#include "paimon/testing/mock/mock_file_system.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// A context built without touching any setter must expose the builder defaults.
+TEST(ReadContextTest, TestDefaultValue) {
+    ReadContextBuilder context_builder("table_root_path");
+    ASSERT_OK_AND_ASSIGN(auto context, context_builder.Finish());
+
+    // Table location.
+    ASSERT_EQ(context->GetPath(), "table_root_path");
+    ASSERT_EQ("main", context->GetBranch());
+
+    // Runtime resources are always present; the executor is created by Finish().
+    ASSERT_TRUE(context->GetMemoryPool());
+    ASSERT_TRUE(context->GetExecutor());
+
+    // No projection, options or predicate.
+    ASSERT_TRUE(context->GetReadSchema().empty());
+    ASSERT_TRUE(context->GetReadFieldIds().empty());
+    ASSERT_TRUE(context->GetOptions().empty());
+    ASSERT_FALSE(context->GetPredicate());
+    ASSERT_FALSE(context->EnablePredicateFilter());
+
+    // Prefetch and row-to-batch threading defaults.
+    ASSERT_FALSE(context->EnablePrefetch());
+    ASSERT_EQ(PrefetchCacheMode::ALWAYS, context->GetPrefetchCacheMode());
+    ASSERT_EQ(600, context->GetPrefetchBatchCount());
+    ASSERT_EQ(3, context->GetPrefetchMaxParallelNum());
+    ASSERT_FALSE(context->EnableMultiThreadRowToBatch());
+    ASSERT_EQ(1, context->GetRowToBatchThreadNumber());
+
+    // No file system customization.
+    ASSERT_TRUE(context->GetFileSystemSchemeToIdentifierMap().empty());
+    ASSERT_FALSE(context->GetSpecificFileSystem());
+}
+
+// Every setter on the builder must be reflected by the corresponding getter on the context.
+TEST(ReadContextTest, TestSetContent) {
+    // Collaborators handed to the builder; the context must return these same instances.
+    std::shared_ptr<MemoryPool> memory_pool = GetDefaultPool();
+    std::shared_ptr<Executor> executor = CreateDefaultExecutor();
+    auto fs = std::make_shared<MockFileSystem>();
+    auto predicate =
+        PredicateBuilder::IsNull(/*field_index=*/0, /*field_name=*/"f1", FieldType::INT);
+    CacheConfig cache_config(/*buffer_size_limit=*/1024, /*range_size_limit=*/512,
+                             /*hole_size_limit=*/128, /*pre_buffer_limit=*/2048);
+
+    // Setters return the builder, so the whole configuration can be chained.
+    ReadContextBuilder builder("table_root_path");
+    builder.AddOption("key", "value")
+        .SetReadSchema({"f1", "f2"})
+        .SetReadFieldIds({0, 1})
+        .SetPredicate(predicate)
+        .EnablePredicateFilter(true)
+        .EnablePrefetch(true)
+        .SetPrefetchCacheMode(PrefetchCacheMode::NEVER)
+        .SetPrefetchBatchCount(1200)
+        .SetPrefetchMaxParallelNum(6)
+        .EnableMultiThreadRowToBatch(true)
+        .SetRowToBatchThreadNumber(9)
+        .WithMemoryPool(memory_pool)
+        .WithExecutor(executor)
+        .SetTableSchema("table-schema-json")
+        .WithBranch("rt")
+        .WithCacheConfig(cache_config)
+        .WithFileSystemSchemeToIdentifierMap({{"file", "local"}})
+        .WithFileSystem(fs);
+    ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish());
+
+    // Table location, projection and filtering.
+    ASSERT_EQ(ctx->GetPath(), "table_root_path");
+    ASSERT_TRUE(ctx->GetMemoryPool());
+    ASSERT_TRUE(ctx->GetExecutor());
+    ASSERT_EQ(ctx->GetReadSchema(), std::vector<std::string>({"f1", "f2"}));
+    ASSERT_EQ(ctx->GetReadFieldIds(), std::vector<int32_t>({0, 1}));
+    ASSERT_EQ(*predicate, *(ctx->GetPredicate()));
+    ASSERT_TRUE(ctx->EnablePredicateFilter());
+
+    // Prefetch and row-to-batch threading.
+    ASSERT_TRUE(ctx->EnablePrefetch());
+    ASSERT_EQ(PrefetchCacheMode::NEVER, ctx->GetPrefetchCacheMode());
+    ASSERT_EQ(1200, ctx->GetPrefetchBatchCount());
+    ASSERT_EQ(6, ctx->GetPrefetchMaxParallelNum());
+    ASSERT_TRUE(ctx->EnableMultiThreadRowToBatch());
+    ASSERT_EQ(9, ctx->GetRowToBatchThreadNumber());
+
+    // Shared resources are passed through by identity.
+    ASSERT_EQ(memory_pool, ctx->GetMemoryPool());
+    ASSERT_EQ(executor, ctx->GetExecutor());
+
+    // Schema override, branch and cache configuration.
+    ASSERT_TRUE(ctx->GetSpecificTableSchema().has_value());
+    ASSERT_EQ("table-schema-json", ctx->GetSpecificTableSchema().value());
+    ASSERT_EQ("rt", ctx->GetBranch());
+    ASSERT_EQ(1024U, ctx->GetCacheConfig().GetBufferSizeLimit());
+    ASSERT_EQ(512U, ctx->GetCacheConfig().GetRangeSizeLimit());
+    ASSERT_EQ(128U, ctx->GetCacheConfig().GetHoleSizeLimit());
+    ASSERT_EQ(2048U, ctx->GetCacheConfig().GetPreBufferLimit());
+
+    // File system selection and free-form options.
+    std::map<std::string, std::string> expected_fs_map = {{"file", "local"}};
+    ASSERT_EQ(expected_fs_map, ctx->GetFileSystemSchemeToIdentifierMap());
+    std::map<std::string, std::string> expected_options = {{"key", "value"}};
+    ASSERT_EQ(expected_options, ctx->GetOptions());
+    ASSERT_EQ(ctx->GetSpecificFileSystem(), fs);
+}
+
+// SetOptions() replaces the whole map, so an entry added earlier via AddOption() must vanish.
+TEST(ReadContextTest, TestSetOptionsOverridesAddedOptions) {
+    ReadContextBuilder context_builder("table_root_path");
+    context_builder.AddOption("old", "value").SetOptions({{"key1", "value1"}, {"key2", "value2"}});
+
+    ASSERT_OK_AND_ASSIGN(auto context, context_builder.Finish());
+
+    std::map<std::string, std::string> expected_options = {{"key1", "value1"}, {"key2", "value2"}};
+    ASSERT_EQ(expected_options, context->GetOptions());
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/operation/split_read.h 
b/src/paimon/core/operation/split_read.h
new file mode 100644
index 0000000..ffde90a
--- /dev/null
+++ b/src/paimon/core/operation/split_read.h
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "paimon/executor.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/read_context.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+#include "paimon/table/source/data_split.h"
+
+namespace paimon {
+/// Interface for turning a split into a reader for batch reading. Concrete strategies
+/// implement both methods; this header declares the contract only.
+class SplitRead {
+ public:
+    virtual ~SplitRead() = default;
+
+    /// Creates a BatchReader for `split`, or returns an error status on failure.
+    virtual Result<std::unique_ptr<BatchReader>> CreateReader(
+        const std::shared_ptr<Split>& split) = 0;
+
+    /// Reports whether this implementation applies to `split`.
+    /// NOTE(review): presumably used to pick one SplitRead among several, and
+    /// `force_keep_delete` looks like it controls whether delete rows must be retained -
+    /// confirm against the implementations.
+    virtual Result<bool> Match(const std::shared_ptr<Split>& split,
+                               bool force_keep_delete) const = 0;
+};
+
+}  // namespace paimon

Reply via email to