This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 546e43e7c0 [python] support read meta field when enable row-tracking. (#6819)
546e43e7c0 is described below
commit 546e43e7c010d78eb7a6c2d4da9fac364757a5ff
Author: zhoulii <[email protected]>
AuthorDate: Tue Dec 16 14:51:54 2025 +0800
[python] support read meta field when enable row-tracking. (#6819)
---
paimon-python/pypaimon/read/read_builder.py | 6 +-
.../pypaimon/read/reader/data_file_batch_reader.py | 38 +++++++++-
.../pypaimon/read/reader/format_pyarrow_reader.py | 37 +++++++++-
paimon-python/pypaimon/read/split_read.py | 86 ++++++++++++++++------
paimon-python/pypaimon/read/table_read.py | 14 ++--
paimon-python/pypaimon/table/special_fields.py | 83 +++++++++++++++++++++
.../pypaimon/tests/data_evolution_test.py | 77 ++++++++++++++++++-
7 files changed, 308 insertions(+), 33 deletions(-)
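For reference, a minimal usage sketch of the new capability, adapted from the test added in this commit: once 'row-tracking.enabled' is set on the table, the projection can include the _ROW_ID and _SEQUENCE_NUMBER meta fields. The catalog setup (CatalogFactory.create with a 'warehouse' path, create_database) and the table name are illustrative assumptions, not part of this change.

    import pyarrow as pa

    from pypaimon import CatalogFactory, Schema

    # Assumed setup: a filesystem catalog rooted at a local warehouse path.
    catalog = CatalogFactory.create({'warehouse': '/tmp/paimon_warehouse'})
    catalog.create_database('default', True)  # assumed: (name, ignore_if_exists)

    pa_schema = pa.schema([('f0', pa.int8()), ('f1', pa.int16())])
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'})
    catalog.create_table('default.demo_row_tracking', schema, False)
    table = catalog.get_table('default.demo_row_tracking')

    # Write one batch; the committed files carry first_row_id / max_sequence_number,
    # which the reader uses to materialize the meta columns.
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    table_write.write_arrow(pa.Table.from_pydict(
        {'f0': [-1, 2], 'f1': [-1001, 1002]}, schema=pa_schema))
    table_commit.commit(table_write.prepare_commit())
    table_write.close()
    table_commit.close()

    # Project the meta fields alongside the data fields.
    read_builder = table.new_read_builder()
    read_builder.with_projection(['f0', '_ROW_ID', 'f1', '_SEQUENCE_NUMBER'])
    table_read = read_builder.new_read()
    result = table_read.to_arrow(read_builder.new_scan().plan().splits())
    print(result)  # expected: _ROW_ID = [0, 1], _SEQUENCE_NUMBER = [1, 1]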
diff --git a/paimon-python/pypaimon/read/read_builder.py b/paimon-python/pypaimon/read/read_builder.py
index 6fc8026c43..c33982c9ac 100644
--- a/paimon-python/pypaimon/read/read_builder.py
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -18,11 +18,13 @@
from typing import List, Optional
+from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
from pypaimon.common.predicate_builder import PredicateBuilder
from pypaimon.read.table_read import TableRead
from pypaimon.read.table_scan import TableScan
from pypaimon.schema.data_types import DataField
+from pypaimon.table.special_fields import SpecialFields
class ReadBuilder:
@@ -71,5 +73,7 @@ class ReadBuilder:
if not self._projection:
return table_fields
else:
- field_map = {field.name: field for field in self.table.fields}
+ if self.table.options.get(CoreOptions.ROW_TRACKING_ENABLED, 'false').lower() == 'true':
+ table_fields = SpecialFields.row_type_with_row_tracking(table_fields)
+ field_map = {field.name: field for field in table_fields}
return [field_map[name] for name in self._projection if name in field_map]
diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index c83f1ce152..526e501b97 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -24,6 +24,7 @@ from pyarrow import RecordBatch
from pypaimon.read.partition_info import PartitionInfo
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
+from pypaimon.table.special_fields import SpecialFields
class DataFileBatchReader(RecordBatchReader):
@@ -32,12 +33,20 @@ class DataFileBatchReader(RecordBatchReader):
"""
def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], partition_info: PartitionInfo,
- system_primary_key: Optional[List[str]], fields: List[DataField]):
+ system_primary_key: Optional[List[str]], fields: List[DataField],
+ max_sequence_number: int,
+ first_row_id: int,
+ row_tracking_enabled: bool,
+ system_fields: dict):
self.format_reader = format_reader
self.index_mapping = index_mapping
self.partition_info = partition_info
self.system_primary_key = system_primary_key
self.schema_map = {field.name: field for field in PyarrowFieldParser.from_paimon_schema(fields)}
+ self.row_tracking_enabled = row_tracking_enabled
+ self.first_row_id = first_row_id
+ self.max_sequence_number = max_sequence_number
+ self.system_fields = system_fields
def read_arrow_batch(self) -> Optional[RecordBatch]:
record_batch = self.format_reader.read_arrow_batch()
@@ -45,6 +54,8 @@ class DataFileBatchReader(RecordBatchReader):
return None
if self.partition_info is None and self.index_mapping is None:
+ if self.row_tracking_enabled and self.system_fields:
+ record_batch = self._assign_row_tracking(record_batch)
return record_batch
inter_arrays = []
@@ -96,8 +107,31 @@ class DataFileBatchReader(RecordBatchReader):
target_field = pa.field(name, array.type)
final_fields.append(target_field)
final_schema = pa.schema(final_fields)
+ record_batch = pa.RecordBatch.from_arrays(inter_arrays, schema=final_schema)
- return pa.RecordBatch.from_arrays(inter_arrays, schema=final_schema)
+ # Handle row tracking fields
+ if self.row_tracking_enabled and self.system_fields:
+ record_batch = self._assign_row_tracking(record_batch)
+
+ return record_batch
+
+ def _assign_row_tracking(self, record_batch: RecordBatch) -> RecordBatch:
+ """Assign row tracking meta fields (_ROW_ID and _SEQUENCE_NUMBER)."""
+ arrays = list(record_batch.columns)
+
+ # Handle _ROW_ID field
+ if SpecialFields.ROW_ID.name in self.system_fields.keys():
+ idx = self.system_fields[SpecialFields.ROW_ID.name]
+ # Create a new array that fills with computed row IDs
+ arrays[idx] = pa.array(range(self.first_row_id, self.first_row_id + record_batch.num_rows), type=pa.int64())
+
+ # Handle _SEQUENCE_NUMBER field
+ if SpecialFields.SEQUENCE_NUMBER.name in self.system_fields.keys():
+ idx = self.system_fields[SpecialFields.SEQUENCE_NUMBER.name]
+ # Create a new array that fills with max_sequence_number
+ arrays[idx] = pa.repeat(self.max_sequence_number, record_batch.num_rows)
+
+ return pa.RecordBatch.from_arrays(arrays, names=record_batch.schema.names)
def close(self) -> None:
self.format_reader.close()
diff --git a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
index 4d2f1f4c0f..ed560d14a4 100644
--- a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
@@ -16,8 +16,9 @@
# limitations under the License.
################################################################################
-from typing import List, Optional, Any
+from typing import Any, List, Optional
+import pyarrow as pa
import pyarrow.dataset as ds
from pyarrow import RecordBatch
@@ -35,15 +36,45 @@ class FormatPyArrowReader(RecordBatchReader):
push_down_predicate: Any, batch_size: int = 4096):
file_path_for_pyarrow = file_io.to_filesystem_path(file_path)
self.dataset = ds.dataset(file_path_for_pyarrow, format=file_format,
filesystem=file_io.filesystem)
+ self.read_fields = read_fields
+
+ # Identify which fields exist in the file and which are missing
+ file_schema_names = set(self.dataset.schema.names)
+ self.existing_fields = [field for field in read_fields if field in file_schema_names]
+ self.missing_fields = [field for field in read_fields if field not in file_schema_names]
+
+ # Only pass existing fields to PyArrow scanner to avoid errors
self.reader = self.dataset.scanner(
- columns=read_fields,
+ columns=self.existing_fields,
filter=push_down_predicate,
batch_size=batch_size
).to_reader()
def read_arrow_batch(self) -> Optional[RecordBatch]:
try:
- return self.reader.read_next_batch()
+ batch = self.reader.read_next_batch()
+
+ if not self.missing_fields:
+ return batch
+
+ # Create columns for missing fields with null values
+ missing_columns = [pa.nulls(batch.num_rows, type=pa.null()) for _ in self.missing_fields]
+
+ # Reconstruct the batch with all fields in the correct order
+ all_columns = []
+ for field_name in self.read_fields:
+ if field_name in self.existing_fields:
+ # Get the column from the existing batch
+ column_idx = self.existing_fields.index(field_name)
+ all_columns.append(batch.column(column_idx))
+ else:
+ # Get the column from missing fields
+ column_idx = self.missing_fields.index(field_name)
+ all_columns.append(missing_columns[column_idx])
+
+ # Create a new RecordBatch with all columns
+ return pa.RecordBatch.from_arrays(all_columns, names=self.read_fields)
+
except StopIteration:
return None
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index ed579a2a20..88dba9498f 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -19,7 +19,7 @@
import os
from abc import ABC, abstractmethod
from functools import partial
-from typing import List, Optional, Tuple, Callable
+from typing import Callable, List, Optional, Tuple
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
@@ -29,26 +29,31 @@ from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.read.interval_partition import IntervalPartition, SortedRun
from pypaimon.read.partition_info import PartitionInfo
from pypaimon.read.push_down_utils import trim_predicate_by_fields
-from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, ShardBatchReader, MergeAllBatchReader
+from pypaimon.read.reader.concat_batch_reader import (ConcatBatchReader,
+ MergeAllBatchReader,
+ ShardBatchReader)
from pypaimon.read.reader.concat_record_reader import ConcatRecordReader
+from pypaimon.read.reader.data_evolution_merge_reader import \
+ DataEvolutionMergeReader
from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
-from pypaimon.read.reader.data_evolution_merge_reader import DataEvolutionMergeReader
-from pypaimon.read.reader.field_bunch import FieldBunch, DataBunch, BlobBunch
from pypaimon.read.reader.drop_delete_reader import DropDeleteRecordReader
from pypaimon.read.reader.empty_record_reader import EmptyFileRecordReader
+from pypaimon.read.reader.field_bunch import BlobBunch, DataBunch, FieldBunch
from pypaimon.read.reader.filter_record_reader import FilterRecordReader
from pypaimon.read.reader.format_avro_reader import FormatAvroReader
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.read.reader.format_lance_reader import FormatLanceReader
from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
-from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader, RowPositionReader
+from pypaimon.read.reader.iface.record_batch_reader import (RecordBatchReader,
+ RowPositionReader)
from pypaimon.read.reader.iface.record_reader import RecordReader
from pypaimon.read.reader.key_value_unwrap_reader import \
KeyValueUnwrapRecordReader
from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
from pypaimon.read.split import Split
-from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.special_fields import SpecialFields
KEY_PREFIX = "_KEY_"
KEY_FIELD_ID_START = 1000000
@@ -58,13 +63,20 @@ NULL_FIELD_INDEX = -1
class SplitRead(ABC):
"""Abstract base class for split reading operations."""
- def __init__(self, table, predicate: Optional[Predicate], read_type: List[DataField], split: Split):
+ def __init__(
+ self,
+ table,
+ predicate: Optional[Predicate],
+ read_type: List[DataField],
+ split: Split,
+ row_tracking_enabled: bool):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
self.predicate = predicate
self.push_down_predicate = self._push_down_predicate()
self.split = split
+ self.row_tracking_enabled = row_tracking_enabled
self.value_arity = len(read_type)
self.trimmed_primary_key = self.table.trimmed_primary_keys
@@ -90,7 +102,7 @@ class SplitRead(ABC):
"""Create a record reader for the given split."""
def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
- read_fields: List[str]) -> RecordBatchReader:
+ read_fields: List[str], row_tracking_enabled: bool) -> RecordBatchReader:
(read_file_fields, read_arrow_predicate) = self._get_fields_and_predicate(file.schema_id, read_fields)
# Use external_path if available, otherwise use file_path
@@ -117,18 +129,43 @@ class SplitRead(ABC):
index_mapping = self.create_index_mapping()
partition_info = self._create_partition_info()
+ system_fields = SpecialFields.find_system_fields(self.read_fields)
+ table_schema_fields = (
+ SpecialFields.row_type_with_row_tracking(self.table.table_schema.fields)
+ if row_tracking_enabled else self.table.table_schema.fields
+ )
if for_merge_read:
- return DataFileBatchReader(format_reader, index_mapping, partition_info, self.trimmed_primary_key,
- self.table.table_schema.fields)
+ return DataFileBatchReader(
+ format_reader,
+ index_mapping,
+ partition_info,
+ self.trimmed_primary_key,
+ table_schema_fields,
+ file.max_sequence_number,
+ file.first_row_id,
+ row_tracking_enabled,
+ system_fields)
else:
- return DataFileBatchReader(format_reader, index_mapping, partition_info, None,
- self.table.table_schema.fields)
+ return DataFileBatchReader(
+ format_reader,
+ index_mapping,
+ partition_info,
+ None,
+ table_schema_fields,
+ file.max_sequence_number,
+ file.first_row_id,
+ row_tracking_enabled,
+ system_fields)
def _get_fields_and_predicate(self, schema_id: int, read_fields):
key = (schema_id, tuple(read_fields))
if key not in self.schema_id_2_fields:
schema = self.table.schema_manager.get_schema(schema_id)
- schema_field_names = set(field.name for field in schema.fields)
+ schema_fields = (
+ SpecialFields.row_type_with_row_tracking(schema.fields)
+ if self.row_tracking_enabled else schema.fields
+ )
+ schema_field_names = set(field.name for field in schema_fields)
if self.table.is_primary_key_table:
schema_field_names.add('_SEQUENCE_NUMBER')
schema_field_names.add('_VALUE_KIND')
@@ -161,10 +198,8 @@ class SplitRead(ABC):
key_field = DataField(key_field_id, key_field_name, field.type)
all_data_fields.append(key_field)
- sequence_field = DataField(2147483646, "_SEQUENCE_NUMBER", AtomicType("BIGINT", nullable=False))
- all_data_fields.append(sequence_field)
- value_kind_field = DataField(2147483645, "_VALUE_KIND", AtomicType("TINYINT", nullable=False))
- all_data_fields.append(value_kind_field)
+ all_data_fields.append(SpecialFields.SEQUENCE_NUMBER)
+ all_data_fields.append(SpecialFields.VALUE_KIND)
for field in value_field:
all_data_fields.append(field)
@@ -297,7 +332,8 @@ class SplitRead(ABC):
class RawFileSplitRead(SplitRead):
def raw_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> RecordReader:
- file_batch_reader = self.file_reader_supplier(file, False, self._get_final_read_data_fields())
+ file_batch_reader = self.file_reader_supplier(
+ file, False, self._get_final_read_data_fields(), self.row_tracking_enabled)
dv = dv_factory() if dv_factory else None
if dv:
return ApplyDeletionVectorReader(RowPositionReader(file_batch_reader), dv)
@@ -328,12 +364,14 @@ class RawFileSplitRead(SplitRead):
return concat_reader
def _get_all_data_fields(self):
+ if self.row_tracking_enabled:
+ return SpecialFields.row_type_with_row_tracking(self.table.fields)
return self.table.fields
class MergeFileSplitRead(SplitRead):
def kv_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> RecordReader:
- file_batch_reader = self.file_reader_supplier(file, True, self._get_final_read_data_fields())
+ file_batch_reader = self.file_reader_supplier(file, True, self._get_final_read_data_fields(), False)
dv = dv_factory() if dv_factory else None
if dv:
return ApplyDeletionVectorReader(
@@ -517,7 +555,11 @@ class DataEvolutionSplitRead(SplitRead):
def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> RecordReader:
"""Create a file reader for a single file."""
- return self.file_reader_supplier(file=file, for_merge_read=False, read_fields=read_fields)
+ return self.file_reader_supplier(
+ file=file,
+ for_merge_read=False,
+ read_fields=read_fields,
+ row_tracking_enabled=True)
def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]:
"""Split files into field bunches."""
@@ -558,6 +600,8 @@ class DataEvolutionSplitRead(SplitRead):
for field in self.table.fields:
if field.name == field_name:
field_ids.append(field.id)
+ field_ids.append(SpecialFields.ROW_ID.id)
+ field_ids.append(SpecialFields.SEQUENCE_NUMBER.id)
return field_ids
@staticmethod
@@ -566,4 +610,4 @@ class DataEvolutionSplitRead(SplitRead):
return file_name.endswith('.blob')
def _get_all_data_fields(self):
- return self.table.fields
+ return SpecialFields.row_type_with_row_tracking(self.table.fields)
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index c98b932059..ee6981549f 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -24,8 +24,9 @@ from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.read.split import Split
-from pypaimon.read.split_read import (MergeFileSplitRead, RawFileSplitRead,
- SplitRead, DataEvolutionSplitRead)
+from pypaimon.read.split_read import (DataEvolutionSplitRead,
+ MergeFileSplitRead, RawFileSplitRead,
+ SplitRead)
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
from pypaimon.table.row.offset_row import OffsetRow
@@ -159,21 +160,24 @@ class TableRead:
table=self.table,
predicate=self.predicate,
read_type=self.read_type,
- split=split
+ split=split,
+ row_tracking_enabled=False
)
elif self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true':
return DataEvolutionSplitRead(
table=self.table,
predicate=self.predicate,
read_type=self.read_type,
- split=split
+ split=split,
+ row_tracking_enabled=True
)
else:
return RawFileSplitRead(
table=self.table,
predicate=self.predicate,
read_type=self.read_type,
- split=split
+ split=split,
+ row_tracking_enabled=self.table.options.get(CoreOptions.ROW_TRACKING_ENABLED, 'false').lower() == 'true'
)
@staticmethod
diff --git a/paimon-python/pypaimon/table/special_fields.py b/paimon-python/pypaimon/table/special_fields.py
new file mode 100644
index 0000000000..6d6a2e7fec
--- /dev/null
+++ b/paimon-python/pypaimon/table/special_fields.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+from ..schema.data_types import AtomicType, DataField
+
+
+class SpecialFields:
+ """
+ Special fields in a RowType with specific field ids.
+ """
+
+ SEQUENCE_NUMBER = DataField(2147483646, "_SEQUENCE_NUMBER", AtomicType("BIGINT", nullable=False))
+ VALUE_KIND = DataField(2147483645, "_VALUE_KIND", AtomicType("TINYINT", nullable=False))
+ ROW_ID = DataField(2147483642, "_ROW_ID", AtomicType("BIGINT", nullable=False))
+
+ SYSTEM_FIELD_NAMES = {
+ '_SEQUENCE_NUMBER',
+ '_VALUE_KIND',
+ '_ROW_ID'
+ }
+
+ @staticmethod
+ def is_system_field(field_name: str) -> bool:
+ """Check if a field is a system field."""
+ return field_name in SpecialFields.SYSTEM_FIELD_NAMES
+
+ @staticmethod
+ def find_system_fields(read_fields: List[DataField]) -> dict:
+ """Find system fields in read fields and return a mapping of field
name to index."""
+ system_fields = {}
+ for i, field in enumerate(read_fields):
+ if SpecialFields.is_system_field(field.name):
+ system_fields[field.name] = i
+ return system_fields
+
+ @staticmethod
+ def row_type_with_row_tracking(table_fields: List[DataField],
+ sequence_number_nullable: bool = False) -> List[DataField]:
+ """
+ Add row tracking fields.
+
+ Args:
+ table_fields: The original table fields
+ sequence_number_nullable: Whether sequence number should be nullable
+ """
+ fields_with_row_tracking = list(table_fields)
+
+ for field in fields_with_row_tracking:
+ if (SpecialFields.ROW_ID.name == field.name
+ or SpecialFields.SEQUENCE_NUMBER.name == field.name):
+ raise ValueError(
+ f"Row tracking field name '{field.name}' conflicts with
existing field names."
+ )
+
+ fields_with_row_tracking.append(SpecialFields.ROW_ID)
+
+ if sequence_number_nullable:
+ seq_num_field = DataField(
+ id=SpecialFields.SEQUENCE_NUMBER.id,
+ name=SpecialFields.SEQUENCE_NUMBER.name,
+ type=AtomicType("BIGINT", nullable=True) # Make it nullable
+ )
+ fields_with_row_tracking.append(seq_num_field)
+ else:
+ fields_with_row_tracking.append(SpecialFields.SEQUENCE_NUMBER)
+
+ return fields_with_row_tracking
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
index 90abd2f916..896d534f09 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -20,7 +20,8 @@ import tempfile
import unittest
import pyarrow as pa
-from pypaimon import Schema, CatalogFactory
+
+from pypaimon import CatalogFactory, Schema
class DataEvolutionTest(unittest.TestCase):
@@ -481,3 +482,77 @@ class DataEvolutionTest(unittest.TestCase):
'f2': [f'c{i}' for i in range(size)],
}, schema=simple_pa_schema)
self.assertEqual(actual, expect)
+
+ def test_read_row_tracking_metadata(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int8()),
+ ('f1', pa.int16()),
+ ])
+ schema = Schema.from_pyarrow_schema(simple_pa_schema,
+ options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'})
+ self.catalog.create_table('default.test_row_tracking_meta', schema, False)
+ table = self.catalog.get_table('default.test_row_tracking_meta')
+
+ # write 1
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ expect_data = pa.Table.from_pydict({
+ 'f0': [-1, 2],
+ 'f1': [-1001, 1002]
+ }, schema=simple_pa_schema)
+ table_write.write_arrow(expect_data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ read_builder.with_projection(['f0', '_ROW_ID', 'f1', '_SEQUENCE_NUMBER'])
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_data = table_read.to_arrow(table_scan.plan().splits())
+ expect_data = pa.Table.from_pydict({
+ 'f0': [-1, 2],
+ '_ROW_ID': [0, 1],
+ 'f1': [-1001, 1002],
+ '_SEQUENCE_NUMBER': [1, 1],
+ }, schema=pa.schema([
+ ('f0', pa.int8()),
+ ('_ROW_ID', pa.int64()),
+ ('f1', pa.int16()),
+ ('_SEQUENCE_NUMBER', pa.int64()),
+ ]))
+ self.assertEqual(actual_data, expect_data)
+
+ # write 2
+ table_write = write_builder.new_write().with_write_type(['f0'])
+ table_commit = write_builder.new_commit()
+ data2 = pa.Table.from_pydict({
+ 'f0': [3, 4],
+ }, schema=pa.schema([
+ ('f0', pa.int8()),
+ ]))
+ table_write.write_arrow(data2)
+ cmts = table_write.prepare_commit()
+ cmts[0].new_files[0].first_row_id = 0
+ table_commit.commit(cmts)
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ read_builder.with_projection(['f0', 'f1', '_ROW_ID', '_SEQUENCE_NUMBER'])
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_data = table_read.to_arrow(table_scan.plan().splits())
+ expect_data = pa.Table.from_pydict({
+ 'f0': [3, 4],
+ 'f1': [-1001, 1002],
+ '_ROW_ID': [0, 1],
+ '_SEQUENCE_NUMBER': [2, 2],
+ }, schema=pa.schema([
+ ('f0', pa.int8()),
+ ('f1', pa.int16()),
+ ('_ROW_ID', pa.int64()),
+ ('_SEQUENCE_NUMBER', pa.int64()),
+ ]))
+ self.assertEqual(actual_data, expect_data)