XiaoHongbo-Hope commented on code in PR #7815:
URL: https://github.com/apache/paimon/pull/7815#discussion_r3219748439
##########
paimon-python/pypaimon/write/table_update_by_row_id.py:
##########
@@ -40,53 +42,53 @@ class TableUpdateByRowId:
FIRST_ROW_ID_COLUMN = '_FIRST_ROW_ID'
- def __init__(self, table, commit_user: str):
+ def __init__(self, table, commit_user: str, commit_identifier: int):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
self.commit_user = commit_user
+ self.commit_identifier = commit_identifier
- # Load existing first_row_ids and build partition map
+ # Snapshot the current state once: a single ``first_row_id -> (split,
files)``
+ # map is enough to drive every downstream lookup (partition,
row-count, read).
(self.snapshot_id,
self.first_row_ids,
- self.first_row_id_to_partition_map,
- self.first_row_id_to_row_count_map,
- self.total_row_count,
- self.splits) = self._load_existing_files_info()
-
- # Collect commit messages
- self.commit_messages = []
-
- def _load_existing_files_info(self):
- """Load existing first_row_ids and build partition map for efficient
lookup."""
- first_row_ids = []
- first_row_id_to_partition_map: Dict[int, GenericRow] = {}
- first_row_id_to_row_count_map: Dict[int, int] = {}
-
- read_builder = self.table.new_read_builder()
- scan = read_builder.new_scan()
- plan = scan.plan()
+ self._first_row_id_index,
+ self.total_row_count) = self._load_existing_files_info()
+
+ self.commit_messages: List[CommitMessage] = []
+
+ def _load_existing_files_info(
+ self,
+ ) -> Tuple[int, List[int], Dict[int, Tuple[DataSplit,
List[DataFileMeta]]], int]:
+ """Scan the latest snapshot once and index files by ``first_row_id``.
+
+ Returns:
+ A 4-tuple of ``(snapshot_id, sorted_unique_first_row_ids, index,
total_row_count)``
+ where ``index`` maps each ``first_row_id`` to the owning split and
+ the list of files with that id (a single id may belong to multiple
+ files when data evolution has split a logical row range).
+ """
+ plan = self.table.new_read_builder().new_scan().plan()
splits = plan.splits()
+ index: Dict[int, Tuple[DataSplit, List[DataFileMeta]]] = {}
+ total_row_count = 0
for split in splits:
for file in split.files:
- if file.first_row_id is not None and not
file.file_name.endswith('.blob'):
- first_row_id = file.first_row_id
- first_row_ids.append(first_row_id)
- first_row_id_to_partition_map[first_row_id] =
split.partition
- first_row_id_to_row_count_map[first_row_id] =
file.row_count
-
- total_row_count = sum(first_row_id_to_row_count_map.values())
+ if file.first_row_id is None or
file.file_name.endswith('.blob'):
+ continue
+ entry = index.get(file.first_row_id)
+ if entry is None:
+ index[file.first_row_id] = (split, [file])
+ else:
+ entry[1].append(file)
+ total_row_count += file.row_count
Review Comment:
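This over-counts rows after data evolution. Multiple physical files can
share the same `first_row_id`, so summing `row_count` over every file makes
`total_row_count` too large and lets invalid `_ROW_ID` values pass the
range validation. Each `first_row_id` should contribute its row count only
once — e.g. add `file.row_count` only when a new `index` entry is created,
which matches the previous dict-based code that kept one count per id.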
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]