JingsongLi commented on code in PR #6977:
URL: https://github.com/apache/paimon/pull/6977#discussion_r2671386868


##########
paimon-python/pypaimon/write/file_store_commit.py:
##########
@@ -138,76 +113,49 @@ def overwrite(self, overwrite_partition, commit_messages: 
List[CommitMessage], c
 
         self._try_commit(
             commit_kind="OVERWRITE",
-            commit_entries=None,  # Will be generated in _try_commit based on 
latest snapshot
-            commit_identifier=commit_identifier
+            commit_identifier=commit_identifier,
+            commit_entries_plan=lambda snapshot: 
self._generate_overwrite_entries(snapshot)
         )
 
-    def _try_commit(self, commit_kind, commit_entries, commit_identifier):
+    def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan):
         import threading
 
         retry_count = 0
-        retry_result = None
         start_time_ms = int(time.time() * 1000)
         thread_id = threading.current_thread().name
         while True:
             latest_snapshot = self.snapshot_manager.get_latest_snapshot()
-
-            if commit_kind == "OVERWRITE":
-                commit_entries = self._generate_overwrite_entries()
+            commit_entries = commit_entries_plan(latest_snapshot)
 
             result = self._try_commit_once(
-                retry_result=retry_result,
                 commit_kind=commit_kind,
                 commit_entries=commit_entries,
                 commit_identifier=commit_identifier,
                 latest_snapshot=latest_snapshot
             )
 
-            if result.is_success():
-                logger.warning(
+            if result:
+                logger.info(
                     f"Thread {thread_id}: commit success {latest_snapshot.id + 
1 if latest_snapshot else 1} "
                     f"after {retry_count} retries"
                 )
                 break
 
-            retry_result = result
-
             elapsed_ms = int(time.time() * 1000) - start_time_ms
             if elapsed_ms > self.commit_timeout or retry_count >= 
self.commit_max_retries:
                 error_msg = (
                     f"Commit failed {latest_snapshot.id + 1 if latest_snapshot 
else 1} "
                     f"after {elapsed_ms} millis with {retry_count} retries, "
                     f"there maybe exist commit conflicts between multiple 
jobs."
                 )
-                if retry_result.exception:
-                    raise RuntimeError(error_msg) from retry_result.exception
-                else:
-                    raise RuntimeError(error_msg)
+                raise RuntimeError(error_msg)
 
             self._commit_retry_wait(retry_count)
             retry_count += 1
 
-    def _try_commit_once(self, retry_result: Optional[RetryResult], 
commit_kind: str,
+    def _try_commit_once(self, commit_kind: str,
                          commit_entries: List[ManifestEntry], 
commit_identifier: int,
-                         latest_snapshot: Optional[Snapshot]) -> CommitResult:
-        start_time_ms = int(time.time() * 1000)
-
-        if retry_result is not None and latest_snapshot is not None:

Review Comment:
   Keep this as it is, and add test to verify this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to