This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 2a7a0b830d [Python] Refactor BinaryRow to reuse keys and key fields (#6445)
2a7a0b830d is described below
commit 2a7a0b830d6aaf6d35729bda18343b9879c8844d
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Oct 21 21:36:24 2025 +0800
[Python] Refactor BinaryRow to reuse keys and key fields (#6445)
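    This refactor replaces the per-call TableSchema helpers (get_partition_key_fields,
    get_primary_key_fields, get_trimmed_primary_key_fields) with attributes computed once
    in FileStoreTable.__init__, so readers, writers, and the commit path reuse the same
    key-name and key-field lists instead of rebuilding a name-to-field map on every call.
    A minimal sketch of the cached-attributes pattern (DataField is simplified here; the
    real class carries a full data type):

        from dataclasses import dataclass
        from typing import List

        @dataclass
        class DataField:
            name: str
            type_name: str  # simplified stand-in for Paimon's typed DataField

        class TableSketch:
            # Illustrative only: compute key fields once, then reuse them.
            def __init__(self, fields: List[DataField], primary_keys: List[str],
                         partition_keys: List[str]):
                self.field_dict = {f.name: f for f in fields}
                self.primary_keys_fields = [self.field_dict[n] for n in primary_keys]
                self.partition_keys_fields = [self.field_dict[n] for n in partition_keys]
                # Trimmed primary keys drop columns already covered by the partition.
                self.trimmed_primary_keys = [pk for pk in primary_keys
                                             if pk not in partition_keys]
                self.trimmed_primary_keys_fields = [self.field_dict[n]
                                                    for n in self.trimmed_primary_keys]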
---
paimon-python/pypaimon/common/predicate.py | 17 +++----------
.../pypaimon/manifest/manifest_file_manager.py | 16 ++++++-------
.../pypaimon/manifest/manifest_list_manager.py | 12 +++++-----
.../pypaimon/manifest/schema/simple_stats.py | 5 ++--
.../pypaimon/manifest/simple_stats_evolution.py | 1 -
.../pypaimon/manifest/simple_stats_evolutions.py | 1 -
.../pypaimon/read/scanner/full_starting_scanner.py | 2 +-
paimon-python/pypaimon/read/split_read.py | 2 +-
paimon-python/pypaimon/schema/table_schema.py | 28 ----------------------
paimon-python/pypaimon/table/file_store_table.py | 5 ++++
paimon-python/pypaimon/table/row/binary_row.py | 3 ---
paimon-python/pypaimon/table/row/generic_row.py | 3 ---
paimon-python/pypaimon/table/row/internal_row.py | 6 -----
paimon-python/pypaimon/table/row/offset_row.py | 3 ---
paimon-python/pypaimon/table/row/projected_row.py | 7 ------
paimon-python/pypaimon/tests/predicates_test.py | 12 +++++-----
paimon-python/pypaimon/write/file_store_commit.py | 10 ++++----
paimon-python/pypaimon/write/writer/data_writer.py | 16 ++++++-------
.../pypaimon/write/writer/key_value_data_writer.py | 8 +++----
19 files changed, 50 insertions(+), 107 deletions(-)
diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py
index 5e47fdd5df..9ae2cdfce3 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -27,7 +27,6 @@ from pyarrow import compute as pyarrow_compute
from pyarrow import dataset as pyarrow_dataset
from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.table.row.generic_row import GenericRow
from pypaimon.table.row.internal_row import InternalRow
@@ -74,25 +73,15 @@ class Predicate:
         if self.method == 'or':
             return any(p.test_by_simple_stats(stat, row_count) for p in self.literals)
-        # Get null count using the mapped index
-        null_count = stat.null_counts[self.index] if stat.null_counts and self.index < len(
-            stat.null_counts) else 0
+        null_count = stat.null_counts[self.index]
         if self.method == 'isNull':
             return null_count is not None and null_count > 0
         if self.method == 'isNotNull':
             return null_count is None or row_count is None or null_count < row_count
-        if not isinstance(stat.min_values, GenericRow):
-            # Parse field values using BinaryRow's direct field access by name
-            min_value = stat.min_values.get_field(self.index)
-            max_value = stat.max_values.get_field(self.index)
-        else:
-            # TODO transform partition to BinaryRow
-            min_values = stat.min_values.to_dict()
-            max_values = stat.max_values.to_dict()
-            min_value = min_values[self.field]
-            max_value = max_values[self.field]
+        min_value = stat.min_values.get_field(self.index)
+        max_value = stat.max_values.get_field(self.index)
         if min_value is None or max_value is None or (null_count is not None and null_count == row_count):
             # invalid stats, skip validation
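    With the GenericRow branch gone, stats rows are always accessed by index through
    get_field. For context, a hedged sketch of how a leaf predicate uses min/max stats
    to decide whether a file can be skipped (names are illustrative, not the exact
    Predicate API):

        def may_contain_equal(min_value, max_value, literal) -> bool:
            # A file can hold rows where field == literal only if the literal
            # lies within the [min, max] range recorded in the file's stats.
            return min_value <= literal <= max_value

        assert may_contain_equal(1, 10, 5)        # cannot skip this file
        assert not may_contain_equal(1, 10, 42)   # safe to skip entirely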
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index c196845ff4..9fc92a4113 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -38,9 +38,9 @@ class ManifestFileManager:
         self.table: FileStoreTable = table
         self.manifest_path = table.table_path / "manifest"
         self.file_io = table.file_io
-        self.partition_key_fields = self.table.table_schema.get_partition_key_fields()
-        self.primary_key_fields = self.table.table_schema.get_primary_key_fields()
-        self.trimmed_primary_key_fields = self.table.table_schema.get_trimmed_primary_key_fields()
+        self.partition_keys_fields = self.table.partition_keys_fields
+        self.primary_keys_fields = self.table.primary_keys_fields
+        self.trimmed_primary_keys_fields = self.table.trimmed_primary_keys_fields

     def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=True) -> List[ManifestEntry]:
         manifest_file_path = self.manifest_path / manifest_file_name
@@ -55,8 +55,8 @@ class ManifestFileManager:
             file_dict = dict(record['_FILE'])
             key_dict = dict(file_dict['_KEY_STATS'])
             key_stats = SimpleStats(
-                min_values=BinaryRow(key_dict['_MIN_VALUES'], self.trimmed_primary_key_fields),
-                max_values=BinaryRow(key_dict['_MAX_VALUES'], self.trimmed_primary_key_fields),
+                min_values=BinaryRow(key_dict['_MIN_VALUES'], self.trimmed_primary_keys_fields),
+                max_values=BinaryRow(key_dict['_MAX_VALUES'], self.trimmed_primary_keys_fields),
                 null_counts=key_dict['_NULL_COUNTS'],
             )
@@ -80,8 +80,8 @@ class ManifestFileManager:
                     file_name=file_dict['_FILE_NAME'],
                     file_size=file_dict['_FILE_SIZE'],
                     row_count=file_dict['_ROW_COUNT'],
-                    min_key=GenericRowDeserializer.from_bytes(file_dict['_MIN_KEY'], self.trimmed_primary_key_fields),
-                    max_key=GenericRowDeserializer.from_bytes(file_dict['_MAX_KEY'], self.trimmed_primary_key_fields),
+                    min_key=GenericRowDeserializer.from_bytes(file_dict['_MIN_KEY'], self.trimmed_primary_keys_fields),
+                    max_key=GenericRowDeserializer.from_bytes(file_dict['_MAX_KEY'], self.trimmed_primary_keys_fields),
                     key_stats=key_stats,
                     value_stats=value_stats,
                     min_sequence_number=file_dict['_MIN_SEQUENCE_NUMBER'],
@@ -100,7 +100,7 @@ class ManifestFileManager:
             )
             entry = ManifestEntry(
                 kind=record['_KIND'],
-                partition=GenericRowDeserializer.from_bytes(record['_PARTITION'], self.partition_key_fields),
+                partition=GenericRowDeserializer.from_bytes(record['_PARTITION'], self.partition_keys_fields),
                 bucket=record['_BUCKET'],
                 total_buckets=record['_TOTAL_BUCKETS'],
                 file=file_meta
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index 0fc58652f6..367f802de5 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -25,8 +25,8 @@ from pypaimon.manifest.schema.manifest_file_meta import (
     MANIFEST_FILE_META_SCHEMA, ManifestFileMeta)
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.snapshot.snapshot import Snapshot
-from pypaimon.table.row.generic_row import (GenericRowDeserializer,
-                                            GenericRowSerializer)
+from pypaimon.table.row.binary_row import BinaryRow
+from pypaimon.table.row.generic_row import GenericRowSerializer
class ManifestListManager:
@@ -61,13 +61,13 @@ class ManifestListManager:
         for record in reader:
             stats_dict = dict(record['_PARTITION_STATS'])
             partition_stats = SimpleStats(
-                min_values=GenericRowDeserializer.from_bytes(
+                min_values=BinaryRow(
                     stats_dict['_MIN_VALUES'],
-                    self.table.table_schema.get_partition_key_fields()
+                    self.table.partition_keys_fields
                 ),
-                max_values=GenericRowDeserializer.from_bytes(
+                max_values=BinaryRow(
                     stats_dict['_MAX_VALUES'],
-                    self.table.table_schema.get_partition_key_fields()
+                    self.table.partition_keys_fields
                 ),
                 null_counts=stats_dict['_NULL_COUNTS'],
             )
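    Constructing BinaryRow directly from the stored bytes keeps deserialization lazy:
    fields are decoded when get_field is called, rather than eagerly materialized the
    way GenericRowDeserializer.from_bytes does. A rough contrast with a toy codec
    standing in for Paimon's binary row format (illustrative only):

        def decode_all(data: bytes, fields):
            return data.split(b",")          # eager: decode every field now

        def decode_one(data: bytes, fields, pos):
            return data.split(b",")[pos]     # lazy path decodes on demand

        class EagerRow:
            def __init__(self, data: bytes, fields):
                self.values = decode_all(data, fields)   # pays full cost up front

        class LazyRow:
            def __init__(self, data: bytes, fields):
                self.data, self.fields = data, fields    # just keeps the bytes

            def get_field(self, pos):
                return decode_one(self.data, self.fields, pos)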
diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py b/paimon-python/pypaimon/manifest/schema/simple_stats.py
index 1130a812fa..065728bc6a 100644
--- a/paimon-python/pypaimon/manifest/schema/simple_stats.py
+++ b/paimon-python/pypaimon/manifest/schema/simple_stats.py
@@ -17,7 +17,7 @@
################################################################################
from dataclasses import dataclass
-from typing import List, Optional
+from typing import List
from typing import ClassVar
from pypaimon.table.row.generic_row import GenericRow
@@ -28,7 +28,8 @@ from pypaimon.table.row.internal_row import InternalRow
 class SimpleStats:
     min_values: InternalRow
     max_values: InternalRow
-    null_counts: Optional[List[int]]
+    # TODO convert null counts to InternalArray
+    null_counts: List[int]

     _empty_stats: ClassVar[object] = None
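    With null_counts narrowed to a plain List[int] (pending the InternalArray TODO),
    callers always supply one count per field. A minimal usage sketch under that
    assumption, with made-up values and a hypothetical key_fields list assumed in scope:

        from pypaimon.manifest.schema.simple_stats import SimpleStats
        from pypaimon.table.row.generic_row import GenericRow

        # key_fields: a list of DataField for the key columns (assumed in scope).
        stats = SimpleStats(
            min_values=GenericRow([1, "a"], key_fields),
            max_values=GenericRow([9, "z"], key_fields),
            null_counts=[0, 2],  # one null count per field, no longer Optional
        )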
diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolution.py b/paimon-python/pypaimon/manifest/simple_stats_evolution.py
index 56cea98a85..d2291c54de 100644
--- a/paimon-python/pypaimon/manifest/simple_stats_evolution.py
+++ b/paimon-python/pypaimon/manifest/simple_stats_evolution.py
@@ -59,7 +59,6 @@ class SimpleStatsEvolution:
         null_counts = self._project_array(null_counts, dense_index_mapping)

         if self.index_mapping is not None:
-            # TODO support schema evolution
             min_values = self._project_row(min_values, self.index_mapping)
             max_values = self._project_row(max_values, self.index_mapping)
diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
index 8331b7a5e5..373e333cd9 100644
--- a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
+++ b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
@@ -40,7 +40,6 @@ class SimpleStatsEvolutions:
         if self.table_schema_id == data_schema_id:
             evolution = SimpleStatsEvolution(self.schema_fields(data_schema_id), None, None)
         else:
-            # TODO support schema evolution
             if self.table_fields is None:
                 self.table_fields = self.table_data_fields
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 47a2d86d15..b94364db99 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -48,7 +48,7 @@ class FullStartingScanner(StartingScanner):
         self.manifest_file_manager = ManifestFileManager(table)
         self.primary_key_predicate = trim_and_transform_predicate(
-            self.predicate, self.table.field_names, self.table.table_schema.get_trimmed_primary_keys())
+            self.predicate, self.table.field_names, self.table.trimmed_primary_keys)
         self.partition_key_predicate = trim_and_transform_predicate(
             self.predicate, self.table.field_names, self.table.partition_keys)
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 6b93b51d15..000e272e39 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -64,7 +64,7 @@ class SplitRead(ABC):
         self.split = split
         self.value_arity = len(read_type)
-        self.trimmed_primary_key = self.table.table_schema.get_trimmed_primary_keys()
+        self.trimmed_primary_key = self.table.trimmed_primary_keys
         self.read_fields = read_type
         if isinstance(self, MergeFileSplitRead):
             self.read_fields = self._create_key_value_fields(read_type)
diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py
index d1875a7b0b..25dd19398b 100644
--- a/paimon-python/pypaimon/schema/table_schema.py
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -145,31 +145,3 @@ class TableSchema:
             comment=self.comment,
             time_millis=self.time_millis
         )
-
-    def get_primary_key_fields(self) -> List[DataField]:
-        if not self.primary_keys:
-            return []
-        field_map = {field.name: field for field in self.fields}
-        return [field_map[name] for name in self.primary_keys if name in field_map]
-
-    def get_partition_key_fields(self) -> List[DataField]:
-        if not self.partition_keys:
-            return []
-        field_map = {field.name: field for field in self.fields}
-        return [field_map[name] for name in self.partition_keys if name in field_map]
-
-    def get_trimmed_primary_key_fields(self) -> List[DataField]:
-        if not self.primary_keys or not self.partition_keys:
-            return self.get_primary_key_fields()
-        adjusted = [pk for pk in self.primary_keys if pk not in self.partition_keys]
-        # Validate that filtered list is not empty
-        if not adjusted:
-            raise ValueError(
-                f"Primary key constraint {self.primary_keys} "
-                f"should not be same with partition fields {self.partition_keys}, "
-                "this will result in only one record in a partition")
-        field_map = {field.name: field for field in self.fields}
-        return [field_map[name] for name in adjusted if name in field_map]
-
-    def get_trimmed_primary_keys(self) -> List[str]:
-        return [field.name for field in self.get_trimmed_primary_key_fields()]
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index f0186b1657..cde282eff0 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -49,7 +49,12 @@ class FileStoreTable(Table):
         self.field_names = [field.name for field in table_schema.fields]
         self.field_dict = {field.name: field for field in self.fields}
         self.primary_keys = table_schema.primary_keys
+        self.primary_keys_fields = [self.field_dict[name] for name in self.primary_keys]
         self.partition_keys = table_schema.partition_keys
+        self.partition_keys_fields = [self.field_dict[name] for name in self.partition_keys]
+        self.trimmed_primary_keys = [pk for pk in self.primary_keys if pk not in self.partition_keys]
+        self.trimmed_primary_keys_fields = [self.field_dict[name] for name in self.trimmed_primary_keys]
+
         self.options = table_schema.options
         self.cross_partition_update = self.table_schema.cross_partition_update()
         self.is_primary_key_table = bool(self.primary_keys)
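    The trimmed-key derivation added above is a plain list comprehension: for a table
    partitioned by dt with primary key (dt, user_id), the trimmed primary key reduces
    to user_id, since the partition value is constant within a bucket. A tiny example
    of the same expression:

        primary_keys = ["dt", "user_id"]
        partition_keys = ["dt"]
        trimmed = [pk for pk in primary_keys if pk not in partition_keys]
        assert trimmed == ["user_id"]  # partition columns drop out of the key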
diff --git a/paimon-python/pypaimon/table/row/binary_row.py b/paimon-python/pypaimon/table/row/binary_row.py
index f908935d44..41773b57e3 100644
--- a/paimon-python/pypaimon/table/row/binary_row.py
+++ b/paimon-python/pypaimon/table/row/binary_row.py
@@ -54,8 +54,5 @@ class BinaryRow(InternalRow):
     def get_row_kind(self) -> RowKind:
         return self.row_kind

-    def is_null_at(self, pos: int) -> bool:
-        return self.get_field(pos) is None
-
     def __len__(self):
         return self.arity
diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py
index 13e3742110..b05e475951 100644
--- a/paimon-python/pypaimon/table/row/generic_row.py
+++ b/paimon-python/pypaimon/table/row/generic_row.py
@@ -45,9 +45,6 @@ class GenericRow(InternalRow):
raise IndexError(f"Position {pos} is out of bounds for row arity
{len(self.values)}")
return self.values[pos]
- def is_null_at(self, pos: int) -> bool:
- return self.get_field(pos) is None
-
def get_row_kind(self) -> RowKind:
return self.row_kind
diff --git a/paimon-python/pypaimon/table/row/internal_row.py b/paimon-python/pypaimon/table/row/internal_row.py
index ca19ebcddf..e70468348c 100644
--- a/paimon-python/pypaimon/table/row/internal_row.py
+++ b/paimon-python/pypaimon/table/row/internal_row.py
@@ -33,12 +33,6 @@ class InternalRow(ABC):
         Returns the value at the given position.
         """

-    @abstractmethod
-    def is_null_at(self, pos: int) -> bool:
-        """
-        Returns true if the element is null at the given position.
-        """
-
     @abstractmethod
     def get_row_kind(self) -> RowKind:
         """
diff --git a/paimon-python/pypaimon/table/row/offset_row.py b/paimon-python/pypaimon/table/row/offset_row.py
index 9a51a246c9..b6e6f8f432 100644
--- a/paimon-python/pypaimon/table/row/offset_row.py
+++ b/paimon-python/pypaimon/table/row/offset_row.py
@@ -47,9 +47,6 @@ class OffsetRow(InternalRow):
raise IndexError(f"Position {pos} is out of bounds for row arity
{self.arity}")
return self.row_tuple[self.offset + pos]
- def is_null_at(self, pos: int) -> bool:
- return self.get_field(pos) is None
-
def get_row_kind(self) -> RowKind:
return RowKind(self.row_kind_byte)
diff --git a/paimon-python/pypaimon/table/row/projected_row.py b/paimon-python/pypaimon/table/row/projected_row.py
index 502338a605..d7a1cc6f40 100644
--- a/paimon-python/pypaimon/table/row/projected_row.py
+++ b/paimon-python/pypaimon/table/row/projected_row.py
@@ -50,13 +50,6 @@ class ProjectedRow(InternalRow):
             return None
         return self.row.get_field(self.index_mapping[pos])

-    def is_null_at(self, pos: int) -> bool:
-        """Returns true if the element is null at the given position."""
-        if self.index_mapping[pos] < 0:
-            # TODO move this logical to hive
-            return True
-        return self.row.is_null_at(self.index_mapping[pos])
-
     def get_row_kind(self) -> RowKind:
         """Returns the kind of change that this row describes in a changelog."""
         return self.row.get_row_kind()
diff --git a/paimon-python/pypaimon/tests/predicates_test.py b/paimon-python/pypaimon/tests/predicates_test.py
index 561641589f..6e6de2fcae 100644
--- a/paimon-python/pypaimon/tests/predicates_test.py
+++ b/paimon-python/pypaimon/tests/predicates_test.py
@@ -456,25 +456,25 @@ class PredicateTest(unittest.TestCase):
                 count += 1
                 self.assertEqual(len(split.files), 1)
                 min_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data,
-                                                               table.table_schema.get_primary_key_fields()).to_dict()
+                                                               table.primary_keys_fields).to_dict()
                 max_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data,
-                                                               table.table_schema.get_primary_key_fields()).to_dict()
+                                                               table.primary_keys_fields).to_dict()
                 self.assertTrue(min_values["key1"] == 1 and min_values["key2"] == "e"
                                 and max_values["key1"] == 4 and max_values["key2"] == "h")
             elif split.partition.values == ["p2", 2]:
                 count += 1
                 min_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data,
-                                                               table.table_schema.get_primary_key_fields()).to_dict()
+                                                               table.primary_keys_fields).to_dict()
                 max_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data,
-                                                               table.table_schema.get_primary_key_fields()).to_dict()
+                                                               table.primary_keys_fields).to_dict()
                 self.assertTrue(min_values["key1"] == 5 and min_values["key2"] == "a"
                                 and max_values["key1"] == 8 and max_values["key2"] == "d")
             elif split.partition.values == ["p1", 1]:
                 count += 1
                 min_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data,
-                                                               table.table_schema.get_primary_key_fields()).to_dict()
+                                                               table.primary_keys_fields).to_dict()
                 max_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data,
-                                                               table.table_schema.get_primary_key_fields()).to_dict()
+                                                               table.primary_keys_fields).to_dict()
                 self.assertTrue(min_values["key1"] == max_values["key1"] == 7
                                 and max_values["key2"] == max_values["key2"] == "b")
         self.assertEqual(count, 3)
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index c2fe33105b..afeba52b3c 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -67,7 +67,7 @@ class FileStoreCommit:
         commit_entries = []
         for msg in commit_messages:
-            partition = GenericRow(list(msg.partition), self.table.table_schema.get_partition_key_fields())
+            partition = GenericRow(list(msg.partition), self.table.partition_keys_fields)
             for file in msg.new_files:
                 commit_entries.append(ManifestEntry(
                     kind=0,
@@ -89,7 +89,7 @@ class FileStoreCommit:
         partition_filter = None
         # sanity check, all changes must be done within the given partition, meanwhile build a partition filter
         if len(overwrite_partition) > 0:
-            predicate_builder = PredicateBuilder(self.table.table_schema.get_partition_key_fields())
+            predicate_builder = PredicateBuilder(self.table.partition_keys_fields)
             sub_predicates = []
             for key, value in overwrite_partition.items():
                 sub_predicates.append(predicate_builder.equal(key, value))
@@ -107,7 +107,7 @@ class FileStoreCommit:
                 entry.kind = 1
                 commit_entries.append(entry)
         for msg in commit_messages:
-            partition = GenericRow(list(msg.partition), self.table.table_schema.get_partition_key_fields())
+            partition = GenericRow(list(msg.partition), self.table.partition_keys_fields)
             for file in msg.new_files:
                 commit_entries.append(ManifestEntry(
                     kind=0,
@@ -174,11 +174,11 @@ class FileStoreCommit:
             partition_stats=SimpleStats(
                 min_values=GenericRow(
                     values=partition_min_stats,
-                    fields=self.table.table_schema.get_partition_key_fields(),
+                    fields=self.table.partition_keys_fields
                 ),
                 max_values=GenericRow(
                     values=partition_max_stats,
-                    fields=self.table.table_schema.get_partition_key_fields(),
+                    fields=self.table.partition_keys_fields
                 ),
                 null_counts=partition_null_counts,
             ),
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 3515991933..24e3b0ca48 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -43,8 +43,8 @@ class DataWriter(ABC):
         self.bucket = bucket
         self.file_io = self.table.file_io

-        self.trimmed_primary_key_fields = self.table.table_schema.get_trimmed_primary_key_fields()
-        self.trimmed_primary_key = self.table.table_schema.get_trimmed_primary_keys()
+        self.trimmed_primary_keys_fields = self.table.trimmed_primary_keys_fields
+        self.trimmed_primary_keys = self.table.trimmed_primary_keys

         options = self.table.options
         self.target_file_size = 256 * 1024 * 1024
@@ -159,7 +159,7 @@ class DataWriter(ABC):
# min key & max key
- selected_table = data.select(self.trimmed_primary_key)
+ selected_table = data.select(self.trimmed_primary_keys)
key_columns_batch = selected_table.to_batches()[0]
min_key_row_batch = key_columns_batch.slice(0, 1)
max_key_row_batch = key_columns_batch.slice(key_columns_batch.num_rows
- 1, 1)
@@ -177,7 +177,7 @@ class DataWriter(ABC):
         min_value_stats = [column_stats[field.name]['min_values'] for field in all_fields]
         max_value_stats = [column_stats[field.name]['max_values'] for field in all_fields]
         value_null_counts = [column_stats[field.name]['null_counts'] for field in all_fields]
-        key_fields = self.trimmed_primary_key_fields
+        key_fields = self.trimmed_primary_keys_fields
         min_key_stats = [column_stats[field.name]['min_values'] for field in key_fields]
         max_key_stats = [column_stats[field.name]['max_values'] for field in key_fields]
         key_null_counts = [column_stats[field.name]['null_counts'] for field in key_fields]
@@ -191,11 +191,11 @@ class DataWriter(ABC):
             file_name=file_name,
             file_size=self.file_io.get_file_size(file_path),
             row_count=data.num_rows,
-            min_key=GenericRow(min_key, self.trimmed_primary_key_fields),
-            max_key=GenericRow(max_key, self.trimmed_primary_key_fields),
+            min_key=GenericRow(min_key, self.trimmed_primary_keys_fields),
+            max_key=GenericRow(max_key, self.trimmed_primary_keys_fields),
             key_stats=SimpleStats(
-                GenericRow(min_key_stats, self.trimmed_primary_key_fields),
-                GenericRow(max_key_stats, self.trimmed_primary_key_fields),
+                GenericRow(min_key_stats, self.trimmed_primary_keys_fields),
+                GenericRow(max_key_stats, self.trimmed_primary_keys_fields),
                 key_null_counts,
             ),
             value_stats=SimpleStats(
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index fb929710e8..05cad9bca9 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -38,23 +38,23 @@ class KeyValueDataWriter(DataWriter):
         num_rows = data.num_rows
         enhanced_table = data

-        for pk_key in reversed(self.trimmed_primary_key):
+        for pk_key in reversed(self.trimmed_primary_keys):
             if pk_key in data.column_names:
                 key_column = data.column(pk_key)
                 enhanced_table = enhanced_table.add_column(0, f'_KEY_{pk_key}', key_column)
         sequence_column = pa.array([self.sequence_generator.next() for _ in range(num_rows)], type=pa.int64())
-        enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key), '_SEQUENCE_NUMBER', sequence_column)
+        enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_keys), '_SEQUENCE_NUMBER', sequence_column)

         # TODO: support real row kind here
         value_kind_column = pa.array([0] * num_rows, type=pa.int32())
-        enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key) + 1, '_VALUE_KIND',
+        enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_keys) + 1, '_VALUE_KIND',
                                                    value_kind_column)
         return enhanced_table
     def _sort_by_primary_key(self, data: pa.RecordBatch) -> pa.RecordBatch:
-        sort_keys = [(key, 'ascending') for key in self.trimmed_primary_key]
+        sort_keys = [(key, 'ascending') for key in self.trimmed_primary_keys]
         if '_SEQUENCE_NUMBER' in data.column_names:
             sort_keys.append(('_SEQUENCE_NUMBER', 'ascending'))
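    For key-value tables the writer prepends _KEY_ copies of the trimmed primary key
    columns, then inserts _SEQUENCE_NUMBER and _VALUE_KIND before sorting. A hedged
    pyarrow sketch of the resulting column layout (data values are made up; the real
    writer operates on incoming batches):

        import pyarrow as pa

        batch = pa.table({"user_id": [3, 1], "dt": ["p1", "p1"], "v": [30, 10]})
        trimmed_primary_keys = ["user_id"]

        enhanced = batch
        for pk in reversed(trimmed_primary_keys):
            enhanced = enhanced.add_column(0, f"_KEY_{pk}", batch.column(pk))
        n = len(trimmed_primary_keys)
        enhanced = enhanced.add_column(n, "_SEQUENCE_NUMBER",
                                       pa.array([0, 1], type=pa.int64()))
        enhanced = enhanced.add_column(n + 1, "_VALUE_KIND",
                                       pa.array([0, 0], type=pa.int32()))
        # columns: _KEY_user_id, _SEQUENCE_NUMBER, _VALUE_KIND, user_id, dt, v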