leaves12138 commented on code in PR #6977:
URL: https://github.com/apache/paimon/pull/6977#discussion_r2671716867
##########
paimon-python/pypaimon/write/file_store_commit.py:
##########
@@ -138,76 +113,49 @@ def overwrite(self, overwrite_partition, commit_messages:
List[CommitMessage], c
self._try_commit(
commit_kind="OVERWRITE",
- commit_entries=None, # Will be generated in _try_commit based on
latest snapshot
- commit_identifier=commit_identifier
+ commit_identifier=commit_identifier,
+ commit_entries_plan=lambda snapshot:
self._generate_overwrite_entries(snapshot)
)
- def _try_commit(self, commit_kind, commit_entries, commit_identifier):
+ def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan):
import threading
retry_count = 0
- retry_result = None
start_time_ms = int(time.time() * 1000)
thread_id = threading.current_thread().name
while True:
latest_snapshot = self.snapshot_manager.get_latest_snapshot()
-
- if commit_kind == "OVERWRITE":
- commit_entries = self._generate_overwrite_entries()
+ commit_entries = commit_entries_plan(latest_snapshot)
result = self._try_commit_once(
- retry_result=retry_result,
commit_kind=commit_kind,
commit_entries=commit_entries,
commit_identifier=commit_identifier,
latest_snapshot=latest_snapshot
)
- if result.is_success():
- logger.warning(
+ if result:
+ logger.info(
f"Thread {thread_id}: commit success {latest_snapshot.id +
1 if latest_snapshot else 1} "
f"after {retry_count} retries"
)
break
- retry_result = result
-
elapsed_ms = int(time.time() * 1000) - start_time_ms
if elapsed_ms > self.commit_timeout or retry_count >=
self.commit_max_retries:
error_msg = (
f"Commit failed {latest_snapshot.id + 1 if latest_snapshot
else 1} "
f"after {elapsed_ms} millis with {retry_count} retries, "
f"there maybe exist commit conflicts between multiple
jobs."
)
- if retry_result.exception:
- raise RuntimeError(error_msg) from retry_result.exception
- else:
- raise RuntimeError(error_msg)
+ raise RuntimeError(error_msg)
self._commit_retry_wait(retry_count)
retry_count += 1
- def _try_commit_once(self, retry_result: Optional[RetryResult],
commit_kind: str,
+ def _try_commit_once(self, commit_kind: str,
commit_entries: List[ManifestEntry],
commit_identifier: int,
- latest_snapshot: Optional[Snapshot]) -> CommitResult:
- start_time_ms = int(time.time() * 1000)
-
- if retry_result is not None and latest_snapshot is not None:
Review Comment:
Sure
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org