This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 546e43e7c0 [python] support read meta field when enable row-tracking. (#6819)
546e43e7c0 is described below

commit 546e43e7c010d78eb7a6c2d4da9fac364757a5ff
Author: zhoulii <[email protected]>
AuthorDate: Tue Dec 16 14:51:54 2025 +0800

    [python] support read meta field when enable row-tracking. (#6819)
---
 paimon-python/pypaimon/read/read_builder.py        |  6 +-
 .../pypaimon/read/reader/data_file_batch_reader.py | 38 +++++++++-
 .../pypaimon/read/reader/format_pyarrow_reader.py  | 37 +++++++++-
 paimon-python/pypaimon/read/split_read.py          | 86 ++++++++++++++++------
 paimon-python/pypaimon/read/table_read.py          | 14 ++--
 paimon-python/pypaimon/table/special_fields.py     | 83 +++++++++++++++++++++
 .../pypaimon/tests/data_evolution_test.py          | 77 ++++++++++++++++++-
 7 files changed, 308 insertions(+), 33 deletions(-)

diff --git a/paimon-python/pypaimon/read/read_builder.py 
b/paimon-python/pypaimon/read/read_builder.py
index 6fc8026c43..c33982c9ac 100644
--- a/paimon-python/pypaimon/read/read_builder.py
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -18,11 +18,13 @@
 
 from typing import List, Optional
 
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
 from pypaimon.common.predicate_builder import PredicateBuilder
 from pypaimon.read.table_read import TableRead
 from pypaimon.read.table_scan import TableScan
 from pypaimon.schema.data_types import DataField
+from pypaimon.table.special_fields import SpecialFields
 
 
 class ReadBuilder:
@@ -71,5 +73,7 @@ class ReadBuilder:
         if not self._projection:
             return table_fields
         else:
-            field_map = {field.name: field for field in self.table.fields}
+            if self.table.options.get(CoreOptions.ROW_TRACKING_ENABLED, 
'false').lower() == 'true':
+                table_fields = 
SpecialFields.row_type_with_row_tracking(table_fields)
+            field_map = {field.name: field for field in table_fields}
             return [field_map[name] for name in self._projection if name in 
field_map]
diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py 
b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index c83f1ce152..526e501b97 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -24,6 +24,7 @@ from pyarrow import RecordBatch
 from pypaimon.read.partition_info import PartitionInfo
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
 from pypaimon.schema.data_types import DataField, PyarrowFieldParser
+from pypaimon.table.special_fields import SpecialFields
 
 
 class DataFileBatchReader(RecordBatchReader):
@@ -32,12 +33,20 @@ class DataFileBatchReader(RecordBatchReader):
     """
 
     def __init__(self, format_reader: RecordBatchReader, index_mapping: 
List[int], partition_info: PartitionInfo,
-                 system_primary_key: Optional[List[str]], fields: 
List[DataField]):
+                 system_primary_key: Optional[List[str]], fields: 
List[DataField],
+                 max_sequence_number: int,
+                 first_row_id: int,
+                 row_tracking_enabled: bool,
+                 system_fields: dict):
         self.format_reader = format_reader
         self.index_mapping = index_mapping
         self.partition_info = partition_info
         self.system_primary_key = system_primary_key
         self.schema_map = {field.name: field for field in 
PyarrowFieldParser.from_paimon_schema(fields)}
+        self.row_tracking_enabled = row_tracking_enabled
+        self.first_row_id = first_row_id
+        self.max_sequence_number = max_sequence_number
+        self.system_fields = system_fields
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         record_batch = self.format_reader.read_arrow_batch()
@@ -45,6 +54,8 @@ class DataFileBatchReader(RecordBatchReader):
             return None
 
         if self.partition_info is None and self.index_mapping is None:
+            if self.row_tracking_enabled and self.system_fields:
+                record_batch = self._assign_row_tracking(record_batch)
             return record_batch
 
         inter_arrays = []
@@ -96,8 +107,31 @@ class DataFileBatchReader(RecordBatchReader):
                 target_field = pa.field(name, array.type)
             final_fields.append(target_field)
         final_schema = pa.schema(final_fields)
+        record_batch = pa.RecordBatch.from_arrays(inter_arrays, 
schema=final_schema)
 
-        return pa.RecordBatch.from_arrays(inter_arrays, schema=final_schema)
+        # Handle row tracking fields
+        if self.row_tracking_enabled and self.system_fields:
+            record_batch = self._assign_row_tracking(record_batch)
+
+        return record_batch
+
+    def _assign_row_tracking(self, record_batch: RecordBatch) -> RecordBatch:
+        """Assign row tracking meta fields (_ROW_ID and _SEQUENCE_NUMBER)."""
+        arrays = list(record_batch.columns)
+
+        # Handle _ROW_ID field
+        if SpecialFields.ROW_ID.name in self.system_fields.keys():
+            idx = self.system_fields[SpecialFields.ROW_ID.name]
+            # Create a new array that fills with computed row IDs
+            arrays[idx] = pa.array(range(self.first_row_id, self.first_row_id 
+ record_batch.num_rows), type=pa.int64())
+
+        # Handle _SEQUENCE_NUMBER field
+        if SpecialFields.SEQUENCE_NUMBER.name in self.system_fields.keys():
+            idx = self.system_fields[SpecialFields.SEQUENCE_NUMBER.name]
+            # Create a new array that fills with max_sequence_number
+            arrays[idx] = pa.repeat(self.max_sequence_number, 
record_batch.num_rows)
+
+        return pa.RecordBatch.from_arrays(arrays, 
names=record_batch.schema.names)
 
     def close(self) -> None:
         self.format_reader.close()
diff --git a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py 
b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
index 4d2f1f4c0f..ed560d14a4 100644
--- a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
@@ -16,8 +16,9 @@
 # limitations under the License.
 
################################################################################
 
-from typing import List, Optional, Any
+from typing import Any, List, Optional
 
+import pyarrow as pa
 import pyarrow.dataset as ds
 from pyarrow import RecordBatch
 
@@ -35,15 +36,45 @@ class FormatPyArrowReader(RecordBatchReader):
                  push_down_predicate: Any, batch_size: int = 4096):
         file_path_for_pyarrow = file_io.to_filesystem_path(file_path)
         self.dataset = ds.dataset(file_path_for_pyarrow, format=file_format, 
filesystem=file_io.filesystem)
+        self.read_fields = read_fields
+
+        # Identify which fields exist in the file and which are missing
+        file_schema_names = set(self.dataset.schema.names)
+        self.existing_fields = [field for field in read_fields if field in 
file_schema_names]
+        self.missing_fields = [field for field in read_fields if field not in 
file_schema_names]
+
+        # Only pass existing fields to PyArrow scanner to avoid errors
         self.reader = self.dataset.scanner(
-            columns=read_fields,
+            columns=self.existing_fields,
             filter=push_down_predicate,
             batch_size=batch_size
         ).to_reader()
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         try:
-            return self.reader.read_next_batch()
+            batch = self.reader.read_next_batch()
+
+            if not self.missing_fields:
+                return batch
+
+            # Create columns for missing fields with null values
+            missing_columns = [pa.nulls(batch.num_rows, type=pa.null()) for _ 
in self.missing_fields]
+
+            # Reconstruct the batch with all fields in the correct order
+            all_columns = []
+            for field_name in self.read_fields:
+                if field_name in self.existing_fields:
+                    # Get the column from the existing batch
+                    column_idx = self.existing_fields.index(field_name)
+                    all_columns.append(batch.column(column_idx))
+                else:
+                    # Get the column from missing fields
+                    column_idx = self.missing_fields.index(field_name)
+                    all_columns.append(missing_columns[column_idx])
+
+            # Create a new RecordBatch with all columns
+            return pa.RecordBatch.from_arrays(all_columns, 
names=self.read_fields)
+
         except StopIteration:
             return None
 
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index ed579a2a20..88dba9498f 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -19,7 +19,7 @@
 import os
 from abc import ABC, abstractmethod
 from functools import partial
-from typing import List, Optional, Tuple, Callable
+from typing import Callable, List, Optional, Tuple
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
@@ -29,26 +29,31 @@ from pypaimon.manifest.schema.data_file_meta import 
DataFileMeta
 from pypaimon.read.interval_partition import IntervalPartition, SortedRun
 from pypaimon.read.partition_info import PartitionInfo
 from pypaimon.read.push_down_utils import trim_predicate_by_fields
-from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, 
ShardBatchReader, MergeAllBatchReader
+from pypaimon.read.reader.concat_batch_reader import (ConcatBatchReader,
+                                                      MergeAllBatchReader,
+                                                      ShardBatchReader)
 from pypaimon.read.reader.concat_record_reader import ConcatRecordReader
+from pypaimon.read.reader.data_evolution_merge_reader import \
+    DataEvolutionMergeReader
 from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
-from pypaimon.read.reader.data_evolution_merge_reader import 
DataEvolutionMergeReader
-from pypaimon.read.reader.field_bunch import FieldBunch, DataBunch, BlobBunch
 from pypaimon.read.reader.drop_delete_reader import DropDeleteRecordReader
 from pypaimon.read.reader.empty_record_reader import EmptyFileRecordReader
+from pypaimon.read.reader.field_bunch import BlobBunch, DataBunch, FieldBunch
 from pypaimon.read.reader.filter_record_reader import FilterRecordReader
 from pypaimon.read.reader.format_avro_reader import FormatAvroReader
 from pypaimon.read.reader.format_blob_reader import FormatBlobReader
 from pypaimon.read.reader.format_lance_reader import FormatLanceReader
 from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
-from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader, 
RowPositionReader
+from pypaimon.read.reader.iface.record_batch_reader import (RecordBatchReader,
+                                                            RowPositionReader)
 from pypaimon.read.reader.iface.record_reader import RecordReader
 from pypaimon.read.reader.key_value_unwrap_reader import \
     KeyValueUnwrapRecordReader
 from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
 from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
 from pypaimon.read.split import Split
-from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.special_fields import SpecialFields
 
 KEY_PREFIX = "_KEY_"
 KEY_FIELD_ID_START = 1000000
@@ -58,13 +63,20 @@ NULL_FIELD_INDEX = -1
 class SplitRead(ABC):
     """Abstract base class for split reading operations."""
 
-    def __init__(self, table, predicate: Optional[Predicate], read_type: 
List[DataField], split: Split):
+    def __init__(
+            self,
+            table,
+            predicate: Optional[Predicate],
+            read_type: List[DataField],
+            split: Split,
+            row_tracking_enabled: bool):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
         self.predicate = predicate
         self.push_down_predicate = self._push_down_predicate()
         self.split = split
+        self.row_tracking_enabled = row_tracking_enabled
         self.value_arity = len(read_type)
 
         self.trimmed_primary_key = self.table.trimmed_primary_keys
@@ -90,7 +102,7 @@ class SplitRead(ABC):
         """Create a record reader for the given split."""
 
     def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
-                             read_fields: List[str]) -> RecordBatchReader:
+                             read_fields: List[str], row_tracking_enabled: 
bool) -> RecordBatchReader:
         (read_file_fields, read_arrow_predicate) = 
self._get_fields_and_predicate(file.schema_id, read_fields)
 
         # Use external_path if available, otherwise use file_path
@@ -117,18 +129,43 @@ class SplitRead(ABC):
 
         index_mapping = self.create_index_mapping()
         partition_info = self._create_partition_info()
+        system_fields = SpecialFields.find_system_fields(self.read_fields)
+        table_schema_fields = (
+            
SpecialFields.row_type_with_row_tracking(self.table.table_schema.fields)
+            if row_tracking_enabled else self.table.table_schema.fields
+        )
         if for_merge_read:
-            return DataFileBatchReader(format_reader, index_mapping, 
partition_info, self.trimmed_primary_key,
-                                       self.table.table_schema.fields)
+            return DataFileBatchReader(
+                format_reader,
+                index_mapping,
+                partition_info,
+                self.trimmed_primary_key,
+                table_schema_fields,
+                file.max_sequence_number,
+                file.first_row_id,
+                row_tracking_enabled,
+                system_fields)
         else:
-            return DataFileBatchReader(format_reader, index_mapping, 
partition_info, None,
-                                       self.table.table_schema.fields)
+            return DataFileBatchReader(
+                format_reader,
+                index_mapping,
+                partition_info,
+                None,
+                table_schema_fields,
+                file.max_sequence_number,
+                file.first_row_id,
+                row_tracking_enabled,
+                system_fields)
 
     def _get_fields_and_predicate(self, schema_id: int, read_fields):
         key = (schema_id, tuple(read_fields))
         if key not in self.schema_id_2_fields:
             schema = self.table.schema_manager.get_schema(schema_id)
-            schema_field_names = set(field.name for field in schema.fields)
+            schema_fields = (
+                SpecialFields.row_type_with_row_tracking(schema.fields)
+                if self.row_tracking_enabled else schema.fields
+            )
+            schema_field_names = set(field.name for field in schema_fields)
             if self.table.is_primary_key_table:
                 schema_field_names.add('_SEQUENCE_NUMBER')
                 schema_field_names.add('_VALUE_KIND')
@@ -161,10 +198,8 @@ class SplitRead(ABC):
                 key_field = DataField(key_field_id, key_field_name, field.type)
                 all_data_fields.append(key_field)
 
-        sequence_field = DataField(2147483646, "_SEQUENCE_NUMBER", 
AtomicType("BIGINT", nullable=False))
-        all_data_fields.append(sequence_field)
-        value_kind_field = DataField(2147483645, "_VALUE_KIND", 
AtomicType("TINYINT", nullable=False))
-        all_data_fields.append(value_kind_field)
+        all_data_fields.append(SpecialFields.SEQUENCE_NUMBER)
+        all_data_fields.append(SpecialFields.VALUE_KIND)
 
         for field in value_field:
             all_data_fields.append(field)
@@ -297,7 +332,8 @@ class SplitRead(ABC):
 
 class RawFileSplitRead(SplitRead):
     def raw_reader_supplier(self, file: DataFileMeta, dv_factory: 
Optional[Callable] = None) -> RecordReader:
-        file_batch_reader = self.file_reader_supplier(file, False, 
self._get_final_read_data_fields())
+        file_batch_reader = self.file_reader_supplier(
+            file, False, self._get_final_read_data_fields(), 
self.row_tracking_enabled)
         dv = dv_factory() if dv_factory else None
         if dv:
             return 
ApplyDeletionVectorReader(RowPositionReader(file_batch_reader), dv)
@@ -328,12 +364,14 @@ class RawFileSplitRead(SplitRead):
             return concat_reader
 
     def _get_all_data_fields(self):
+        if self.row_tracking_enabled:
+            return SpecialFields.row_type_with_row_tracking(self.table.fields)
         return self.table.fields
 
 
 class MergeFileSplitRead(SplitRead):
     def kv_reader_supplier(self, file: DataFileMeta, dv_factory: 
Optional[Callable] = None) -> RecordReader:
-        file_batch_reader = self.file_reader_supplier(file, True, 
self._get_final_read_data_fields())
+        file_batch_reader = self.file_reader_supplier(file, True, 
self._get_final_read_data_fields(), False)
         dv = dv_factory() if dv_factory else None
         if dv:
             return ApplyDeletionVectorReader(
@@ -517,7 +555,11 @@ class DataEvolutionSplitRead(SplitRead):
 
     def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> 
RecordReader:
         """Create a file reader for a single file."""
-        return self.file_reader_supplier(file=file, for_merge_read=False, 
read_fields=read_fields)
+        return self.file_reader_supplier(
+            file=file,
+            for_merge_read=False,
+            read_fields=read_fields,
+            row_tracking_enabled=True)
 
     def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> 
List[FieldBunch]:
         """Split files into field bunches."""
@@ -558,6 +600,8 @@ class DataEvolutionSplitRead(SplitRead):
             for field in self.table.fields:
                 if field.name == field_name:
                     field_ids.append(field.id)
+        field_ids.append(SpecialFields.ROW_ID.id)
+        field_ids.append(SpecialFields.SEQUENCE_NUMBER.id)
         return field_ids
 
     @staticmethod
@@ -566,4 +610,4 @@ class DataEvolutionSplitRead(SplitRead):
         return file_name.endswith('.blob')
 
     def _get_all_data_fields(self):
-        return self.table.fields
+        return SpecialFields.row_type_with_row_tracking(self.table.fields)
diff --git a/paimon-python/pypaimon/read/table_read.py 
b/paimon-python/pypaimon/read/table_read.py
index c98b932059..ee6981549f 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -24,8 +24,9 @@ from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
 from pypaimon.read.split import Split
-from pypaimon.read.split_read import (MergeFileSplitRead, RawFileSplitRead,
-                                      SplitRead, DataEvolutionSplitRead)
+from pypaimon.read.split_read import (DataEvolutionSplitRead,
+                                      MergeFileSplitRead, RawFileSplitRead,
+                                      SplitRead)
 from pypaimon.schema.data_types import DataField, PyarrowFieldParser
 from pypaimon.table.row.offset_row import OffsetRow
 
@@ -159,21 +160,24 @@ class TableRead:
                 table=self.table,
                 predicate=self.predicate,
                 read_type=self.read_type,
-                split=split
+                split=split,
+                row_tracking_enabled=False
             )
         elif self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 
'false').lower() == 'true':
             return DataEvolutionSplitRead(
                 table=self.table,
                 predicate=self.predicate,
                 read_type=self.read_type,
-                split=split
+                split=split,
+                row_tracking_enabled=True
             )
         else:
             return RawFileSplitRead(
                 table=self.table,
                 predicate=self.predicate,
                 read_type=self.read_type,
-                split=split
+                split=split,
+                
row_tracking_enabled=self.table.options.get(CoreOptions.ROW_TRACKING_ENABLED, 
'false').lower() == 'true'
             )
 
     @staticmethod
diff --git a/paimon-python/pypaimon/table/special_fields.py 
b/paimon-python/pypaimon/table/special_fields.py
new file mode 100644
index 0000000000..6d6a2e7fec
--- /dev/null
+++ b/paimon-python/pypaimon/table/special_fields.py
@@ -0,0 +1,83 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from typing import List
+
+from ..schema.data_types import AtomicType, DataField
+
+
+class SpecialFields:
+    """
+    Special fields in a RowType with specific field ids.
+    """
+
+    SEQUENCE_NUMBER = DataField(2147483646, "_SEQUENCE_NUMBER", 
AtomicType("BIGINT", nullable=False))
+    VALUE_KIND = DataField(2147483645, "_VALUE_KIND", AtomicType("TINYINT", 
nullable=False))
+    ROW_ID = DataField(2147483642, "_ROW_ID", AtomicType("BIGINT", 
nullable=False))
+
+    SYSTEM_FIELD_NAMES = {
+        '_SEQUENCE_NUMBER',
+        '_VALUE_KIND',
+        '_ROW_ID'
+    }
+
+    @staticmethod
+    def is_system_field(field_name: str) -> bool:
+        """Check if a field is a system field."""
+        return field_name in SpecialFields.SYSTEM_FIELD_NAMES
+
+    @staticmethod
+    def find_system_fields(read_fields: List[DataField]) -> dict:
+        """Find system fields in read fields and return a mapping of field 
name to index."""
+        system_fields = {}
+        for i, field in enumerate(read_fields):
+            if SpecialFields.is_system_field(field.name):
+                system_fields[field.name] = i
+        return system_fields
+
+    @staticmethod
+    def row_type_with_row_tracking(table_fields: List[DataField],
+                                   sequence_number_nullable: bool = False) -> 
List[DataField]:
+        """
+        Add row tracking fields.
+
+        Args:
+            table_fields: The original table fields
+            sequence_number_nullable: Whether sequence number should be 
nullable
+        """
+        fields_with_row_tracking = list(table_fields)
+
+        for field in fields_with_row_tracking:
+            if (SpecialFields.ROW_ID.name == field.name
+                    or SpecialFields.SEQUENCE_NUMBER.name == field.name):
+                raise ValueError(
+                    f"Row tracking field name '{field.name}' conflicts with 
existing field names."
+                )
+
+        fields_with_row_tracking.append(SpecialFields.ROW_ID)
+
+        if sequence_number_nullable:
+            seq_num_field = DataField(
+                id=SpecialFields.SEQUENCE_NUMBER.id,
+                name=SpecialFields.SEQUENCE_NUMBER.name,
+                type=AtomicType("BIGINT", nullable=True)  # Make it nullable
+            )
+            fields_with_row_tracking.append(seq_num_field)
+        else:
+            fields_with_row_tracking.append(SpecialFields.SEQUENCE_NUMBER)
+
+        return fields_with_row_tracking
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py 
b/paimon-python/pypaimon/tests/data_evolution_test.py
index 90abd2f916..896d534f09 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -20,7 +20,8 @@ import tempfile
 import unittest
 
 import pyarrow as pa
-from pypaimon import Schema, CatalogFactory
+
+from pypaimon import CatalogFactory, Schema
 
 
 class DataEvolutionTest(unittest.TestCase):
@@ -481,3 +482,77 @@ class DataEvolutionTest(unittest.TestCase):
             'f2': [f'c{i}' for i in range(size)],
         }, schema=simple_pa_schema)
         self.assertEqual(actual, expect)
+
+    def test_read_row_tracking_metadata(self):
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int8()),
+            ('f1', pa.int16()),
+        ])
+        schema = Schema.from_pyarrow_schema(simple_pa_schema,
+                                            options={'row-tracking.enabled': 
'true', 'data-evolution.enabled': 'true'})
+        self.catalog.create_table('default.test_row_tracking_meta', schema, 
False)
+        table = self.catalog.get_table('default.test_row_tracking_meta')
+
+        # write 1
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        expect_data = pa.Table.from_pydict({
+            'f0': [-1, 2],
+            'f1': [-1001, 1002]
+        }, schema=simple_pa_schema)
+        table_write.write_arrow(expect_data)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        read_builder.with_projection(['f0', '_ROW_ID', 'f1', 
'_SEQUENCE_NUMBER'])
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_data = table_read.to_arrow(table_scan.plan().splits())
+        expect_data = pa.Table.from_pydict({
+            'f0': [-1, 2],
+            '_ROW_ID': [0, 1],
+            'f1': [-1001, 1002],
+            '_SEQUENCE_NUMBER': [1, 1],
+        }, schema=pa.schema([
+            ('f0', pa.int8()),
+            ('_ROW_ID', pa.int64()),
+            ('f1', pa.int16()),
+            ('_SEQUENCE_NUMBER', pa.int64()),
+        ]))
+        self.assertEqual(actual_data, expect_data)
+
+        # write 2
+        table_write = write_builder.new_write().with_write_type(['f0'])
+        table_commit = write_builder.new_commit()
+        data2 = pa.Table.from_pydict({
+            'f0': [3, 4],
+        }, schema=pa.schema([
+            ('f0', pa.int8()),
+        ]))
+        table_write.write_arrow(data2)
+        cmts = table_write.prepare_commit()
+        cmts[0].new_files[0].first_row_id = 0
+        table_commit.commit(cmts)
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        read_builder.with_projection(['f0', 'f1', '_ROW_ID', 
'_SEQUENCE_NUMBER'])
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_data = table_read.to_arrow(table_scan.plan().splits())
+        expect_data = pa.Table.from_pydict({
+            'f0': [3, 4],
+            'f1': [-1001, 1002],
+            '_ROW_ID': [0, 1],
+            '_SEQUENCE_NUMBER': [2, 2],
+        }, schema=pa.schema([
+            ('f0', pa.int8()),
+            ('f1', pa.int16()),
+            ('_ROW_ID', pa.int64()),
+            ('_SEQUENCE_NUMBER', pa.int64()),
+        ]))
+        self.assertEqual(actual_data, expect_data)

Reply via email to