This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1255e1494d [python] Add more logs for better troubleshooting (#7233)
1255e1494d is described below
commit 1255e1494de5adcd5d2050f87b2bcb0775281945
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Feb 9 10:54:36 2026 +0800
[python] Add more logs for better troubleshooting (#7233)
---
.../pypaimon/read/scanner/file_scanner.py | 10 ++++
.../pypaimon/snapshot/catalog_snapshot_commit.py | 8 ++-
.../pypaimon/snapshot/renaming_snapshot_commit.py | 9 +--
.../pypaimon/snapshot/snapshot_manager.py | 14 ++++-
paimon-python/pypaimon/write/file_store_commit.py | 69 +++++++++++++++++++---
paimon-python/pypaimon/write/table_commit.py | 7 +++
6 files changed, 101 insertions(+), 16 deletions(-)
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py
b/paimon-python/pypaimon/read/scanner/file_scanner.py
index 0f53ef0cd3..b5d13e561d 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -16,8 +16,12 @@ See the License for the specific language governing
permissions and
limitations under the License.
"""
import os
+import time
+import logging
from typing import List, Optional, Dict, Set, Callable
+logger = logging.getLogger(__name__)
+
from pypaimon.common.predicate import Predicate
from pypaimon.globalindex import ScoredGlobalIndexResult
from pypaimon.table.source.deletion_file import DeletionFile
@@ -143,6 +147,7 @@ class FileScanner:
return
self._scan_dv_index(self.snapshot_manager.get_latest_snapshot(), bucket_files)
def scan(self) -> Plan:
+ start_ms = time.time() * 1000
# Create appropriate split generator based on table type
if self.table.is_primary_key_table:
entries = self.plan_files()
@@ -176,6 +181,11 @@ class FileScanner:
splits = split_generator.create_splits(entries)
splits = self._apply_push_down_limit(splits)
+ duration_ms = int(time.time() * 1000 - start_ms)
+ logger.info(
+ "File store scan plan completed in %d ms. Files size: %d",
+ duration_ms, len(entries)
+ )
return Plan(splits)
def _create_data_evolution_split_generator(self):
diff --git a/paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py
b/paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py
index 1ffc80af57..77e5492798 100755
--- a/paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py
+++ b/paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py
@@ -16,9 +16,12 @@
# limitations under the License.
################################################################################
+import logging
from typing import List
from pypaimon.catalog.catalog import Catalog
+
+logger = logging.getLogger(__name__)
from pypaimon.common.identifier import Identifier
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import (PartitionStatistics,
@@ -64,7 +67,10 @@ class CatalogSnapshotCommit(SnapshotCommit):
# Call catalog's commit_snapshot method
if hasattr(self.catalog, 'commit_snapshot'):
- return self.catalog.commit_snapshot(new_identifier, self.uuid,
snapshot, statistics)
+ success = self.catalog.commit_snapshot(new_identifier, self.uuid,
snapshot, statistics)
+ if success:
+ logger.info("Catalog snapshot commit succeeded for %s,
snapshot id %d", new_identifier, snapshot.id)
+ return success
else:
# Fallback for catalogs that don't support snapshot commits
raise NotImplementedError(
diff --git a/paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py
b/paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py
index 27c5a54a2c..acc1650baf 100644
--- a/paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py
+++ b/paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py
@@ -16,9 +16,12 @@
# limitations under the License.
################################################################################
+import logging
from typing import List
from pypaimon.common.file_io import FileIO
+
+logger = logging.getLogger(__name__)
from pypaimon.common.json_util import JSON
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import (PartitionStatistics,
@@ -62,12 +65,12 @@ class RenamingSnapshotCommit(SnapshotCommit):
"""
new_snapshot_path =
self.snapshot_manager.get_snapshot_path(snapshot.id)
if not self.file_io.exists(new_snapshot_path):
- """Internal function to perform the actual commit."""
# Try to write atomically using the file IO
committed = self.file_io.try_to_write_atomic(new_snapshot_path,
JSON.to_json(snapshot, indent=2))
if committed:
# Update the latest hint
self._commit_latest_hint(snapshot.id)
+ logger.info("Renaming snapshot commit succeeded, snapshot id
%d", snapshot.id)
return committed
return False
@@ -89,6 +92,4 @@ class RenamingSnapshotCommit(SnapshotCommit):
# Fallback to regular write
self.file_io.write_file(latest_file, str(snapshot_id),
overwrite=True)
except Exception as e:
- # Log the error but don't fail the commit for this
- # In a production system, you might want to use proper logging
- print(f"Warning: Failed to update LATEST hint: {e}")
+ logger.warning("Failed to update LATEST hint: %s", e)
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py
b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index cc044fbc1a..e6b2ca5c64 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -15,9 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+import logging
from typing import Optional
from pypaimon.common.file_io import FileIO
+
+logger = logging.getLogger(__name__)
from pypaimon.common.json_util import JSON
from pypaimon.snapshot.snapshot import Snapshot
@@ -115,9 +118,14 @@ class SnapshotManager:
if self.file_io.exists(earliest_file):
earliest_content = self.file_io.read_file_utf8(earliest_file)
earliest_snapshot_id = int(earliest_content.strip())
- return self.get_snapshot_by_id(earliest_snapshot_id)
- else:
- return self.get_snapshot_by_id(1)
+ snapshot = self.get_snapshot_by_id(earliest_snapshot_id)
+ if snapshot is None:
+ logger.warning(
+ "The earliest snapshot or changelog was once identified
but disappeared. "
+ "It might have been expired by other jobs operating on
this table."
+ )
+ return snapshot
+ return self.get_snapshot_by_id(1)
def earlier_or_equal_time_mills(self, timestamp: int) ->
Optional[Snapshot]:
"""
diff --git a/paimon-python/pypaimon/write/file_store_commit.py
b/paimon-python/pypaimon/write/file_store_commit.py
index 0b5af2c9c3..881c9d52c3 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -97,6 +97,11 @@ class FileStoreCommit:
if not commit_messages:
return
+ logger.info(
+ "Ready to commit to table %s, number of commit messages: %d",
+ self.table.identifier,
+ len(commit_messages),
+ )
commit_entries = []
for msg in commit_messages:
partition = GenericRow(list(msg.partition),
self.table.partition_keys_fields)
@@ -109,6 +114,7 @@ class FileStoreCommit:
file=file
))
+ logger.info("Finished collecting changes, including: %d entries",
len(commit_entries))
self._try_commit(commit_kind="APPEND",
commit_identifier=commit_identifier,
commit_entries_plan=lambda snapshot: commit_entries)
@@ -118,6 +124,11 @@ class FileStoreCommit:
if not commit_messages:
return
+ logger.info(
+ "Ready to overwrite to table %s, number of commit messages: %d",
+ self.table.identifier,
+ len(commit_messages),
+ )
partition_filter = None
# sanity check, all changes must be done within the given partition,
meanwhile build a partition filter
if len(overwrite_partition) > 0:
@@ -203,16 +214,43 @@ class FileStoreCommit:
)
if result.is_success():
+ commit_duration_ms = int(time.time() * 1000) - start_time_ms
logger.info(
- f"Thread {thread_id}: commit success {latest_snapshot.id +
1 if latest_snapshot else 1} "
- f"after {retry_count} retries"
+ "Thread %s: commit success %d after %d retries",
+ thread_id,
+ latest_snapshot.id + 1 if latest_snapshot else 1,
+ retry_count,
)
+ if commit_kind == "OVERWRITE":
+ logger.info(
+ "Finished overwrite to table %s, duration %d ms",
+ self.table.identifier,
+ commit_duration_ms,
+ )
+ else:
+ logger.info(
+ "Finished commit to table %s, duration %d ms",
+ self.table.identifier,
+ commit_duration_ms,
+ )
break
retry_result = result
elapsed_ms = int(time.time() * 1000) - start_time_ms
if elapsed_ms > self.commit_timeout or retry_count >=
self.commit_max_retries:
+ if commit_kind == "OVERWRITE":
+ logger.info(
+ "Finished (Uncertain of success) overwrite to table
%s, duration %d ms",
+ self.table.identifier,
+ elapsed_ms,
+ )
+ else:
+ logger.info(
+ "Finished (Uncertain of success) commit to table %s,
duration %d ms",
+ self.table.identifier,
+ elapsed_ms,
+ )
error_msg = (
f"Commit failed {latest_snapshot.id + 1 if latest_snapshot
else 1} "
f"after {elapsed_ms} millis with {retry_count} retries, "
@@ -229,6 +267,7 @@ class FileStoreCommit:
def _try_commit_once(self, retry_result: Optional[RetryResult],
commit_kind: str,
commit_entries: List[ManifestEntry],
commit_identifier: int,
latest_snapshot: Optional[Snapshot]) -> CommitResult:
+ start_millis = int(time.time() * 1000)
if self._is_duplicate_commit(retry_result, latest_snapshot,
commit_identifier, commit_kind):
return SuccessResult()
@@ -306,18 +345,32 @@ class FileStoreCommit:
with self.snapshot_commit:
success = self.snapshot_commit.commit(snapshot_data,
self.table.current_branch(), statistics)
if not success:
- logger.warning(f"Atomic commit failed for snapshot
#{new_snapshot_id} failed")
+ commit_time_s = (int(time.time() * 1000) - start_millis) /
1000
+ logger.warning(
+ "Atomic commit failed for snapshot #%d by user %s "
+ "with identifier %s and kind %s after %.0f seconds. "
+ "Clean up and try again.",
+ new_snapshot_id,
+ self.commit_user,
+ commit_identifier,
+ commit_kind,
+ commit_time_s,
+ )
self._cleanup_preparation_failure(delta_manifest_list,
base_manifest_list)
return RetryResult(latest_snapshot, None)
except Exception as e:
# Commit exception, not sure about the situation and should not
clean up the files
- logger.warning("Retry commit for exception")
+ logger.warning("Retry commit for exception.", exc_info=True)
return RetryResult(latest_snapshot, e)
- logger.warning(
- f"Successfully commit snapshot {new_snapshot_id} to table
{self.table.identifier} "
- f"for snapshot-{new_snapshot_id} by user {self.commit_user} "
- + f"with identifier {commit_identifier} and kind {commit_kind}."
+ logger.info(
+ "Successfully commit snapshot %d to table %s by user %s "
+ "with identifier %s and kind %s.",
+ new_snapshot_id,
+ self.table.identifier,
+ self.commit_user,
+ commit_identifier,
+ commit_kind,
)
return SuccessResult()
diff --git a/paimon-python/pypaimon/write/table_commit.py
b/paimon-python/pypaimon/write/table_commit.py
index e2b75881ea..1eafafefc0 100644
--- a/paimon-python/pypaimon/write/table_commit.py
+++ b/paimon-python/pypaimon/write/table_commit.py
@@ -16,9 +16,12 @@
# limitations under the License.
################################################################################
+import logging
from typing import Dict, List, Optional
from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+
+logger = logging.getLogger(__name__)
from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.file_store_commit import FileStoreCommit
@@ -48,6 +51,10 @@ class TableCommit:
if not non_empty_messages:
return
+ logger.info(
+ "Committing batch table %s, %d non-empty messages",
+ self.table.identifier, len(non_empty_messages)
+ )
if self.overwrite_partition is not None:
self.file_store_commit.overwrite(
overwrite_partition=self.overwrite_partition,