This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-python.git


The following commit(s) were added to refs/heads/main by this push:
     new b4dd596  #49 Implement Python native read with PyArrow (#50)
b4dd596 is described below

commit b4dd596dadafe7c4e323d950433714a897045170
Author: ChengHui Chen <[email protected]>
AuthorDate: Thu May 22 18:28:48 2025 +0800

    #49 Implement Python native read with PyArrow (#50)
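
    For context, the public read API is unchanged; a minimal usage sketch that
    exercises the new native path (the warehouse path and table name here are
    hypothetical) might look like:

        from pypaimon.py4j import Catalog

        catalog = Catalog.create({'warehouse': '/tmp/warehouse'})
        table = catalog.get_table('default.T')

        read_builder = table.new_read_builder()
        splits = read_builder.new_scan().plan().splits()

        # to_arrow() now tries the PyArrow-native reader first and falls back to
        # the py4j reader; set the env var _IMPLEMENT_MODE='py4j' to force py4j.
        pa_table = read_builder.new_read().to_arrow(splits)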
---
 .gitignore                                         |   1 +
 dev/dev-requirements.txt                           |   2 +
 ... => paimon-python-java-bridge-1.0-SNAPSHOT.jar} | Bin 43384210 -> 44549537 bytes
 paimon-python-java-bridge/pom.xml                  |   6 +-
 .../java/org/apache/paimon/python/BytesWriter.java |   4 +-
 .../org/apache/paimon/python/InvocationUtil.java   |  22 ++
 pypaimon/py4j/java_implementation.py               | 123 +++++++-
 pypaimon/py4j/util/constants.py                    |   1 +
 .../pynative/__init__.py                           |  13 -
 .../pynative/common/__init__.py                    |  13 -
 .../pynative/common/exception.py                   |  15 +-
 pypaimon/pynative/common/predicate.py              |  83 ++++++
 .../pynative/common/row/__init__.py                |  13 -
 pypaimon/pynative/common/row/columnar_row.py       |  63 +++++
 pypaimon/pynative/common/row/internal_row.py       |  72 +++++
 .../pynative/common/row/key_value.py               |  36 ++-
 pypaimon/pynative/common/row/offset_row.py         |  59 ++++
 pypaimon/pynative/common/row/row_kind.py           |  57 ++++
 .../pynative/reader/__init__.py                    |  13 -
 pypaimon/pynative/reader/avro_format_reader.py     |  83 ++++++
 pypaimon/pynative/reader/concat_record_reader.py   |  57 ++++
 .../pynative/reader/core/__init__.py               |  13 -
 .../pynative/reader/core/columnar_row_iterator.py  |  61 ++++
 .../reader/core/file_record_iterator.py}           |  34 ++-
 .../pynative/reader/core/file_record_reader.py     |  31 ++-
 .../pynative/reader/core/record_iterator.py        |  34 ++-
 .../pynative/reader/core/record_reader.py          |  36 ++-
 .../reader/data_file_record_reader.py}             |  40 ++-
 pypaimon/pynative/reader/drop_delete_reader.py     |  62 +++++
 .../pynative/reader/empty_record_reader.py         |  31 ++-
 pypaimon/pynative/reader/filter_record_reader.py   |  64 +++++
 .../pynative/reader/key_value_unwrap_reader.py     |  74 +++++
 pypaimon/pynative/reader/key_value_wrap_reader.py  |  97 +++++++
 pypaimon/pynative/reader/pyarrow_dataset_reader.py |  78 ++++++
 pypaimon/pynative/reader/sort_merge_reader.py      | 264 ++++++++++++++++++
 .../constants.py => pynative/tests/__init__.py}    |  44 ++-
 pypaimon/pynative/tests/test_pynative_reader.py    | 308 +++++++++++++++++++++
 .../pynative/util/__init__.py                      |  13 -
 pypaimon/pynative/util/predicate_converter.py      |  77 ++++++
 pypaimon/pynative/util/predicate_utils.py          |  56 ++++
 pypaimon/pynative/util/reader_convert_func.py      | 200 +++++++++++++
 pypaimon/pynative/util/reader_converter.py         |  89 ++++++
 .../pynative/writer/__init__.py                    |  13 -
 pyproject.toml                                     |   8 +
 setup.py                                           |   6 +
 45 files changed, 2249 insertions(+), 220 deletions(-)

diff --git a/.gitignore b/.gitignore
index e845db3..339e2c2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
 .idea/
 !.idea/vcs.xml
+.vscode/
 
 # paimon-python-java-bridge
 target
diff --git a/dev/dev-requirements.txt b/dev/dev-requirements.txt
index 4ed964e..328c7e6 100755
--- a/dev/dev-requirements.txt
+++ b/dev/dev-requirements.txt
@@ -21,6 +21,8 @@ setuptools>=18.0
 wheel
 py4j==0.10.9.7
 pyarrow>=5.0.0
+fastavro>=1.9.0
+zstandard>=0.23.0
 pandas>=1.3.0
 numpy>=1.22.4
 python-dateutil>=2.8.0,<3
diff --git a/dev/test_deps/paimon-python-java-bridge-0.9-SNAPSHOT.jar b/dev/test_deps/paimon-python-java-bridge-1.0-SNAPSHOT.jar
similarity index 92%
rename from dev/test_deps/paimon-python-java-bridge-0.9-SNAPSHOT.jar
rename to dev/test_deps/paimon-python-java-bridge-1.0-SNAPSHOT.jar
index dfbe9d7..c0df8e3 100644
Binary files a/dev/test_deps/paimon-python-java-bridge-0.9-SNAPSHOT.jar and b/dev/test_deps/paimon-python-java-bridge-1.0-SNAPSHOT.jar differ
diff --git a/paimon-python-java-bridge/pom.xml b/paimon-python-java-bridge/pom.xml
index dd7f7f2..5abf40a 100644
--- a/paimon-python-java-bridge/pom.xml
+++ b/paimon-python-java-bridge/pom.xml
@@ -23,14 +23,14 @@
 
     <groupId>org.apache.paimon</groupId>
     <artifactId>paimon-python-java-bridge</artifactId>
-    <version>0.9-SNAPSHOT</version>
+    <version>1.0-SNAPSHOT</version>
     <name>Paimon : Python-Java Bridge</name>
 
     <packaging>jar</packaging>
     <inceptionYear>2024</inceptionYear>
 
     <properties>
-        <paimon.version>0.9.0</paimon.version>
+        <paimon.version>1.0.0</paimon.version>
         <py4j.version>0.10.9.7</py4j.version>
         <slf4j.version>1.7.32</slf4j.version>
         <log4j.version>2.17.1</log4j.version>
@@ -38,7 +38,7 @@
         <spotless.delimiter>package</spotless.delimiter>
         <arrow.version>14.0.0</arrow.version>
         <target.java.version>1.8</target.java.version>
-        <paimon.ci.tools.version>0.9.0</paimon.ci.tools.version>
+        <paimon.ci.tools.version>1.0.0</paimon.ci.tools.version>
     </properties>
 
     <dependencies>
diff --git a/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java b/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java
index f2ca4e1..fe53597 100644
--- a/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java
+++ b/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/BytesWriter.java
@@ -44,11 +44,11 @@ public class BytesWriter {
 
     public BytesWriter(TableWrite tableWrite, RowType rowType) {
         this.tableWrite = tableWrite;
-        this.arrowBatchReader = new ArrowBatchReader(rowType);
+        this.arrowBatchReader = new ArrowBatchReader(rowType, true);
         this.allocator = new RootAllocator();
         arrowFields =
                 rowType.getFields().stream()
-                        .map(f -> ArrowUtils.toArrowField(f.name(), f.type()))
+                        .map(f -> ArrowUtils.toArrowField(f.name(), f.id(), f.type(), 0))
                         .collect(Collectors.toList());
     }
 
diff --git a/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/InvocationUtil.java b/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/InvocationUtil.java
index 61d322a..5833771 100644
--- a/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/InvocationUtil.java
+++ b/paimon-python-java-bridge/src/main/java/org/apache/paimon/python/InvocationUtil.java
@@ -18,13 +18,22 @@
 
 package org.apache.paimon.python;
 
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.reader.ReaderSupplier;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.TableWrite;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.RowType;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
 * Call some methods in Python directly will raise py4j.Py4JException: Method method([]) does not
  * exist. This util is a workaround.
@@ -47,4 +56,17 @@ public class InvocationUtil {
     public static BytesWriter createBytesWriter(TableWrite tableWrite, RowType rowType) {
         return new BytesWriter(tableWrite, rowType);
     }
+
+    /**
+     * To resolve py4j bug: 'py4j.Py4JException: Method createReader([class java.util.ArrayList])
+     * does not exist'
+     */
+    public static RecordReader<InternalRow> createReader(TableRead tableRead, List<Split> splits)
+            throws IOException {
+        List<ReaderSupplier<InternalRow>> readers = new ArrayList<>();
+        for (Split split : splits) {
+            readers.add(() -> tableRead.createReader(split));
+        }
+        return ConcatRecordReader.create(readers);
+    }
 }
diff --git a/pypaimon/py4j/java_implementation.py b/pypaimon/py4j/java_implementation.py
index 6801c71..43425b0 100644
--- a/pypaimon/py4j/java_implementation.py
+++ b/pypaimon/py4j/java_implementation.py
@@ -15,6 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
+import os
 
 # pypaimon.api implementation based on Java code & py4j lib
 
@@ -30,6 +31,11 @@ from pypaimon.api import \
      table_commit, Schema, predicate)
 from typing import List, Iterator, Optional, Any, TYPE_CHECKING
 
+from pypaimon.pynative.common.exception import PyNativeNotImplementedError
+from pypaimon.pynative.common.predicate import PyNativePredicate
+from pypaimon.pynative.common.row.internal_row import InternalRow
+from pypaimon.pynative.util.reader_converter import ReaderConverter
+
 if TYPE_CHECKING:
     import ray
     from duckdb.duckdb import DuckDBPyConnection
@@ -72,7 +78,12 @@ class Table(table.Table):
 
     def new_read_builder(self) -> 'ReadBuilder':
         j_read_builder = get_gateway().jvm.InvocationUtil.getReadBuilder(self._j_table)
-        return ReadBuilder(j_read_builder, self._j_table.rowType(), self._catalog_options)
+        if self._j_table.primaryKeys().isEmpty():
+            primary_keys = None
+        else:
+            primary_keys = [str(key) for key in self._j_table.primaryKeys()]
+        return ReadBuilder(j_read_builder, self._j_table.rowType(), self._catalog_options,
+                           primary_keys)
 
     def new_batch_write_builder(self) -> 'BatchWriteBuilder':
         java_utils.check_batch_write(self._j_table)
@@ -82,16 +93,21 @@ class Table(table.Table):
 
 class ReadBuilder(read_builder.ReadBuilder):
 
-    def __init__(self, j_read_builder, j_row_type, catalog_options: dict):
+    def __init__(self, j_read_builder, j_row_type, catalog_options: dict, primary_keys: List[str]):
         self._j_read_builder = j_read_builder
         self._j_row_type = j_row_type
         self._catalog_options = catalog_options
+        self._primary_keys = primary_keys
+        self._predicate = None
+        self._projection = None
 
     def with_filter(self, predicate: 'Predicate'):
+        self._predicate = predicate
         self._j_read_builder.withFilter(predicate.to_j_predicate())
         return self
 
     def with_projection(self, projection: List[str]) -> 'ReadBuilder':
+        self._projection = projection
         field_names = list(map(lambda field: field.name(), self._j_row_type.getFields()))
         int_projection = list(map(lambda p: field_names.index(p), projection))
         gateway = get_gateway()
@@ -111,7 +127,8 @@ class ReadBuilder(read_builder.ReadBuilder):
 
     def new_read(self) -> 'TableRead':
         j_table_read = self._j_read_builder.newRead().executeFilter()
-        return TableRead(j_table_read, self._j_read_builder.readType(), self._catalog_options)
+        return TableRead(j_table_read, self._j_read_builder.readType(), self._catalog_options,
+                         self._predicate, self._projection, self._primary_keys)
 
     def new_predicate_builder(self) -> 'PredicateBuilder':
         return PredicateBuilder(self._j_row_type)
@@ -185,14 +202,29 @@ class Split(split.Split):
 
 class TableRead(table_read.TableRead):
 
-    def __init__(self, j_table_read, j_read_type, catalog_options):
+    def __init__(self, j_table_read, j_read_type, catalog_options, predicate, projection,
+                 primary_keys: List[str]):
+        self._j_table_read = j_table_read
+        self._j_read_type = j_read_type
+        self._catalog_options = catalog_options
+
+        self._predicate = predicate
+        self._projection = projection
+        self._primary_keys = primary_keys
+
         self._arrow_schema = java_utils.to_arrow_schema(j_read_type)
         self._j_bytes_reader = get_gateway().jvm.InvocationUtil.createParallelBytesReader(
             j_table_read, j_read_type, TableRead._get_max_workers(catalog_options))
 
-    def to_arrow(self, splits):
-        record_batch_reader = self.to_arrow_batch_reader(splits)
-        return pa.Table.from_batches(record_batch_reader, schema=self._arrow_schema)
+    def to_arrow(self, splits: List['Split']) -> pa.Table:
+        record_generator = self.to_record_generator(splits)
+
+        # If necessary, set the env var constants.IMPLEMENT_MODE to 'py4j' to forcibly use the py4j reader
+        if os.environ.get(constants.IMPLEMENT_MODE, '') != 'py4j' and record_generator is not None:
+            return TableRead._iterator_to_pyarrow_table(record_generator, self._arrow_schema)
+        else:
+            record_batch_reader = self.to_arrow_batch_reader(splits)
+            return pa.Table.from_batches(record_batch_reader, schema=self._arrow_schema)
 
     def to_arrow_batch_reader(self, splits):
         j_splits = list(map(lambda s: s.to_j_split(), splits))
@@ -219,6 +251,60 @@ class TableRead(table_read.TableRead):
 
         return ray.data.from_arrow(self.to_arrow(splits))
 
+    def to_record_generator(self, splits: List['Split']) -> Optional[Iterator[Any]]:
+        """
+        Returns a generator for iterating over records in the table.
+        If the pynative reader is not available, returns None.
+        """
+        try:
+            j_splits = list(s.to_j_split() for s in splits)
+            j_reader = get_gateway().jvm.InvocationUtil.createReader(self._j_table_read, j_splits)
+            converter = ReaderConverter(self._predicate, self._projection, self._primary_keys)
+            pynative_reader = converter.convert_java_reader(j_reader)
+
+            def _record_generator():
+                try:
+                    batch = pynative_reader.read_batch()
+                    while batch is not None:
+                        record = batch.next()
+                        while record is not None:
+                            yield record
+                            record = batch.next()
+                        batch.release_batch()
+                        batch = pynative_reader.read_batch()
+                finally:
+                    pynative_reader.close()
+
+            return _record_generator()
+
+        except PyNativeNotImplementedError as e:
+            print(f"Generating pynative reader failed, will use py4j reader 
instead, "
+                  f"error message: {str(e)}")
+            return None
+
+    @staticmethod
+    def _iterator_to_pyarrow_table(record_generator, arrow_schema):
+        """
+        Converts a record generator into a pyarrow Table using the provided Arrow schema.
+        """
+        record_batches = []
+        current_batch = []
+        batch_size = 1024  # batch size for building RecordBatches; tune as needed
+
+        for record in record_generator:
+            record_dict = {field: record.get_field(i) for i, field in enumerate(arrow_schema.names)}
+            current_batch.append(record_dict)
+            if len(current_batch) >= batch_size:
+                batch = pa.RecordBatch.from_pylist(current_batch, schema=arrow_schema)
+                record_batches.append(batch)
+                current_batch = []
+
+        if current_batch:
+            batch = pa.RecordBatch.from_pylist(current_batch, schema=arrow_schema)
+            record_batches.append(batch)
+
+        return pa.Table.from_batches(record_batches, schema=arrow_schema)
+
     @staticmethod
     def _get_max_workers(catalog_options):
         # default is sequential
@@ -317,12 +403,16 @@ class BatchTableCommit(table_commit.BatchTableCommit):
 
 class Predicate(predicate.Predicate):
 
-    def __init__(self, j_predicate_bytes):
+    def __init__(self, py_predicate: PyNativePredicate, j_predicate_bytes):
+        self.py_predicate = py_predicate
         self._j_predicate_bytes = j_predicate_bytes
 
     def to_j_predicate(self):
         return deserialize_java_object(self._j_predicate_bytes)
 
+    def test(self, record: InternalRow) -> bool:
+        return self.py_predicate.test(record)
+
 
 class PredicateBuilder(predicate.PredicateBuilder):
 
@@ -350,7 +440,8 @@ class PredicateBuilder(predicate.PredicateBuilder):
             index,
             literals
         )
-        return Predicate(serialize_java_object(j_predicate))
+        return Predicate(PyNativePredicate(method, index, field, literals),
+                         serialize_java_object(j_predicate))
 
     def equal(self, field: str, literal: Any) -> Predicate:
         return self._build('equal', field, [literal])
@@ -396,11 +487,13 @@ class PredicateBuilder(predicate.PredicateBuilder):
         return self._build('between', field, [included_lower_bound, included_upper_bound])
 
     def and_predicates(self, predicates: List[Predicate]) -> Predicate:
-        predicates = list(map(lambda p: p.to_j_predicate(), predicates))
-        j_predicate = get_gateway().jvm.PredicationUtil.buildAnd(predicates)
-        return Predicate(serialize_java_object(j_predicate))
+        j_predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+        j_predicate = get_gateway().jvm.PredicationUtil.buildAnd(j_predicates)
+        return Predicate(PyNativePredicate('and', None, None, predicates),
+                         serialize_java_object(j_predicate))
 
     def or_predicates(self, predicates: List[Predicate]) -> Predicate:
-        predicates = list(map(lambda p: p.to_j_predicate(), predicates))
-        j_predicate = get_gateway().jvm.PredicationUtil.buildOr(predicates)
-        return Predicate(serialize_java_object(j_predicate))
+        j_predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+        j_predicate = get_gateway().jvm.PredicationUtil.buildOr(j_predicates)
+        return Predicate(PyNativePredicate('or', None, None, predicates),
+                         serialize_java_object(j_predicate))
diff --git a/pypaimon/py4j/util/constants.py b/pypaimon/py4j/util/constants.py
index f223309..200ac32 100644
--- a/pypaimon/py4j/util/constants.py
+++ b/pypaimon/py4j/util/constants.py
@@ -29,3 +29,4 @@ MAX_WORKERS = "max-workers"
 
 # ------------------ for tests (Please don't use it) ------------------
 PYPAIMON4J_TEST_MODE = '_PYPAIMON4J_TEST_MODE'
+IMPLEMENT_MODE = '_IMPLEMENT_MODE'
diff --git a/dev/dev-requirements.txt b/pypaimon/pynative/__init__.py
old mode 100755
new mode 100644
similarity index 84%
copy from dev/dev-requirements.txt
copy to pypaimon/pynative/__init__.py
index 4ed964e..65b48d4
--- a/dev/dev-requirements.txt
+++ b/pypaimon/pynative/__init__.py
@@ -15,16 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
-pip>=20.3
-setuptools>=18.0
-wheel
-py4j==0.10.9.7
-pyarrow>=5.0.0
-pandas>=1.3.0
-numpy>=1.22.4
-python-dateutil>=2.8.0,<3
-pytz>=2018.3
-pytest~=7.0
-duckdb>=0.5.0,<2.0.0
-ray~=2.10.0
diff --git a/dev/dev-requirements.txt b/pypaimon/pynative/common/__init__.py
old mode 100755
new mode 100644
similarity index 84%
copy from dev/dev-requirements.txt
copy to pypaimon/pynative/common/__init__.py
index 4ed964e..65b48d4
--- a/dev/dev-requirements.txt
+++ b/pypaimon/pynative/common/__init__.py
@@ -15,16 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
-pip>=20.3
-setuptools>=18.0
-wheel
-py4j==0.10.9.7
-pyarrow>=5.0.0
-pandas>=1.3.0
-numpy>=1.22.4
-python-dateutil>=2.8.0,<3
-pytz>=2018.3
-pytest~=7.0
-duckdb>=0.5.0,<2.0.0
-ray~=2.10.0
diff --git a/dev/dev-requirements.txt b/pypaimon/pynative/common/exception.py
old mode 100755
new mode 100644
similarity index 84%
copy from dev/dev-requirements.txt
copy to pypaimon/pynative/common/exception.py
index 4ed964e..9f37729
--- a/dev/dev-requirements.txt
+++ b/pypaimon/pynative/common/exception.py
@@ -16,15 +16,6 @@
 # limitations under the License.
 
################################################################################
 
-pip>=20.3
-setuptools>=18.0
-wheel
-py4j==0.10.9.7
-pyarrow>=5.0.0
-pandas>=1.3.0
-numpy>=1.22.4
-python-dateutil>=2.8.0,<3
-pytz>=2018.3
-pytest~=7.0
-duckdb>=0.5.0,<2.0.0
-ray~=2.10.0
+class PyNativeNotImplementedError(NotImplementedError):
+    """ Method or function hasn't been implemented by py-native paimon yet. """
+    pass
diff --git a/pypaimon/pynative/common/predicate.py b/pypaimon/pynative/common/predicate.py
new file mode 100644
index 0000000..cadff46
--- /dev/null
+++ b/pypaimon/pynative/common/predicate.py
@@ -0,0 +1,83 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from dataclasses import dataclass
+from typing import Any, List, Optional
+
+from pypaimon.pynative.common.row.internal_row import InternalRow
+
+
+@dataclass
+class PyNativePredicate:
+    method: str
+    index: int
+    field: str
+    literals: Optional[List[Any]] = None
+
+    def test(self, record: InternalRow) -> bool:
+        """
+        Test whether the record satisfies the predicate condition.
+        """
+        if not hasattr(record, 'get_field'):
+            raise ValueError("Record must have get_field method")
+
+        if self.method == 'equal':
+            return record.get_field(self.index) == self.literals[0]
+        elif self.method == 'notEqual':
+            return record.get_field(self.index) != self.literals[0]
+        elif self.method == 'lessThan':
+            return record.get_field(self.index) < self.literals[0]
+        elif self.method == 'lessOrEqual':
+            return record.get_field(self.index) <= self.literals[0]
+        elif self.method == 'greaterThan':
+            return record.get_field(self.index) > self.literals[0]
+        elif self.method == 'greaterOrEqual':
+            return record.get_field(self.index) >= self.literals[0]
+        elif self.method == 'isNull':
+            return record.get_field(self.index) is None
+        elif self.method == 'isNotNull':
+            return record.get_field(self.index) is not None
+        elif self.method == 'startsWith':
+            field_value = record.get_field(self.index)
+            if not isinstance(field_value, str):
+                return False
+            return field_value.startswith(self.literals[0])
+        elif self.method == 'endsWith':
+            field_value = record.get_field(self.index)
+            if not isinstance(field_value, str):
+                return False
+            return field_value.endswith(self.literals[0])
+        elif self.method == 'contains':
+            field_value = record.get_field(self.index)
+            if not isinstance(field_value, str):
+                return False
+            return self.literals[0] in field_value
+        elif self.method == 'in':
+            return record.get_field(self.index) in self.literals
+        elif self.method == 'notIn':
+            return record.get_field(self.index) not in self.literals
+        elif self.method == 'between':
+            field_value = record.get_field(self.index)
+            return self.literals[0] <= field_value <= self.literals[1]
+        elif self.method == 'and':
+            return all(p.test(record) for p in self.literals)
+        elif self.method == 'or':
+            return any(p.test(record) for p in self.literals)
+        else:
+            raise ValueError(f"Unsupported predicate method: {self.method}")
diff --git a/dev/dev-requirements.txt b/pypaimon/pynative/common/row/__init__.py
old mode 100755
new mode 100644
similarity index 84%
copy from dev/dev-requirements.txt
copy to pypaimon/pynative/common/row/__init__.py
index 4ed964e..65b48d4
--- a/dev/dev-requirements.txt
+++ b/pypaimon/pynative/common/row/__init__.py
@@ -15,16 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
-pip>=20.3
-setuptools>=18.0
-wheel
-py4j==0.10.9.7
-pyarrow>=5.0.0
-pandas>=1.3.0
-numpy>=1.22.4
-python-dateutil>=2.8.0,<3
-pytz>=2018.3
-pytest~=7.0
-duckdb>=0.5.0,<2.0.0
-ray~=2.10.0
diff --git a/pypaimon/pynative/common/row/columnar_row.py b/pypaimon/pynative/common/row/columnar_row.py
new file mode 100644
index 0000000..244539d
--- /dev/null
+++ b/pypaimon/pynative/common/row/columnar_row.py
@@ -0,0 +1,63 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Any
+
+import pyarrow as pa
+
+from pypaimon.pynative.common.row.internal_row import InternalRow
+from pypaimon.pynative.common.row.row_kind import RowKind
+
+
+class ColumnarRow(InternalRow):
+    """
+    Columnar row to support access to vector column data. It is a row based on a PyArrow RecordBatch.
+    """
+
+    def __init__(self, record_batch: pa.RecordBatch, row_id: int = 0,
+                 row_kind: RowKind = RowKind.INSERT):
+        self._batch = record_batch
+        self._row_id = row_id
+        self._row_kind = row_kind
+
+    def get_row_id(self) -> int:
+        return self._row_id
+
+    def set_row_id(self, row_id: int) -> None:
+        self._row_id = row_id
+
+    def batch(self) -> pa.RecordBatch:
+        return self._batch
+
+    def get_field(self, pos: int) -> Any:
+        return self._batch.column(pos)[self._row_id].as_py()
+
+    def is_null_at(self, pos: int) -> bool:
+        return self._batch.column(pos).is_null(self._row_id)
+
+    def set_field(self, pos: int, value: Any) -> None:
+        raise NotImplementedError()
+
+    def get_row_kind(self) -> RowKind:
+        return self._row_kind
+
+    def set_row_kind(self, kind: RowKind) -> None:
+        self._row_kind = kind
+
+    def __len__(self) -> int:
+        return self._batch.num_columns
diff --git a/pypaimon/pynative/common/row/internal_row.py b/pypaimon/pynative/common/row/internal_row.py
new file mode 100644
index 0000000..4c46ed9
--- /dev/null
+++ b/pypaimon/pynative/common/row/internal_row.py
@@ -0,0 +1,72 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from abc import ABC, abstractmethod
+from typing import Any
+
+from pypaimon.pynative.common.row.row_kind import RowKind
+
+
+class InternalRow(ABC):
+    """
+    Base interface for an internal data structure representing data of RowType.
+    """
+
+    @abstractmethod
+    def get_field(self, pos: int) -> Any:
+        """
+        Returns the value at the given position.
+        """
+
+    @abstractmethod
+    def is_null_at(self, pos: int) -> bool:
+        """
+        Returns true if the element is null at the given position.
+        """
+
+    @abstractmethod
+    def set_field(self, pos: int, value: Any) -> None:
+        """
+        Sets the value at the given position.
+        """
+
+    @abstractmethod
+    def get_row_kind(self) -> RowKind:
+        """
+        Returns the kind of change that this row describes in a changelog.
+        """
+
+    @abstractmethod
+    def set_row_kind(self, kind: RowKind) -> None:
+        """
+        Sets the kind of change that this row describes in a changelog.
+        """
+
+    @abstractmethod
+    def __len__(self) -> int:
+        """
+        Returns the number of fields in this row.
+        The number does not include RowKind. It is kept separately.
+        """
+
+    def __str__(self) -> str:
+        fields = []
+        for pos in range(self.__len__()):
+            value = self.get_field(pos)
+            fields.append(str(value))
+        return " ".join(fields)
diff --git a/dev/dev-requirements.txt b/pypaimon/pynative/common/row/key_value.py
old mode 100755
new mode 100644
similarity index 63%
copy from dev/dev-requirements.txt
copy to pypaimon/pynative/common/row/key_value.py
index 4ed964e..d8c9951
--- a/dev/dev-requirements.txt
+++ b/pypaimon/pynative/common/row/key_value.py
@@ -16,15 +16,27 @@
 # limitations under the License.
 
################################################################################
 
-pip>=20.3
-setuptools>=18.0
-wheel
-py4j==0.10.9.7
-pyarrow>=5.0.0
-pandas>=1.3.0
-numpy>=1.22.4
-python-dateutil>=2.8.0,<3
-pytz>=2018.3
-pytest~=7.0
-duckdb>=0.5.0,<2.0.0
-ray~=2.10.0
+from dataclasses import dataclass
+
+from pypaimon.pynative.common.row.internal_row import InternalRow
+from pypaimon.pynative.common.row.row_kind import RowKind
+
+"""
+A key value, including user key, sequence number, value kind and value.
+"""
+
+
+@dataclass
+class KeyValue:
+    key: InternalRow
+    sequence_number: int
+    value_kind: RowKind
+    value: InternalRow
+    level: int = -1
+
+    def set_level(self, level: int) -> 'KeyValue':
+        self.level = level
+        return self
+
+    def is_add(self) -> bool:
+        return self.value_kind.is_add()
diff --git a/pypaimon/pynative/common/row/offset_row.py b/pypaimon/pynative/common/row/offset_row.py
new file mode 100644
index 0000000..8ae21a2
--- /dev/null
+++ b/pypaimon/pynative/common/row/offset_row.py
@@ -0,0 +1,59 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Any
+
+from pypaimon.pynative.common.row.internal_row import InternalRow
+from pypaimon.pynative.common.row.row_kind import RowKind
+
+
+class OffsetRow(InternalRow):
+    """
+    An InternalRow that wraps another row with an offset.
+    """
+
+    def __init__(self, row: InternalRow, offset: int, arity: int):
+        self.row = row
+        self.offset = offset
+        self.arity = arity
+
+    def replace(self, row: InternalRow) -> 'OffsetRow':
+        self.row = row
+        return self
+
+    def get_field(self, pos: int) -> Any:
+        if pos >= self.arity:
+            raise IndexError(f"Position {pos} is out of bounds for arity 
{self.arity}")
+        return self.row.get_field(pos + self.offset)
+
+    def is_null_at(self, pos: int) -> bool:
+        if pos >= self.arity:
+            raise IndexError(f"Position {pos} is out of bounds for arity 
{self.arity}")
+        return self.row.is_null_at(pos + self.offset)
+
+    def set_field(self, pos: int, value: Any) -> None:
+        raise NotImplementedError()
+
+    def get_row_kind(self) -> RowKind:
+        return self.row.get_row_kind()
+
+    def set_row_kind(self, kind: RowKind) -> None:
+        self.row.set_row_kind(kind)
+
+    def __len__(self) -> int:
+        return self.arity
diff --git a/pypaimon/pynative/common/row/row_kind.py b/pypaimon/pynative/common/row/row_kind.py
new file mode 100644
index 0000000..ff9b9b1
--- /dev/null
+++ b/pypaimon/pynative/common/row/row_kind.py
@@ -0,0 +1,57 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from enum import Enum
+
+
+class RowKind(Enum):
+    """
+    Insertion operation.
+    """
+    INSERT = 0  # +I: Update operation with the previous content of the 
updated row.
+    UPDATE_BEFORE = 1  # -U: Update operation with the previous content of the 
updated row
+    UPDATE_AFTER = 2  # +U: Update operation with new content of the updated 
row
+    DELETE = 3  # -D: Deletion operation
+
+    def is_add(self) -> bool:
+        return self in (RowKind.INSERT, RowKind.UPDATE_AFTER)
+
+    def to_string(self) -> str:
+        if self == RowKind.INSERT:
+            return "+I"
+        elif self == RowKind.UPDATE_BEFORE:
+            return "-U"
+        elif self == RowKind.UPDATE_AFTER:
+            return "+U"
+        elif self == RowKind.DELETE:
+            return "-D"
+        else:
+            return "??"
+
+    @staticmethod
+    def from_string(kind_str: str) -> 'RowKind':
+        if kind_str == "+I":
+            return RowKind.INSERT
+        elif kind_str == "-U":
+            return RowKind.UPDATE_BEFORE
+        elif kind_str == "+U":
+            return RowKind.UPDATE_AFTER
+        elif kind_str == "-D":
+            return RowKind.DELETE
+        else:
+            raise ValueError(f"Unknown row kind string: {kind_str}")
diff --git a/dev/dev-requirements.txt b/pypaimon/pynative/reader/__init__.py
old mode 100755
new mode 100644
similarity index 84%
copy from dev/dev-requirements.txt
copy to pypaimon/pynative/reader/__init__.py
index 4ed964e..65b48d4
--- a/dev/dev-requirements.txt
+++ b/pypaimon/pynative/reader/__init__.py
@@ -15,16 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
-pip>=20.3
-setuptools>=18.0
-wheel
-py4j==0.10.9.7
-pyarrow>=5.0.0
-pandas>=1.3.0
-numpy>=1.22.4
-python-dateutil>=2.8.0,<3
-pytz>=2018.3
-pytest~=7.0
-duckdb>=0.5.0,<2.0.0
-ray~=2.10.0
diff --git a/pypaimon/pynative/reader/avro_format_reader.py b/pypaimon/pynative/reader/avro_format_reader.py
new file mode 100644
index 0000000..6852516
--- /dev/null
+++ b/pypaimon/pynative/reader/avro_format_reader.py
@@ -0,0 +1,83 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Any, Dict, List, Optional
+
+import fastavro
+import pyarrow as pa
+
+from pypaimon.pynative.common.row.internal_row import InternalRow
+from pypaimon.pynative.reader.core.columnar_row_iterator import ColumnarRowIterator
+from pypaimon.pynative.reader.core.file_record_iterator import FileRecordIterator
+from pypaimon.pynative.reader.core.file_record_reader import FileRecordReader
+
+
+class AvroFormatReader(FileRecordReader[InternalRow]):
+    """
+    A RecordReader implementation for reading Avro files using fastavro.
+    The reader converts Avro records to pyarrow.RecordBatch format, which is compatible with
+    the ColumnarRowIterator.
+    """
+
+    def __init__(self, file_path: str, batch_size: int, projected_type: Optional[List[str]] = None):
+        self._file_path = file_path
+        self._batch_size = batch_size
+        self._projected_type = projected_type
+
+        # Keep a handle to the file so close() can release it
+        self._file = open(file_path, 'rb')
+        self._reader = fastavro.reader(self._file)
+        self._schema = self._reader.schema
+        self._current_batch: List[Dict[str, Any]] = []
+
+    def read_batch(self) -> Optional[FileRecordIterator[InternalRow]]:
+        try:
+            self._current_batch = []
+            for _ in range(self._batch_size):
+                try:
+                    record = next(self._reader)
+                    self._current_batch.append(record)
+                except StopIteration:
+                    break
+
+            if not self._current_batch:
+                return None
+
+            # TODO: Temporarily converting results to pyarrow RecordBatch, reusing its logic.
+            # TODO: Custom adjustments will follow later.
+            record_batch = self._convert_to_record_batch(self._current_batch)
+            if record_batch is None:
+                return None
+
+            return ColumnarRowIterator(
+                self._file_path,
+                record_batch
+            )
+        except Exception as e:
+            print(f"Error reading Avro batch: {e}")
+            raise
+
+    def _convert_to_record_batch(self, records: List[Dict[str, Any]]) -> Optional[pa.RecordBatch]:
+        if not records:
+            return None
+
+        if self._projected_type is not None:
+            records = [{k: r[k] for k in self._projected_type} for r in records]
+
+        return pa.RecordBatch.from_pylist(records)
+
+    def close(self):
+        self._file.close()
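+
+# Usage sketch (hypothetical file path), following the RecordReader batch
+# protocol used elsewhere in this change:
+#   reader = AvroFormatReader('/tmp/data.avro', batch_size=1024)
+#   batch = reader.read_batch()
+#   while batch is not None:
+#       row = batch.next()
+#       while row is not None:
+#           ...  # consume row.get_field(i)
+#           row = batch.next()
+#       batch.release_batch()
+#       batch = reader.read_batch()
+#   reader.close()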
diff --git a/pypaimon/pynative/reader/concat_record_reader.py b/pypaimon/pynative/reader/concat_record_reader.py
new file mode 100644
index 0000000..ccbffab
--- /dev/null
+++ b/pypaimon/pynative/reader/concat_record_reader.py
@@ -0,0 +1,57 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Optional
+
+from py4j.java_gateway import JavaObject
+
+from pypaimon.pynative.reader.core.record_iterator import RecordIterator
+from pypaimon.pynative.reader.core.record_reader import RecordReader
+
+
+class ConcatRecordReader(RecordReader):
+    """
+    This reader is to concatenate a list of RecordReaders and read them sequentially.
+    The input list is already sorted by key and sequence number, and the key intervals do not
+    overlap each other.
+    """
+
+    def __init__(self, converter, j_supplier_queue: JavaObject):
+        self.converter = converter
+        self.j_supplier_queue = j_supplier_queue
+        self.current: Optional[RecordReader] = None
+
+    def read_batch(self) -> Optional[RecordIterator]:
+        while True:
+            if self.current is not None:
+                iterator = self.current.read_batch()
+                if iterator is not None:
+                    return iterator
+                self.current.close()
+                self.current = None
+            elif not self.j_supplier_queue.isEmpty():
+                # If the Java supplier queue is not empty, initialize the reader by using py4j
+                j_supplier = self.j_supplier_queue.poll()
+                j_reader = j_supplier.get()
+                self.current = self.converter.convert_java_reader(j_reader)
+            else:
+                return None
+
+    def close(self) -> None:
+        if self.current is not None:
+            self.current.close()
diff --git a/dev/dev-requirements.txt b/pypaimon/pynative/reader/core/__init__.py
old mode 100755
new mode 100644
similarity index 84%
copy from dev/dev-requirements.txt
copy to pypaimon/pynative/reader/core/__init__.py
index 4ed964e..65b48d4
--- a/dev/dev-requirements.txt
+++ b/pypaimon/pynative/reader/core/__init__.py
@@ -15,16 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
-pip>=20.3
-setuptools>=18.0
-wheel
-py4j==0.10.9.7
-pyarrow>=5.0.0
-pandas>=1.3.0
-numpy>=1.22.4
-python-dateutil>=2.8.0,<3
-pytz>=2018.3
-pytest~=7.0
-duckdb>=0.5.0,<2.0.0
-ray~=2.10.0
diff --git a/pypaimon/pynative/reader/core/columnar_row_iterator.py b/pypaimon/pynative/reader/core/columnar_row_iterator.py
new file mode 100644
index 0000000..b42b96c
--- /dev/null
+++ b/pypaimon/pynative/reader/core/columnar_row_iterator.py
@@ -0,0 +1,61 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Optional
+
+import pyarrow as pa
+
+from pypaimon.pynative.common.row.columnar_row import ColumnarRow
+from pypaimon.pynative.common.row.internal_row import InternalRow
+from pypaimon.pynative.reader.core.file_record_iterator import FileRecordIterator
+
+
+class ColumnarRowIterator(FileRecordIterator[InternalRow]):
+    """
+    A RecordIterator that returns InternalRows. The next row is set by ColumnarRow.set_row_id.
+    """
+
+    def __init__(self, file_path: str, record_batch: pa.RecordBatch):
+        self._file_path = file_path
+        self._record_batch = record_batch
+        self._row = ColumnarRow(record_batch)
+
+        self.num_rows = record_batch.num_rows
+        self.next_pos = 0
+        self.next_file_pos = 0
+
+    def next(self) -> Optional[InternalRow]:
+        if self.next_pos < self.num_rows:
+            self._row.set_row_id(self.next_pos)
+            self.next_pos += 1
+            self.next_file_pos += 1
+            return self._row
+        return None
+
+    def returned_position(self) -> int:
+        return self.next_file_pos - 1
+
+    def file_path(self) -> str:
+        # The attribute is private so it does not shadow this method
+        return self._file_path
+
+    def reset(self, next_file_pos: int):
+        self.next_pos = 0
+        self.next_file_pos = next_file_pos
+
+    def release_batch(self):
+        del self._record_batch
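+
+# Usage sketch: the iterator reuses a single ColumnarRow and only advances its
+# row id, so read (or copy) values before calling next() again:
+#   batch = pa.RecordBatch.from_pylist([{'id': 1}, {'id': 2}])
+#   it = ColumnarRowIterator('/tmp/part-0.parquet', batch)
+#   row = it.next()
+#   while row is not None:
+#       print(row.get_field(0))  # 1, then 2
+#       row = it.next()
+#   it.release_batch()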
diff --git a/pypaimon/py4j/util/constants.py b/pypaimon/pynative/reader/core/file_record_iterator.py
similarity index 60%
copy from pypaimon/py4j/util/constants.py
copy to pypaimon/pynative/reader/core/file_record_iterator.py
index f223309..590a65e 100644
--- a/pypaimon/py4j/util/constants.py
+++ b/pypaimon/pynative/reader/core/file_record_iterator.py
@@ -16,16 +16,28 @@
 # limitations under the License.
 
################################################################################
 
-# ---------------------------- for env var ----------------------------
-PYPAIMON_CONN_INFO_PATH = '_PYPAIMON_CONN_INFO_PATH'
-PYPAIMON_JVM_ARGS = '_PYPAIMON_JVM_ARGS'
-PYPAIMON_JAVA_CLASSPATH = '_PYPAIMON_JAVA_CLASSPATH'
-PYPAIMON_HADOOP_CLASSPATH = '_PYPAIMON_HADOOP_CLASSPATH'
-PYPAIMON_MAIN_CLASS = 'org.apache.paimon.python.PythonGatewayServer'
-PYPAIMON_MAIN_ARGS = '_PYPAIMON_MAIN_ARGS'
+from abc import ABC, abstractmethod
+from typing import TypeVar
 
-# ------------------------ for catalog options ------------------------
-MAX_WORKERS = "max-workers"
+from pypaimon.pynative.reader.core.record_iterator import RecordIterator
 
-# ------------------ for tests (Please don't use it) ------------------
-PYPAIMON4J_TEST_MODE = '_PYPAIMON4J_TEST_MODE'
+T = TypeVar('T')
+
+
+class FileRecordIterator(RecordIterator[T], ABC):
+    """
+    A RecordIterator to support returning the record's row position and file path.
+    """
+
+    @abstractmethod
+    def returned_position(self) -> int:
+        """
+        Get the row position of the row returned by next().
+        Returns: the row position from 0 to the number of rows in the file
+        """
+
+    @abstractmethod
+    def file_path(self) -> str:
+        """
+        Returns: the file path
+        """
diff --git a/dev/dev-requirements.txt b/pypaimon/pynative/reader/core/file_record_reader.py
old mode 100755
new mode 100644
similarity index 66%
copy from dev/dev-requirements.txt
copy to pypaimon/pynative/reader/core/file_record_reader.py
index 4ed964e..2d03cd1
--- a/dev/dev-requirements.txt
+++ b/pypaimon/pynative/reader/core/file_record_reader.py
@@ -16,15 +16,22 @@
 # limitations under the License.
 
################################################################################
 
-pip>=20.3
-setuptools>=18.0
-wheel
-py4j==0.10.9.7
-pyarrow>=5.0.0
-pandas>=1.3.0
-numpy>=1.22.4
-python-dateutil>=2.8.0,<3
-pytz>=2018.3
-pytest~=7.0
-duckdb>=0.5.0,<2.0.0
-ray~=2.10.0
+from abc import abstractmethod
+from typing import Optional, TypeVar
+
+from pypaimon.pynative.reader.core.file_record_iterator import FileRecordIterator
+from pypaimon.pynative.reader.core.record_reader import RecordReader
+
+T = TypeVar('T')
+
+
+class FileRecordReader(RecordReader[T]):
+    """
+    A RecordReader to support returning FileRecordIterator.
+    """
+
+    @abstractmethod
+    def read_batch(self) -> Optional[FileRecordIterator]:
+        """
+        Reads one batch. Returns None at the end of the input.
+        """
diff --git a/dev/dev-requirements.txt b/pypaimon/pynative/reader/core/record_iterator.py
old mode 100755
new mode 100644
similarity index 63%
copy from dev/dev-requirements.txt
copy to pypaimon/pynative/reader/core/record_iterator.py
index 4ed964e..4d3712c
--- a/dev/dev-requirements.txt
+++ b/pypaimon/pynative/reader/core/record_iterator.py
@@ -16,15 +16,25 @@
 # limitations under the License.
 
################################################################################
 
-pip>=20.3
-setuptools>=18.0
-wheel
-py4j==0.10.9.7
-pyarrow>=5.0.0
-pandas>=1.3.0
-numpy>=1.22.4
-python-dateutil>=2.8.0,<3
-pytz>=2018.3
-pytest~=7.0
-duckdb>=0.5.0,<2.0.0
-ray~=2.10.0
+from abc import ABC, abstractmethod
+from typing import Generic, Optional, TypeVar
+
+T = TypeVar('T')
+
+
+class RecordIterator(Generic[T], ABC):
+    """
+    An internal iterator interface which presents a more restrictive API than Iterator.
+    """
+
+    @abstractmethod
+    def next(self) -> Optional[T]:
+        """
+        Gets the next record from the iterator. Returns None if this iterator has no more elements.
+        """
+
+    @abstractmethod
+    def release_batch(self):
+        """
+        Releases the batch that this iterator iterated over.
+        """
diff --git a/dev/dev-requirements.txt b/pypaimon/pynative/reader/core/record_reader.py
old mode 100755
new mode 100644
similarity index 61%
copy from dev/dev-requirements.txt
copy to pypaimon/pynative/reader/core/record_reader.py
index 4ed964e..f7226fa
--- a/dev/dev-requirements.txt
+++ b/pypaimon/pynative/reader/core/record_reader.py
@@ -16,15 +16,27 @@
 # limitations under the License.
 
################################################################################
 
-pip>=20.3
-setuptools>=18.0
-wheel
-py4j==0.10.9.7
-pyarrow>=5.0.0
-pandas>=1.3.0
-numpy>=1.22.4
-python-dateutil>=2.8.0,<3
-pytz>=2018.3
-pytest~=7.0
-duckdb>=0.5.0,<2.0.0
-ray~=2.10.0
+from abc import ABC, abstractmethod
+from typing import Generic, Optional, TypeVar
+
+from pypaimon.pynative.reader.core.record_iterator import RecordIterator
+
+T = TypeVar('T')
+
+
+class RecordReader(Generic[T], ABC):
+    """
+    The reader that reads the batches of records.
+    """
+
+    @abstractmethod
+    def read_batch(self) -> Optional[RecordIterator[T]]:
+        """
+        Reads one batch. The method should return None when reaching the end of the input.
+        """
+
+    @abstractmethod
+    def close(self):
+        """
+        Closes the reader and should release all resources.
+        """
diff --git a/pypaimon/py4j/util/constants.py b/pypaimon/pynative/reader/data_file_record_reader.py
similarity index 52%
copy from pypaimon/py4j/util/constants.py
copy to pypaimon/pynative/reader/data_file_record_reader.py
index f223309..0b161fe 100644
--- a/pypaimon/py4j/util/constants.py
+++ b/pypaimon/pynative/reader/data_file_record_reader.py
@@ -16,16 +16,30 @@
 # limitations under the License.
 
################################################################################
 
-# ---------------------------- for env var ----------------------------
-PYPAIMON_CONN_INFO_PATH = '_PYPAIMON_CONN_INFO_PATH'
-PYPAIMON_JVM_ARGS = '_PYPAIMON_JVM_ARGS'
-PYPAIMON_JAVA_CLASSPATH = '_PYPAIMON_JAVA_CLASSPATH'
-PYPAIMON_HADOOP_CLASSPATH = '_PYPAIMON_HADOOP_CLASSPATH'
-PYPAIMON_MAIN_CLASS = 'org.apache.paimon.python.PythonGatewayServer'
-PYPAIMON_MAIN_ARGS = '_PYPAIMON_MAIN_ARGS'
-
-# ------------------------ for catalog options ------------------------
-MAX_WORKERS = "max-workers"
-
-# ------------------ for tests (Please don't use it) ------------------
-PYPAIMON4J_TEST_MODE = '_PYPAIMON4J_TEST_MODE'
+from typing import Optional
+
+from pypaimon.pynative.common.row.internal_row import InternalRow
+from pypaimon.pynative.reader.core.file_record_iterator import FileRecordIterator
+from pypaimon.pynative.reader.core.file_record_reader import FileRecordReader
+from pypaimon.pynative.reader.core.record_reader import RecordReader
+
+
+class DataFileRecordReader(FileRecordReader[InternalRow]):
+    """
+    Reads InternalRow from data files.
+    """
+
+    def __init__(self, wrapped_reader: RecordReader):
+        self.wrapped_reader = wrapped_reader
+
+    def read_batch(self) -> Optional[FileRecordIterator['InternalRow']]:
+        iterator = self.wrapped_reader.read_batch()
+        if iterator is None:
+            return None
+
+        # TODO: Handle partition_info, index_mapping, and cast_mapping
+
+        return iterator
+
+    def close(self) -> None:
+        self.wrapped_reader.close()
diff --git a/pypaimon/pynative/reader/drop_delete_reader.py b/pypaimon/pynative/reader/drop_delete_reader.py
new file mode 100644
index 0000000..ccb70e0
--- /dev/null
+++ b/pypaimon/pynative/reader/drop_delete_reader.py
@@ -0,0 +1,62 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Optional
+
+from pypaimon.pynative.common.row.key_value import KeyValue
+from pypaimon.pynative.reader.core.record_iterator import RecordIterator
+from pypaimon.pynative.reader.core.record_reader import RecordReader
+
+
+class DropDeleteReader(RecordReader):
+    """
+    A RecordReader which drops KeyValues that are not additions (RowKind.is_add) from the wrapped reader.
+    """
+
+    def __init__(self, wrapped_reader: RecordReader[KeyValue]):
+        self.wrapped_reader = wrapped_reader
+
+    def read_batch(self) -> Optional[RecordIterator]:
+        batch = self.wrapped_reader.read_batch()
+        if batch is None:
+            return None
+
+        return DropDeleteIterator(batch)
+
+    def close(self) -> None:
+        self.wrapped_reader.close()
+
+
+class DropDeleteIterator(RecordIterator[KeyValue]):
+    """
+    An iterator that drops KeyValue records whose RowKind is not an add kind.
+    """
+
+    def __init__(self, batch: RecordIterator[KeyValue]):
+        self.batch = batch
+
+    def next(self) -> Optional[KeyValue]:
+        while True:
+            kv = self.batch.next()
+            if kv is None:
+                return None
+            if kv.is_add():
+                return kv
+
+    def release_batch(self) -> None:
+        self.batch.release_batch()
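
DropDeleteReader only discards retraction rows; what survives is exactly the records whose value kind is an "add". Assuming the usual Paimon row kinds (+I insert, +U update-after, -U update-before, -D delete), the per-record check is equivalent to this sketch:

    from pypaimon.pynative.common.row.row_kind import RowKind

    def keep(kv) -> bool:
        # Equivalent of kv.is_add(): only +I and +U records survive.
        return kv.value_kind in (RowKind.INSERT, RowKind.UPDATE_AFTER)
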
diff --git a/dev/dev-requirements.txt b/pypaimon/pynative/reader/empty_record_reader.py
old mode 100755
new mode 100644
similarity index 68%
copy from dev/dev-requirements.txt
copy to pypaimon/pynative/reader/empty_record_reader.py
index 4ed964e..9883cb8
--- a/dev/dev-requirements.txt
+++ b/pypaimon/pynative/reader/empty_record_reader.py
@@ -16,15 +16,22 @@
 # limitations under the License.
 
################################################################################
 
-pip>=20.3
-setuptools>=18.0
-wheel
-py4j==0.10.9.7
-pyarrow>=5.0.0
-pandas>=1.3.0
-numpy>=1.22.4
-python-dateutil>=2.8.0,<3
-pytz>=2018.3
-pytest~=7.0
-duckdb>=0.5.0,<2.0.0
-ray~=2.10.0
+from typing import Optional
+
+from pypaimon.pynative.reader.core.file_record_reader import FileRecordReader
+from pypaimon.pynative.reader.core.record_iterator import RecordIterator
+
+
+class EmptyFileRecordReader(FileRecordReader):
+    """
+    An empty FileRecordReader.
+    """
+
+    def __init__(self):
+        pass
+
+    def read_batch(self) -> Optional[RecordIterator]:
+        return None
+
+    def close(self) -> None:
+        pass
diff --git a/pypaimon/pynative/reader/filter_record_reader.py b/pypaimon/pynative/reader/filter_record_reader.py
new file mode 100644
index 0000000..ef57829
--- /dev/null
+++ b/pypaimon/pynative/reader/filter_record_reader.py
@@ -0,0 +1,64 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Optional, TypeVar
+
+from pypaimon import Predicate
+from pypaimon.pynative.reader.core.record_reader import RecordIterator, RecordReader
+
+T = TypeVar('T')
+
+
+class FilterRecordReader(RecordReader[T]):
+    """
+    A RecordReader that implements filtering functionality.
+    """
+
+    def __init__(self, reader: RecordReader[T], predicate: Predicate):
+        self.reader = reader
+        self.predicate = predicate
+
+    def read_batch(self) -> Optional[RecordIterator[T]]:
+        iterator = self.reader.read_batch()
+        if iterator is None:
+            return None
+        return FilterRecordIterator(iterator, self.predicate)
+
+    def close(self) -> None:
+        self.reader.close()
+
+
+class FilterRecordIterator(RecordIterator[T]):
+    """
+    A RecordIterator that implements filtering functionality.
+    """
+
+    def __init__(self, iterator: RecordIterator[T], predicate: Predicate):
+        self.iterator = iterator
+        self.predicate = predicate
+
+    def next(self) -> Optional[T]:
+        while True:
+            record = self.iterator.next()
+            if record is None:
+                return None
+            if self.predicate.test(record):
+                return record
+
+    def release_batch(self) -> None:
+        self.iterator.release_batch()
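
Filtering is applied lazily, batch by batch, and defers entirely to Predicate.test(record). A runnable toy sketch (the in-memory reader and the predicate stub below are illustrative stand-ins, not part of the commit):

    from pypaimon.pynative.reader.filter_record_reader import FilterRecordReader

    class ListReader:
        # toy RecordReader yielding a single batch over a Python list
        def __init__(self, items):
            self._batch = ListIterator(items)
        def read_batch(self):
            batch, self._batch = self._batch, None
            return batch
        def close(self):
            pass

    class ListIterator:
        # toy RecordIterator
        def __init__(self, items):
            self._it = iter(items)
        def next(self):
            return next(self._it, None)
        def release_batch(self):
            pass

    class GreaterThanTwo:
        # stands in for a Predicate; only test() is exercised here
        def test(self, record):
            return record > 2

    filtered = FilterRecordReader(ListReader([1, 2, 3, 4]), GreaterThanTwo())
    batch = filtered.read_batch()
    print(batch.next(), batch.next())  # 3 4
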
diff --git a/pypaimon/pynative/reader/key_value_unwrap_reader.py b/pypaimon/pynative/reader/key_value_unwrap_reader.py
new file mode 100644
index 0000000..9add03e
--- /dev/null
+++ b/pypaimon/pynative/reader/key_value_unwrap_reader.py
@@ -0,0 +1,74 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Optional
+
+from pypaimon.pynative.common.row.internal_row import InternalRow
+from pypaimon.pynative.common.row.key_value import KeyValue
+from pypaimon.pynative.common.row.row_kind import RowKind
+from pypaimon.pynative.reader.core.record_iterator import RecordIterator
+from pypaimon.pynative.reader.core.record_reader import RecordReader
+
+
+class KeyValueUnwrapReader(RecordReader[InternalRow]):
+    """
+    A RecordReader that converts a KeyValue record reader into an InternalRow
+    reader. Corresponds to KeyValueTableRead$1 in the Java version.
+    """
+
+    def __init__(self, wrapped_reader: RecordReader[KeyValue]):
+        self.wrapped_reader = wrapped_reader
+
+    def read_batch(self) -> Optional[RecordIterator[InternalRow]]:
+        batch = self.wrapped_reader.read_batch()
+        if batch is None:
+            return None
+
+        return KeyValueUnwrapIterator(batch)
+
+    def close(self) -> None:
+        self.wrapped_reader.close()
+
+
+class KeyValueUnwrapIterator(RecordIterator[InternalRow]):
+    """
+    An Iterator that converts a KeyValue into an InternalRow
+    """
+
+    def __init__(self, batch: RecordIterator[KeyValue]):
+        self.batch = batch
+        self.kv: Optional[KeyValue] = None
+        self.pre_value_row_kind: Optional[RowKind] = None
+
+    def next(self) -> Optional[InternalRow]:
+        # The row_data is reused by the iterator, so restore its real row kind
+        # before fetching the next record
+        if self.kv is not None:
+            self.kv.value.set_row_kind(self.pre_value_row_kind)
+
+        self.kv = self.batch.next()
+        if self.kv is None:
+            return None
+
+        row_data = self.kv.value
+        self.pre_value_row_kind = row_data.get_row_kind()
+
+        row_data.set_row_kind(self.kv.value_kind)
+        return row_data
+
+    def release_batch(self) -> None:
+        self.batch.release_batch()
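
The subtle point in this iterator is object reuse: the wrapped reader hands back the same row instance on every next() call, so the kind stamped onto the row for record N must be restored before record N+1 is fetched. A toy model of the hazard being avoided (the Row class below is illustrative only):

    class Row:
        def __init__(self):
            self.kind = '+I'

    shared = Row()      # the upstream iterator yields this same object each time
    shared.kind = '-D'  # stamped while serving record N
    # Without restoring the original kind first, record N+1 would start out
    # observed as '-D'; that is what pre_value_row_kind guards against above.
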
diff --git a/pypaimon/pynative/reader/key_value_wrap_reader.py b/pypaimon/pynative/reader/key_value_wrap_reader.py
new file mode 100644
index 0000000..980e7e5
--- /dev/null
+++ b/pypaimon/pynative/reader/key_value_wrap_reader.py
@@ -0,0 +1,97 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Optional
+
+from pypaimon.pynative.common.row.internal_row import InternalRow
+from pypaimon.pynative.common.row.key_value import KeyValue
+from pypaimon.pynative.common.row.offset_row import OffsetRow
+from pypaimon.pynative.common.row.row_kind import RowKind
+from pypaimon.pynative.reader.core.file_record_iterator import FileRecordIterator
+from pypaimon.pynative.reader.core.file_record_reader import FileRecordReader
+
+
+class KeyValueWrapReader(FileRecordReader[KeyValue]):
+    """
+    RecordReader for reading KeyValue data files.
+    Corresponds to the KeyValueDataFileRecordReader in Java version.
+    """
+
+    def __init__(self, wrapped_reader: FileRecordReader[InternalRow],
+                 level, key_arity, value_arity):
+        self.wrapped_reader = wrapped_reader
+        self.level = level
+        self.key_arity = key_arity
+        self.value_arity = value_arity
+
+    def read_batch(self) -> Optional[FileRecordIterator[KeyValue]]:
+        iterator = self.wrapped_reader.read_batch()
+        if iterator is None:
+            return None
+        return KeyValueWrapIterator(iterator, self.key_arity, self.value_arity, self.level)
+
+    def close(self):
+        self.wrapped_reader.close()
+
+
+class KeyValueWrapIterator(FileRecordIterator[KeyValue]):
+    """
+    An iterator that converts a primary-key InternalRow into a KeyValue.
+    """
+
+    def __init__(
+            self,
+            iterator: FileRecordIterator,
+            key_arity: int,
+            value_arity: int,
+            level: int
+    ):
+        self.iterator = iterator
+        self.key_arity = key_arity
+        self.value_arity = value_arity
+        self.level = level
+
+        self.reused_key = OffsetRow(None, 0, key_arity)
+        self.reused_value = OffsetRow(None, key_arity + 2, value_arity)
+
+    def next(self) -> Optional[KeyValue]:
+        row = self.iterator.next()
+        if row is None:
+            return None
+
+        self.reused_key.replace(row)
+        self.reused_value.replace(row)
+
+        sequence_number = row.get_field(self.key_arity)
+        value_kind = RowKind(row.get_field(self.key_arity + 1))
+
+        return KeyValue(
+            key=self.reused_key,
+            sequence_number=sequence_number,
+            value_kind=value_kind,
+            value=self.reused_value
+        ).set_level(self.level)
+
+    def returned_position(self) -> int:
+        return self.iterator.returned_position()
+
+    def file_path(self) -> str:
+        return self.iterator.file_path()
+
+    def release_batch(self):
+        self.iterator.release_batch()
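
KeyValueWrapIterator depends on the physical layout of primary-key data files: key columns first, then the system columns _SEQUENCE_NUMBER and _VALUE_KIND, then the value columns, which is why the reused value row starts at offset key_arity + 2. For a table with one key field and three value fields, the offsets work out as in this sketch:

    # column index:  0        1                 2            3   4   5
    # content:       _KEY_f0  _SEQUENCE_NUMBER  _VALUE_KIND  f0  f1  f2
    key_arity = 1
    sequence_index = key_arity        # 1
    value_kind_index = key_arity + 1  # 2
    value_offset = key_arity + 2      # 3, where the value OffsetRow begins
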
diff --git a/pypaimon/pynative/reader/pyarrow_dataset_reader.py b/pypaimon/pynative/reader/pyarrow_dataset_reader.py
new file mode 100644
index 0000000..2f3bc85
--- /dev/null
+++ b/pypaimon/pynative/reader/pyarrow_dataset_reader.py
@@ -0,0 +1,78 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Optional, List
+
+import pyarrow.dataset as ds
+
+from pypaimon import Predicate
+from pypaimon.pynative.common.row.internal_row import InternalRow
+from pypaimon.pynative.reader.core.columnar_row_iterator import ColumnarRowIterator
+from pypaimon.pynative.reader.core.file_record_iterator import FileRecordIterator
+from pypaimon.pynative.reader.core.file_record_reader import FileRecordReader
+from pypaimon.pynative.util.predicate_converter import convert_predicate
+
+
+class PyArrowDatasetReader(FileRecordReader[InternalRow]):
+    """
+    A PyArrowDatasetReader that reads data from a dataset file using PyArrow,
+    and filters it based on the provided predicate and projection.
+    """
+
+    def __init__(self, format, file_path, batch_size, projection,
+                 predicate: Predicate, primary_keys: List[str]):
+        if primary_keys is not None:
+            if projection is not None:
+                key_columns = []
+                for pk in primary_keys:
+                    key_column = f"_KEY_{pk}"
+                    if key_column not in projection:
+                        key_columns.append(key_column)
+                system_columns = ["_SEQUENCE_NUMBER", "_VALUE_KIND"]
+                projection = key_columns + system_columns + projection
+            # TODO: utilize predicate to improve performance
+            predicate = None
+
+        if predicate is not None:
+            predicate = convert_predicate(predicate)
+
+        self._file_path = file_path
+        self.dataset = ds.dataset(file_path, format=format)
+        self.scanner = self.dataset.scanner(
+            columns=projection,
+            filter=predicate,
+            batch_size=batch_size
+        )
+        self.batch_iterator = self.scanner.to_batches()
+
+    def read_batch(self) -> Optional[FileRecordIterator[InternalRow]]:
+        try:
+            record_batch = next(self.batch_iterator, None)
+            if record_batch is None:
+                return None
+
+            return ColumnarRowIterator(
+                self._file_path,
+                record_batch
+            )
+        except Exception as e:
+            print(f"Error reading batch: {e}")
+            raise
+
+    def close(self):
+        pass
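
This reader is a thin wrapper over pyarrow.dataset, so the same scan can be reproduced directly when debugging projection or filter behavior. A standalone sketch (the file path and column names are placeholders):

    import pyarrow.dataset as ds

    dataset = ds.dataset('/tmp/part-0.parquet', format='parquet')
    scanner = dataset.scanner(
        columns=['f0', 'f2'],        # projection
        filter=ds.field('f0') >= 2,  # predicate pushed into the scan
        batch_size=1024,
    )
    for record_batch in scanner.to_batches():
        print(record_batch.num_rows)
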
diff --git a/pypaimon/pynative/reader/sort_merge_reader.py b/pypaimon/pynative/reader/sort_merge_reader.py
new file mode 100644
index 0000000..896eb50
--- /dev/null
+++ b/pypaimon/pynative/reader/sort_merge_reader.py
@@ -0,0 +1,264 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import heapq
+from typing import Any, Callable, List, Optional
+
+import pyarrow as pa
+
+from pypaimon.pynative.common.row.key_value import KeyValue
+from pypaimon.pynative.common.row.row_kind import RowKind
+from pypaimon.pynative.reader.core.record_iterator import RecordIterator
+from pypaimon.pynative.reader.core.record_reader import RecordReader
+
+
+def build_comparator(key_schema: pa.Schema) -> Callable[[Any, Any], int]:
+    def comparator(key1, key2) -> int:
+        if key1 is None and key2 is None:
+            return 0
+        if key1 is None:
+            return -1
+        if key2 is None:
+            return 1
+
+        for i, field in enumerate(key_schema):
+            field_type = field.type
+            val1 = key1.get_field(i)
+            val2 = key2.get_field(i)
+
+            if val1 is None and val2 is None:
+                continue
+            if val1 is None:
+                return -1
+            if val2 is None:
+                return 1
+
+            # Integer, floating, boolean, string, binary, timestamp and date
+            # values all support direct ordering; anything else falls back to str().
+            if (pa.types.is_integer(field_type) or pa.types.is_floating(field_type)
+                    or pa.types.is_boolean(field_type)
+                    or pa.types.is_string(field_type) or pa.types.is_binary(field_type)
+                    or pa.types.is_timestamp(field_type) or pa.types.is_date(field_type)):
+                if val1 < val2:
+                    return -1
+                elif val1 > val2:
+                    return 1
+            else:
+                str_val1 = str(val1)
+                str_val2 = str(val2)
+                if str_val1 < str_val2:
+                    return -1
+                elif str_val1 > str_val2:
+                    return 1
+        return 0
+    return comparator
+
+
+class DeduplicateMergeFunction:
+    def __init__(self, ignore_delete: bool = False):
+        self.ignore_delete = ignore_delete
+        self.latest_kv = None
+        self.is_initialized = False
+        self.initial_kv = None
+
+    def reset(self) -> None:
+        self.latest_kv = None
+        self.is_initialized = False
+        self.initial_kv = None
+
+    def add(self, kv: KeyValue) -> None:
+        if self.initial_kv is None:
+            self.initial_kv = kv
+            return
+
+        if not self.is_initialized:
+            if not self.ignore_delete or self.initial_kv.value_kind != RowKind.DELETE:
+                self.latest_kv = self.initial_kv
+            self.is_initialized = True
+
+        if self.ignore_delete and kv.value_kind == RowKind.DELETE:
+            return
+
+        self.latest_kv = kv
+
+    def get_result(self) -> Optional[KeyValue]:
+        if not self.is_initialized:
+            return self.initial_kv
+        return self.latest_kv
+
+
+class Element:
+    def __init__(self, kv, iterator: RecordIterator, reader: RecordReader):
+        self.kv = kv
+        self.iterator = iterator
+        self.reader = reader
+
+    def update(self) -> bool:
+        next_kv = self.iterator.next()
+        if next_kv is None:
+            return False
+        self.kv = next_kv
+        return True
+
+
+class HeapEntry:
+    def __init__(self, key, element: Element, key_comparator):
+        self.key = key
+        self.element = element
+        self.key_comparator = key_comparator
+
+    def __lt__(self, other):
+        result = self.key_comparator(self.key, other.key)
+        if result < 0:
+            return True
+        elif result > 0:
+            return False
+
+        return self.element.kv.sequence_number < other.element.kv.sequence_number
+
+
+class SortMergeIterator(RecordIterator):
+    def __init__(self, reader, polled: List[Element], min_heap, merge_function,
+                 user_key_comparator, next_batch_readers):
+        self.reader = reader
+        self.polled = polled
+        self.min_heap = min_heap
+        self.merge_function = merge_function
+        self.user_key_comparator = user_key_comparator
+        self.next_batch_readers = next_batch_readers
+        self.released = False
+
+    def next(self):
+        while True:
+            has_more = self._next_impl()
+            if not has_more:
+                return None
+            result = self.merge_function.get_result()
+            if result is not None:
+                return result
+
+    def _next_impl(self):
+        if self.released:
+            raise RuntimeError("SortMergeIterator.next called after release")
+
+        if not self.next_batch_readers:
+            for element in self.polled:
+                if element.update():
+                    entry = HeapEntry(element.kv.key, element, self.user_key_comparator)
+                    heapq.heappush(self.min_heap, entry)
+                else:
+                    element.iterator.release_batch()
+                    self.next_batch_readers.append(element.reader)
+
+            self.polled.clear()
+
+            if self.next_batch_readers:
+                return False
+
+            if not self.min_heap:
+                return False
+
+            self.merge_function.reset()
+
+            first_entry = self.min_heap[0]
+            key = first_entry.key
+
+            while self.min_heap and self.user_key_comparator(key, self.min_heap[0].key) == 0:
+                entry = heapq.heappop(self.min_heap)
+                self.merge_function.add(entry.element.kv)
+                self.polled.append(entry.element)
+
+            return True
+
+    def release_batch(self):
+        self.released = True
+
+
+class SortMergeReader:
+    def __init__(self, readers, primary_keys):
+        self.next_batch_readers = list(readers)
+        self.merge_function = DeduplicateMergeFunction(False)
+
+        key_columns = [f"_KEY_{pk}" for pk in primary_keys]
+        key_schema = pa.schema([pa.field(column, pa.string()) for column in key_columns])
+        self.user_key_comparator = build_comparator(key_schema)
+
+        def element_comparator(e1_tuple, e2_tuple):
+            key1, e1 = e1_tuple
+            key2, e2 = e2_tuple
+
+            result = self.user_key_comparator(key1, key2)
+            if result != 0:
+                return result
+
+            return e1.kv.sequence_number - e2.kv.sequence_number
+
+        from functools import cmp_to_key
+        self.element_key = cmp_to_key(element_comparator)
+
+        self.min_heap = []
+        self.polled = []
+
+    def read_batch(self) -> Optional[RecordIterator]:
+        for reader in self.next_batch_readers:
+            while True:
+                iterator = reader.read_batch()
+                if iterator is None:
+                    reader.close()
+                    break
+
+                kv = iterator.next()
+                if kv is None:
+                    iterator.release_batch()
+                else:
+                    element = Element(kv, iterator, reader)
+                    entry = HeapEntry(kv.key, element, self.user_key_comparator)
+                    heapq.heappush(self.min_heap, entry)
+                    break
+
+        self.next_batch_readers.clear()
+
+        if not self.min_heap:
+            return None
+
+        return SortMergeIterator(
+            self,
+            self.polled,
+            self.min_heap,
+            self.merge_function,
+            self.user_key_comparator,
+            self.next_batch_readers
+        )
+
+    def close(self):
+        for reader in self.next_batch_readers:
+            reader.close()
+
+        for entry in self.min_heap:
+            entry.element.iterator.release_batch()
+            entry.element.reader.close()
+
+        for element in self.polled:
+            element.iterator.release_batch()
+            element.reader.close()
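
The heap orders entries by user key first and sequence number second, so rows with equal keys reach the merge function oldest-to-newest. HeapEntry adapts a cmp-style comparator to heapq via __lt__; the same adaptation in isolation, sketched over plain (key, sequence_number) tuples:

    from functools import cmp_to_key

    def compare(a, b):
        # order (key, sequence_number) pairs the way HeapEntry.__lt__ does
        if a[0] != b[0]:
            return -1 if a[0] < b[0] else 1
        return a[1] - b[1]

    entries = [(2, 5), (1, 7), (1, 3)]
    entries.sort(key=cmp_to_key(compare))
    print(entries[0])  # (1, 3): smallest key wins, then smallest sequence number
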
diff --git a/pypaimon/py4j/util/constants.py b/pypaimon/pynative/tests/__init__.py
similarity index 50%
copy from pypaimon/py4j/util/constants.py
copy to pypaimon/pynative/tests/__init__.py
index f223309..e173487 100644
--- a/pypaimon/py4j/util/constants.py
+++ b/pypaimon/pynative/tests/__init__.py
@@ -16,16 +16,34 @@
 # limitations under the License.
 
################################################################################
 
-# ---------------------------- for env var ----------------------------
-PYPAIMON_CONN_INFO_PATH = '_PYPAIMON_CONN_INFO_PATH'
-PYPAIMON_JVM_ARGS = '_PYPAIMON_JVM_ARGS'
-PYPAIMON_JAVA_CLASSPATH = '_PYPAIMON_JAVA_CLASSPATH'
-PYPAIMON_HADOOP_CLASSPATH = '_PYPAIMON_HADOOP_CLASSPATH'
-PYPAIMON_MAIN_CLASS = 'org.apache.paimon.python.PythonGatewayServer'
-PYPAIMON_MAIN_ARGS = '_PYPAIMON_MAIN_ARGS'
-
-# ------------------------ for catalog options ------------------------
-MAX_WORKERS = "max-workers"
-
-# ------------------ for tests (Please don't use it) ------------------
-PYPAIMON4J_TEST_MODE = '_PYPAIMON4J_TEST_MODE'
+import os
+import shutil
+import tempfile
+import unittest
+
+from pypaimon.py4j import Catalog, constants
+
+
+class PypaimonTestBase(unittest.TestCase):
+    """
+    Base class for unit tests.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        os.environ[constants.PYPAIMON4J_TEST_MODE] = 'true'
+
+        this_dir = os.path.abspath(os.path.dirname(__file__))
+        project_dir = os.path.dirname(os.path.dirname(os.path.dirname(this_dir)))
+        deps = os.path.join(project_dir, "dev/test_deps/*")
+        os.environ[constants.PYPAIMON_HADOOP_CLASSPATH] = deps
+
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = Catalog.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', False)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+        del os.environ[constants.PYPAIMON4J_TEST_MODE]
diff --git a/pypaimon/pynative/tests/test_pynative_reader.py b/pypaimon/pynative/tests/test_pynative_reader.py
new file mode 100644
index 0000000..76667a0
--- /dev/null
+++ b/pypaimon/pynative/tests/test_pynative_reader.py
@@ -0,0 +1,308 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pandas as pd
+import pyarrow as pa
+
+from pypaimon import Schema
+from pypaimon.py4j.tests import PypaimonTestBase
+
+
+class NativeReaderTest(PypaimonTestBase):
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+        cls.simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string())
+        ])
+        cls.pk_pa_schema = pa.schema([
+            ('f0', pa.int32(), False),
+            ('f1', pa.string()),
+            ('f2', pa.string())
+        ])
+        cls._expected_full_data = pd.DataFrame({
+            'f0': [1, 2, 3, 4, 5, 6, 7, 8],
+            'f1': ['a', 'b', 'c', None, 'e', 'f', 'g', 'h'],
+            'f2': ['A', 'B', 'C', 'D', 'E', 'F', 'G', None],
+        })
+        cls._expected_full_data['f0'] = cls._expected_full_data['f0'].astype('int32')
+        cls.expected_full = pa.Table.from_pandas(cls._expected_full_data,
+                                                 schema=cls.simple_pa_schema)
+        cls._expected_full_data_pk = pd.DataFrame({
+            'f0': [1, 2, 3, 4, 6],
+            'f1': ['a', 'x', 'y', None, 'z'],
+            'f2': ['A', 'X', 'Y', 'D', 'Z'],
+        })
+        cls._expected_full_data_pk['f0'] = cls._expected_full_data_pk['f0'].astype('int32')
+        cls.expected_full_pk = pa.Table.from_pandas(cls._expected_full_data_pk,
+                                                    schema=cls.pk_pa_schema)
+
+    def testParquetAppendOnlyReader(self):
+        schema = Schema(self.simple_pa_schema)
+        self.catalog.create_table('default.test_append_only_parquet', schema, False)
+        table = self.catalog.get_table('default.test_append_only_parquet')
+        self._write_test_table(table)
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder)
+        self.assertEqual(actual, self.expected_full)
+
+    def testOrcAppendOnlyReader(self):
+        schema = Schema(self.simple_pa_schema, options={'file.format': 'orc'})
+        self.catalog.create_table('default.test_append_only_orc', schema, False)
+        table = self.catalog.get_table('default.test_append_only_orc')
+        self._write_test_table(table)
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder)
+        self.assertEqual(actual, self.expected_full)
+
+    def testAvroAppendOnlyReader(self):
+        schema = Schema(self.simple_pa_schema, options={'file.format': 'avro'})
+        self.catalog.create_table('default.test_append_only_avro', schema, False)
+        table = self.catalog.get_table('default.test_append_only_avro')
+        self._write_test_table(table)
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder)
+        self.assertEqual(actual, self.expected_full)
+
+    def testAppendOnlyReaderWithFilter(self):
+        schema = Schema(self.simple_pa_schema)
+        self.catalog.create_table('default.test_append_only_filter', schema, False)
+        table = self.catalog.get_table('default.test_append_only_filter')
+        self._write_test_table(table)
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+
+        p1 = predicate_builder.less_than('f0', 7)
+        p2 = predicate_builder.greater_or_equal('f0', 2)
+        p3 = predicate_builder.between('f0', 0, 5)  # from now, [2/b, 3/c, 4/d, 5/e] left
+        p4 = predicate_builder.is_not_in('f1', ['a', 'b'])  # exclude 2/b
+        p5 = predicate_builder.is_in('f2', ['A', 'B', 'D', 'E', 'F', 'G'])  # exclude 3/c
+        p6 = predicate_builder.is_not_null('f1')    # exclude 4/d
+        g1 = predicate_builder.and_predicates([p1, p2, p3, p4, p5, p6])
+        read_builder = table.new_read_builder().with_filter(g1)
+        actual = self._read_test_table(read_builder)
+        expected = pa.concat_tables([
+            self.expected_full.slice(4, 1)  # 5/e
+        ])
+        self.assertEqual(actual, expected)
+
+        p7 = predicate_builder.startswith('f1', 'a')
+        p8 = predicate_builder.endswith('f2', 'C')
+        p9 = predicate_builder.contains('f2', 'E')
+        p10 = predicate_builder.equal('f1', 'f')
+        p11 = predicate_builder.is_null('f2')
+        g2 = predicate_builder.or_predicates([p7, p8, p9, p10, p11])
+        read_builder = table.new_read_builder().with_filter(g2)
+        actual = self._read_test_table(read_builder)
+        expected = pa.concat_tables([
+            self.expected_full.slice(0, 1),  # 1/a
+            self.expected_full.slice(2, 1),  # 3/c
+            self.expected_full.slice(4, 1),  # 5/e
+            self.expected_full.slice(5, 1),  # 6/f
+            self.expected_full.slice(7, 1),  # 8/h
+        ])
+        self.assertEqual(actual, expected)
+
+        g3 = predicate_builder.and_predicates([g1, g2])
+        read_builder = table.new_read_builder().with_filter(g3)
+        actual = self._read_test_table(read_builder)
+        expected = pa.concat_tables([
+            self.expected_full.slice(4, 1),  # 5/e
+        ])
+        self.assertEqual(actual, expected)
+
+        # Same as Java: 'not_equal' also filters out records with None values
+        p12 = predicate_builder.not_equal('f1', 'f')
+        read_builder = table.new_read_builder().with_filter(p12)
+        actual = self._read_test_table(read_builder)
+        expected = pa.concat_tables([
+            # not only 6/f, but also 4/d will be filtered
+            self.expected_full.slice(0, 1),  # 1/a
+            self.expected_full.slice(1, 1),  # 2/b
+            self.expected_full.slice(2, 1),  # 3/c
+            self.expected_full.slice(4, 1),  # 5/e
+            self.expected_full.slice(6, 1),  # 7/g
+            self.expected_full.slice(7, 1),  # 8/h
+        ])
+        self.assertEqual(actual, expected)
+
+    def testAppendOnlyReaderWithProjection(self):
+        schema = Schema(self.simple_pa_schema)
+        self.catalog.create_table('default.test_append_only_projection', schema, False)
+        table = self.catalog.get_table('default.test_append_only_projection')
+        self._write_test_table(table)
+
+        read_builder = table.new_read_builder().with_projection(['f0', 'f2'])
+        actual = self._read_test_table(read_builder)
+        expected = self.expected_full.select(['f0', 'f2'])
+        self.assertEqual(actual, expected)
+
+    def testAppendOnlyReaderWithLimit(self):
+        schema = Schema(self.simple_pa_schema, options={'source.split.target-size': '1mb'})
+        self.catalog.create_table('default.test_append_only_limit', schema, 
False)
+        table = self.catalog.get_table('default.test_append_only_limit')
+        self._write_test_table(table)
+
+        read_builder = table.new_read_builder().with_limit(1)
+        actual = self._read_test_table(read_builder)
+        # only records from 1st commit (1st split) will be read
+        expected = pa.concat_tables([
+            self.expected_full.slice(0, 1),  # 1/a
+            self.expected_full.slice(1, 1),  # 2/b
+            self.expected_full.slice(2, 1),  # 3/c
+            self.expected_full.slice(3, 1),  # 4/d
+        ])
+        self.assertEqual(actual, expected)
+
+    # TODO: test cases for avro filter and projection
+
+    def testPkParquetReader(self):
+        schema = Schema(self.pk_pa_schema, primary_keys=['f0'], options={
+            'bucket': '1'
+        })
+        self.catalog.create_table('default.test_pk_parquet', schema, False)
+        table = self.catalog.get_table('default.test_pk_parquet')
+        self._write_test_table(table, for_pk=True)
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder)
+        self.assertEqual(actual, self.expected_full_pk)
+
+    def testPkParquetReaderWithMinHeap(self):
+        schema = Schema(self.pk_pa_schema, primary_keys=['f0'], options={
+            'bucket': '1',
+            'sort-engine': 'min-heap'
+        })
+        self.catalog.create_table('default.test_pk_parquet_min_heap', schema, False)
+        table = self.catalog.get_table('default.test_pk_parquet_min_heap')
+        self._write_test_table(table, for_pk=True)
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder)
+        self.assertEqual(actual, self.expected_full_pk)
+
+    def testPkOrcReader(self):
+        schema = Schema(self.pk_pa_schema, primary_keys=['f0'], options={
+            'bucket': '1',
+            'file.format': 'orc'
+        })
+        self.catalog.create_table('default.test_pk_orc', schema, False)
+        table = self.catalog.get_table('default.test_pk_orc')
+        self._write_test_table(table, for_pk=True)
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder)
+        self.assertEqual(actual, self.expected_full_pk)
+
+    def testPkAvroReader(self):
+        schema = Schema(self.pk_pa_schema, primary_keys=['f0'], options={
+            'bucket': '1',
+            'file.format': 'avro'
+        })
+        self.catalog.create_table('default.test_pk_avro', schema, False)
+        table = self.catalog.get_table('default.test_pk_avro')
+        self._write_test_table(table, for_pk=True)
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder)
+        self.assertEqual(actual, self.expected_full_pk)
+
+    def testPkReaderWithFilter(self):
+        schema = Schema(self.pk_pa_schema, primary_keys=['f0'], options={
+            'bucket': '1'
+        })
+        self.catalog.create_table('default.test_pk_filter', schema, False)
+        table = self.catalog.get_table('default.test_pk_filter')
+        self._write_test_table(table, for_pk=True)
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+
+        p1 = predicate_builder.between('f0', 0, 5)
+        p2 = predicate_builder.is_not_in('f1', ['a', 'x'])
+        p3 = predicate_builder.is_not_null('f1')
+        g1 = predicate_builder.and_predicates([p1, p2, p3])
+        p4 = predicate_builder.equal('f2', 'Z')
+        g2 = predicate_builder.or_predicates([g1, p4])
+        read_builder = table.new_read_builder().with_filter(g2)
+        actual = self._read_test_table(read_builder)
+        expected = pa.concat_tables([
+            self.expected_full_pk.slice(2, 1),  # 3/y
+            self.expected_full_pk.slice(4, 1),  # 6/z
+        ])
+        self.assertEqual(actual, expected)
+
+    def testPkReaderWithProjection(self):
+        schema = Schema(self.pk_pa_schema, primary_keys=['f0'], options={
+            'bucket': '1'
+        })
+        self.catalog.create_table('default.test_pk_projection', schema, False)
+        table = self.catalog.get_table('default.test_pk_projection')
+        self._write_test_table(table, for_pk=True)
+
+        read_builder = table.new_read_builder().with_projection(['f0', 'f2'])
+        actual = self._read_test_table(read_builder)
+        expected = self.expected_full_pk.select(['f0', 'f2'])
+        self.assertEqual(actual, expected)
+
+    def _write_test_table(self, table, for_pk=False):
+        write_builder = table.new_batch_write_builder()
+
+        # first write
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = {
+            'f0': [1, 2, 3, 4],
+            'f1': ['a', 'b', 'c', None],
+            'f2': ['A', 'B', 'C', 'D'],
+        }
+        pa_table = pa.Table.from_pydict(data1, schema=self.simple_pa_schema)
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        # second write
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        if for_pk:
+            data2 = {
+                'f0': [2, 3, 6],
+                'f1': ['x', 'y', 'z'],
+                'f2': ['X', 'Y', 'Z'],
+            }
+        else:
+            data2 = {
+                'f0': [5, 6, 7, 8],
+                'f1': ['e', 'f', 'g', 'h'],
+                'f2': ['E', 'F', 'G', None],
+            }
+        pa_table = pa.Table.from_pydict(data2, schema=self.simple_pa_schema)
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+    def _read_test_table(self, read_builder):
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        self.assertNotEqual(table_read.to_record_generator(splits), None)
+        return table_read.to_arrow(splits)
diff --git a/dev/dev-requirements.txt b/pypaimon/pynative/util/__init__.py
old mode 100755
new mode 100644
similarity index 84%
copy from dev/dev-requirements.txt
copy to pypaimon/pynative/util/__init__.py
index 4ed964e..65b48d4
--- a/dev/dev-requirements.txt
+++ b/pypaimon/pynative/util/__init__.py
@@ -15,16 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
-pip>=20.3
-setuptools>=18.0
-wheel
-py4j==0.10.9.7
-pyarrow>=5.0.0
-pandas>=1.3.0
-numpy>=1.22.4
-python-dateutil>=2.8.0,<3
-pytz>=2018.3
-pytest~=7.0
-duckdb>=0.5.0,<2.0.0
-ray~=2.10.0
diff --git a/pypaimon/pynative/util/predicate_converter.py b/pypaimon/pynative/util/predicate_converter.py
new file mode 100644
index 0000000..e3c5499
--- /dev/null
+++ b/pypaimon/pynative/util/predicate_converter.py
@@ -0,0 +1,77 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from functools import reduce
+
+import pyarrow as pa
+import pyarrow.compute as pc
+import pyarrow.dataset as ds
+from pyarrow.dataset import Expression
+
+from pypaimon import Predicate
+
+
+def convert_predicate(predicate: Predicate) -> Expression:
+    """
+    Convert Paimon's Predicate into a PyArrow Dataset filter expression.
+    """
+    if not hasattr(predicate, 'py_predicate'):
+        raise ValueError("Predicate must have py_predicate attribute")
+
+    py_predicate = predicate.py_predicate
+
+    if py_predicate.method == 'equal':
+        return ds.field(py_predicate.field) == py_predicate.literals[0]
+    elif py_predicate.method == 'notEqual':
+        return ds.field(py_predicate.field) != py_predicate.literals[0]
+    elif py_predicate.method == 'lessThan':
+        return ds.field(py_predicate.field) < py_predicate.literals[0]
+    elif py_predicate.method == 'lessOrEqual':
+        return ds.field(py_predicate.field) <= py_predicate.literals[0]
+    elif py_predicate.method == 'greaterThan':
+        return ds.field(py_predicate.field) > py_predicate.literals[0]
+    elif py_predicate.method == 'greaterOrEqual':
+        return ds.field(py_predicate.field) >= py_predicate.literals[0]
+    elif py_predicate.method == 'isNull':
+        return ds.field(py_predicate.field).is_null()
+    elif py_predicate.method == 'isNotNull':
+        return ds.field(py_predicate.field).is_valid()
+    elif py_predicate.method == 'in':
+        return ds.field(py_predicate.field).isin(py_predicate.literals)
+    elif py_predicate.method == 'notIn':
+        return ~ds.field(py_predicate.field).isin(py_predicate.literals)
+    elif py_predicate.method == 'startsWith':
+        pattern = py_predicate.literals[0]
+        return pc.starts_with(ds.field(py_predicate.field).cast(pa.string()), pattern)
+    elif py_predicate.method == 'endsWith':
+        pattern = py_predicate.literals[0]
+        return pc.ends_with(ds.field(py_predicate.field).cast(pa.string()), pattern)
+    elif py_predicate.method == 'contains':
+        pattern = py_predicate.literals[0]
+        return pc.match_substring(ds.field(py_predicate.field).cast(pa.string()), pattern)
+    elif py_predicate.method == 'between':
+        return (ds.field(py_predicate.field) >= py_predicate.literals[0]) & \
+            (ds.field(py_predicate.field) <= py_predicate.literals[1])
+    elif py_predicate.method == 'and':
+        return reduce(lambda x, y: x & y,
+                      [convert_predicate(p) for p in py_predicate.literals])
+    elif py_predicate.method == 'or':
+        return reduce(lambda x, y: x | y,
+                      [convert_predicate(p) for p in py_predicate.literals])
+    else:
+        raise ValueError(f"Unsupported predicate method: {py_predicate.method}")
diff --git a/pypaimon/pynative/util/predicate_utils.py b/pypaimon/pynative/util/predicate_utils.py
new file mode 100644
index 0000000..8178449
--- /dev/null
+++ b/pypaimon/pynative/util/predicate_utils.py
@@ -0,0 +1,56 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pypaimon.pynative.common.predicate import PyNativePredicate
+
+
+def filter_predicate_by_primary_keys(predicate, primary_keys):
+    """
+    Filter out predicates that are not related to primary key fields.
+    """
+    from pypaimon import Predicate
+
+    if predicate is None or primary_keys is None:
+        return predicate
+
+    py_predicate = predicate.py_predicate
+
+    if py_predicate.method in ['and', 'or']:
+        filtered_literals = []
+        for literal in py_predicate.literals:
+            filtered = filter_predicate_by_primary_keys(literal, primary_keys)
+            if filtered is not None:
+                filtered_literals.append(filtered)
+
+        if not filtered_literals:
+            return None
+
+        if len(filtered_literals) == 1:
+            return filtered_literals[0]
+
+        return Predicate(PyNativePredicate(
+            method=py_predicate.method,
+            index=py_predicate.index,
+            field=py_predicate.field,
+            literals=filtered_literals
+        ), None)
+
+    if py_predicate.field in primary_keys:
+        return predicate
+    else:
+        return None
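
For primary-key tables, only the conjuncts that touch key fields can be pushed into the scan; conditions on value fields must wait until after merging (see FilterRecordReader above). A runnable toy model of the same pruning over a plain dict tree:

    def prune(pred, pks):
        # keep only sub-predicates that reference a primary-key field
        if pred['method'] in ('and', 'or'):
            kept = [q for q in (prune(c, pks) for c in pred['literals']) if q]
            if not kept:
                return None
            return kept[0] if len(kept) == 1 else {**pred, 'literals': kept}
        return pred if pred['field'] in pks else None

    tree = {'method': 'and', 'literals': [
        {'method': 'lessThan', 'field': 'f0'},
        {'method': 'equal', 'field': 'f1'},
    ]}
    print(prune(tree, ['f0']))  # only the f0 conjunct survives
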
diff --git a/pypaimon/pynative/util/reader_convert_func.py b/pypaimon/pynative/util/reader_convert_func.py
new file mode 100644
index 0000000..00b1a14
--- /dev/null
+++ b/pypaimon/pynative/util/reader_convert_func.py
@@ -0,0 +1,200 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+def create_concat_record_reader(j_reader, converter, predicate, projection, primary_keys):
+    from pypaimon.pynative.reader.concat_record_reader import ConcatRecordReader
+    reader_class = j_reader.getClass()
+    queue_field = reader_class.getDeclaredField("queue")
+    queue_field.setAccessible(True)
+    j_supplier_queue = queue_field.get(j_reader)
+    return ConcatRecordReader(converter, j_supplier_queue)
+
+
+def create_data_file_record_reader(j_reader, converter, predicate, projection, primary_keys):
+    from pypaimon.pynative.reader.data_file_record_reader import DataFileRecordReader
+    reader_class = j_reader.getClass()
+    wrapped_reader_field = reader_class.getDeclaredField("reader")
+    wrapped_reader_field.setAccessible(True)
+    j_wrapped_reader = wrapped_reader_field.get(j_reader)
+    wrapped_reader = converter.convert_java_reader(j_wrapped_reader)
+    return DataFileRecordReader(wrapped_reader)
+
+
+def create_filter_reader(j_reader, converter, predicate, projection, primary_keys):
+    from pypaimon.pynative.reader.filter_record_reader import FilterRecordReader
+    reader_class = j_reader.getClass()
+    wrapped_reader_field = reader_class.getDeclaredField("val$thisReader")
+    wrapped_reader_field.setAccessible(True)
+    j_wrapped_reader = wrapped_reader_field.get(j_reader)
+    wrapped_reader = converter.convert_java_reader(j_wrapped_reader)
+    if primary_keys is not None:
+        return FilterRecordReader(wrapped_reader, predicate)
+    else:
+        return wrapped_reader
+
+
+def create_pyarrow_reader_for_parquet(j_reader, converter, predicate, projection, primary_keys):
+    from pypaimon.pynative.reader.pyarrow_dataset_reader import PyArrowDatasetReader
+
+    reader_class = j_reader.getClass()
+    factory_field = reader_class.getDeclaredField("this$0")
+    factory_field.setAccessible(True)
+    j_factory = factory_field.get(j_reader)
+    factory_class = j_factory.getClass()
+    batch_size_field = factory_class.getDeclaredField("batchSize")
+    batch_size_field.setAccessible(True)
+    batch_size = batch_size_field.get(j_factory)
+
+    file_reader_field = reader_class.getDeclaredField("reader")
+    file_reader_field.setAccessible(True)
+    j_file_reader = file_reader_field.get(j_reader)
+    file_reader_class = j_file_reader.getClass()
+    input_file_field = file_reader_class.getDeclaredField("file")
+    input_file_field.setAccessible(True)
+    j_input_file = input_file_field.get(j_file_reader)
+    file_path = j_input_file.getPath().toUri().toString()
+
+    return PyArrowDatasetReader('parquet', file_path, batch_size, projection,
+                                predicate, primary_keys)
+
+
+def create_pyarrow_reader_for_orc(j_reader, converter, predicate, projection, primary_keys):
+    from pypaimon.pynative.reader.pyarrow_dataset_reader import PyArrowDatasetReader
+
+    reader_class = j_reader.getClass()
+    file_reader_field = reader_class.getDeclaredField("orcReader")
+    file_reader_field.setAccessible(True)
+    j_file_reader = file_reader_field.get(j_reader)
+    file_reader_class = j_file_reader.getClass()
+    path_field = file_reader_class.getDeclaredField("path")
+    path_field.setAccessible(True)
+    j_path = path_field.get(j_file_reader)
+    file_path = j_path.toUri().toString()
+
+    # TODO: Temporarily hard-coded to 1024 as we cannot reflectively obtain this value yet
+    batch_size = 1024
+
+    return PyArrowDatasetReader('orc', file_path, batch_size, projection, predicate, primary_keys)
+
+
+def create_avro_format_reader(j_reader, converter, predicate, projection, primary_keys):
+    from pypaimon.pynative.reader.avro_format_reader import AvroFormatReader
+
+    reader_class = j_reader.getClass()
+    path_field = reader_class.getDeclaredField("filePath")
+    path_field.setAccessible(True)
+    j_path = path_field.get(j_reader)
+    file_path = j_path.toUri().toString()
+
+    # TODO: Temporarily hard-coded to 1024 as we cannot reflectively obtain this value yet
+    batch_size = 1024
+
+    return AvroFormatReader(file_path, batch_size, None)
+
+
+def create_key_value_unwrap_reader(j_reader, converter, predicate, projection, primary_keys):
+    from pypaimon.pynative.reader.key_value_unwrap_reader import KeyValueUnwrapReader
+    reader_class = j_reader.getClass()
+    wrapped_reader_field = reader_class.getDeclaredField("val$reader")
+    wrapped_reader_field.setAccessible(True)
+    j_wrapped_reader = wrapped_reader_field.get(j_reader)
+    wrapped_reader = converter.convert_java_reader(j_wrapped_reader)
+    return KeyValueUnwrapReader(wrapped_reader)
+
+
+def create_transform_reader(j_reader, converter, predicate, projection, primary_keys):
+    reader_class = j_reader.getClass()
+    wrapped_reader_field = reader_class.getDeclaredField("val$thisReader")
+    wrapped_reader_field.setAccessible(True)
+    j_wrapped_reader = wrapped_reader_field.get(j_reader)
+    # TODO: implement projectKey and projectOuter
+    return converter.convert_java_reader(j_wrapped_reader)
+
+
+def create_drop_delete_reader(j_reader, converter, predicate, projection, primary_keys):
+    from pypaimon.pynative.reader.drop_delete_reader import DropDeleteReader
+    reader_class = j_reader.getClass()
+    wrapped_reader_field = reader_class.getDeclaredField("reader")
+    wrapped_reader_field.setAccessible(True)
+    j_wrapped_reader = wrapped_reader_field.get(j_reader)
+    wrapped_reader = converter.convert_java_reader(j_wrapped_reader)
+    return DropDeleteReader(wrapped_reader)
+
+
+def create_sort_merge_reader_min_heap(j_reader, converter, predicate, projection, primary_keys):
+    from pypaimon.pynative.reader.sort_merge_reader import SortMergeReader
+    j_reader_class = j_reader.getClass()
+    batch_readers_field = j_reader_class.getDeclaredField("nextBatchReaders")
+    batch_readers_field.setAccessible(True)
+    j_batch_readers = batch_readers_field.get(j_reader)
+    readers = []
+    for next_reader in j_batch_readers:
+        readers.append(converter.convert_java_reader(next_reader))
+    return SortMergeReader(readers, primary_keys)
+
+
+def create_sort_merge_reader_loser_tree(j_reader, converter, predicate, projection, primary_keys):
+    from pypaimon.pynative.reader.sort_merge_reader import SortMergeReader
+    j_reader_class = j_reader.getClass()
+    loser_tree_field = j_reader_class.getDeclaredField("loserTree")
+    loser_tree_field.setAccessible(True)
+    j_loser_tree = loser_tree_field.get(j_reader)
+    j_loser_tree_class = j_loser_tree.getClass()
+    leaves_field = j_loser_tree_class.getDeclaredField("leaves")
+    leaves_field.setAccessible(True)
+    j_leaves = leaves_field.get(j_loser_tree)
+    readers = []
+    for j_leaf in j_leaves:
+        j_leaf_class = j_leaf.getClass()
+        j_leaf_reader_field = j_leaf_class.getDeclaredField("reader")
+        j_leaf_reader_field.setAccessible(True)
+        j_leaf_reader = j_leaf_reader_field.get(j_leaf)
+        readers.append(converter.convert_java_reader(j_leaf_reader))
+    return SortMergeReader(readers, primary_keys)
+
+
+def create_key_value_wrap_record_reader(j_reader, converter, predicate, projection, primary_keys):
+    from pypaimon.pynative.reader.key_value_wrap_reader import KeyValueWrapReader
+    reader_class = j_reader.getClass()
+
+    wrapped_reader_field = reader_class.getDeclaredField("reader")
+    wrapped_reader_field.setAccessible(True)
+    j_wrapped_reader = wrapped_reader_field.get(j_reader)
+    wrapped_reader = converter.convert_java_reader(j_wrapped_reader)
+
+    level_field = reader_class.getDeclaredField("level")
+    level_field.setAccessible(True)
+    level = level_field.get(j_reader)
+
+    serializer_field = reader_class.getDeclaredField("serializer")
+    serializer_field.setAccessible(True)
+    j_serializer = serializer_field.get(j_reader)
+    serializer_class = j_serializer.getClass()
+    key_arity_field = serializer_class.getDeclaredField("keyArity")
+    key_arity_field.setAccessible(True)
+    key_arity = key_arity_field.get(j_serializer)
+
+    reused_value_field = serializer_class.getDeclaredField("reusedValue")
+    reused_value_field.setAccessible(True)
+    j_reused_value = reused_value_field.get(j_serializer)
+    offset_row_class = j_reused_value.getClass()
+    arity_field = offset_row_class.getDeclaredField("arity")
+    arity_field.setAccessible(True)
+    value_arity = arity_field.get(j_reused_value)
+    return KeyValueWrapReader(wrapped_reader, level, key_arity, value_arity)
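
All of these converters share one py4j reflection recipe: read a private field off the Java object, force accessibility, and rebuild the Python counterpart from its contents. The recurring pattern in isolation (j_obj is any py4j JavaObject; the field name is whatever the Java class declares):

    def get_private_field(j_obj, name):
        # Reflectively read a private field of a Java object over the py4j bridge.
        field = j_obj.getClass().getDeclaredField(name)
        field.setAccessible(True)
        return field.get(j_obj)
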
diff --git a/pypaimon/pynative/util/reader_converter.py b/pypaimon/pynative/util/reader_converter.py
new file mode 100644
index 0000000..ef9bbb0
--- /dev/null
+++ b/pypaimon/pynative/util/reader_converter.py
@@ -0,0 +1,89 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+from typing import List
+
+from py4j.java_gateway import JavaObject
+
+from pypaimon.py4j.util import constants
+from pypaimon.pynative.common.exception import PyNativeNotImplementedError
+from pypaimon.pynative.reader.core.record_reader import RecordReader
+from pypaimon.pynative.util.reader_convert_func import (
+    create_avro_format_reader,
+    create_concat_record_reader,
+    create_data_file_record_reader,
+    create_drop_delete_reader,
+    create_filter_reader,
+    create_key_value_unwrap_reader,
+    create_key_value_wrap_record_reader,
+    create_pyarrow_reader_for_orc,
+    create_pyarrow_reader_for_parquet,
+    create_sort_merge_reader_loser_tree,
+    create_sort_merge_reader_minhep,
+    create_transform_reader,
+)
+
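+# Maps a fully-qualified Java reader class name to the factory that builds the
+# equivalent Python reader; dispatched from ReaderConverter.convert_java_reader.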
+reader_mapping = {
+    "org.apache.paimon.mergetree.compact.ConcatRecordReader":
+        create_concat_record_reader,
+    "org.apache.paimon.io.DataFileRecordReader":
+        create_data_file_record_reader,
+    "org.apache.paimon.reader.RecordReader$2":
+        create_filter_reader,
+    "org.apache.paimon.format.parquet.ParquetReaderFactory$ParquetReader":
+        create_pyarrow_reader_for_parquet,
+    "org.apache.paimon.format.orc.OrcReaderFactory$OrcVectorizedReader":
+        create_pyarrow_reader_for_orc,
+    "org.apache.paimon.format.avro.AvroBulkFormat$AvroReader":
+        create_avro_format_reader,
+    "org.apache.paimon.table.source.KeyValueTableRead$1":
+        create_key_value_unwrap_reader,
+    "org.apache.paimon.reader.RecordReader$1":
+        create_transform_reader,
+    "org.apache.paimon.mergetree.DropDeleteReader":
+        create_drop_delete_reader,
+    "org.apache.paimon.mergetree.compact.SortMergeReaderWithMinHeap":
+        create_sort_merge_reader_minhep,
+    "org.apache.paimon.mergetree.compact.SortMergeReaderWithLoserTree":
+        create_sort_merge_reader_loser_tree,
+    "org.apache.paimon.io.KeyValueDataFileRecordReader":
+        create_key_value_wrap_record_reader,
+    # Additional mappings can be added here
+}
+
+
+class ReaderConverter:
+    """
+    # Convert Java RecordReader to Python RecordReader
+    """
+
+    def __init__(self, predicate, projection, primary_keys: List[str]):
+        self.reader_mapping = reader_mapping
+        self._predicate = predicate
+        self._projection = projection
+        self._primary_keys = primary_keys
+
+    def convert_java_reader(self, java_reader: JavaObject) -> RecordReader:
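+        # Dispatch on the concrete Java class name; every factory also gets
+        # this converter so nested/wrapped readers convert recursively.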
+        java_class_name = java_reader.getClass().getName()
+        if java_class_name in reader_mapping:
+            if os.environ.get(constants.PYPAIMON4J_TEST_MODE) == "true":
+                print("converting Java reader: " + str(java_class_name))
+            return reader_mapping[java_class_name](java_reader, self, self._predicate,
+                                                   self._projection, self._primary_keys)
+        else:
+            raise PyNativeNotImplementedError(
+                f"Unsupported RecordReader type: {java_class_name}")
diff --git a/dev/dev-requirements.txt b/pypaimon/pynative/writer/__init__.py
old mode 100755
new mode 100644
similarity index 84%
copy from dev/dev-requirements.txt
copy to pypaimon/pynative/writer/__init__.py
index 4ed964e..65b48d4
--- a/dev/dev-requirements.txt
+++ b/pypaimon/pynative/writer/__init__.py
@@ -15,16 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
-pip>=20.3
-setuptools>=18.0
-wheel
-py4j==0.10.9.7
-pyarrow>=5.0.0
-pandas>=1.3.0
-numpy>=1.22.4
-python-dateutil>=2.8.0,<3
-pytz>=2018.3
-pytest~=7.0
-duckdb>=0.5.0,<2.0.0
-ray~=2.10.0
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..b9481f5
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,8 @@
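+# Import sorting aligned with the black code style.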
+[tool.isort]
+profile = "black"
+multi_line_output = 3
+include_trailing_comma = true
+force_grid_wrap = 0
+use_parentheses = true
+ensure_newline_before_comments = true
+line_length = 88
diff --git a/setup.py b/setup.py
index 98515e0..0762a69 100644
--- a/setup.py
+++ b/setup.py
@@ -66,6 +66,12 @@ setup(
         "pypaimon.hadoop-deps": ["*.jar"]
     },
     install_requires=install_requires,
+    extras_require={
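+        # Optional dependencies for the Python-native Avro reader.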
+        'avro': [
+            'fastavro>=1.9.0',
+            'zstandard>=0.23.0'
+        ]
+    },
     description='Apache Paimon Python API',
     long_description=long_description,
     long_description_content_type='text/markdown',
