SteNicholas commented on code in PR #7815:
URL: https://github.com/apache/paimon/pull/7815#discussion_r3223348405
##########
paimon-python/pypaimon/write/table_update_by_row_id.py:
##########
@@ -40,53 +42,53 @@ class TableUpdateByRowId:
FIRST_ROW_ID_COLUMN = '_FIRST_ROW_ID'
- def __init__(self, table, commit_user: str):
+ def __init__(self, table, commit_user: str, commit_identifier: int):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
self.commit_user = commit_user
+ self.commit_identifier = commit_identifier
- # Load existing first_row_ids and build partition map
+ # Snapshot the current state once: a single ``first_row_id -> (split,
files)``
+ # map is enough to drive every downstream lookup (partition,
row-count, read).
(self.snapshot_id,
self.first_row_ids,
- self.first_row_id_to_partition_map,
- self.first_row_id_to_row_count_map,
- self.total_row_count,
- self.splits) = self._load_existing_files_info()
-
- # Collect commit messages
- self.commit_messages = []
-
- def _load_existing_files_info(self):
- """Load existing first_row_ids and build partition map for efficient
lookup."""
- first_row_ids = []
- first_row_id_to_partition_map: Dict[int, GenericRow] = {}
- first_row_id_to_row_count_map: Dict[int, int] = {}
-
- read_builder = self.table.new_read_builder()
- scan = read_builder.new_scan()
- plan = scan.plan()
+ self._first_row_id_index,
+ self.total_row_count) = self._load_existing_files_info()
+
+ self.commit_messages: List[CommitMessage] = []
+
+ def _load_existing_files_info(
+ self,
+ ) -> Tuple[int, List[int], Dict[int, Tuple[DataSplit,
List[DataFileMeta]]], int]:
+ """Scan the latest snapshot once and index files by ``first_row_id``.
+
+ Returns:
+ A 4-tuple of ``(snapshot_id, sorted_unique_first_row_ids, index,
total_row_count)``
+ where ``index`` maps each ``first_row_id`` to the owning split and
+ the list of files with that id (a single id may belong to multiple
+ files when data evolution has split a logical row range).
+ """
+ plan = self.table.new_read_builder().new_scan().plan()
splits = plan.splits()
+ index: Dict[int, Tuple[DataSplit, List[DataFileMeta]]] = {}
+ total_row_count = 0
for split in splits:
for file in split.files:
- if file.first_row_id is not None and not
file.file_name.endswith('.blob'):
- first_row_id = file.first_row_id
- first_row_ids.append(first_row_id)
- first_row_id_to_partition_map[first_row_id] =
split.partition
- first_row_id_to_row_count_map[first_row_id] =
file.row_count
-
- total_row_count = sum(first_row_id_to_row_count_map.values())
+ if file.first_row_id is None or
file.file_name.endswith('.blob'):
+ continue
+ entry = index.get(file.first_row_id)
+ if entry is None:
+ index[file.first_row_id] = (split, [file])
+ else:
+ entry[1].append(file)
+ total_row_count += file.row_count
Review Comment:
@XiaoHongbo-Hope, yes, that's right. I have aligned it with the Java
implementation and fixed this summary.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]