This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 470b381171 [python] Remove all redundant changes in pr 6969 (#6977)
470b381171 is described below
commit 470b38117198e941083fafd9b7e439ffe4bf093e
Author: YeJunHao <[email protected]>
AuthorDate: Thu Jan 8 23:26:30 2026 +0800
[python] Remove all redundant changes in pr 6969 (#6977)
---
paimon-python/pypaimon/common/file_io.py | 19 +----
paimon-python/pypaimon/filesystem/local.py | 34 ++++++++
.../pypaimon/tests/reader_append_only_test.py | 60 +++++++++++++
paimon-python/pypaimon/write/file_store_commit.py | 98 +++++++++-------------
4 files changed, 138 insertions(+), 73 deletions(-)
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 2ec1909306..556d9e9ae7 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -18,7 +18,6 @@
import logging
import os
import subprocess
-import threading
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
@@ -26,11 +25,12 @@ from urllib.parse import splitport, urlparse
import pyarrow
from packaging.version import parse
-from pyarrow._fs import FileSystem, LocalFileSystem
+from pyarrow._fs import FileSystem
from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions, S3Options
from pypaimon.common.uri_reader import UriReaderFactory
+from pypaimon.filesystem.local import PaimonLocalFileSystem
from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
from pypaimon.table.row.generic_row import GenericRow
@@ -39,8 +39,6 @@ from pypaimon.write.blob_format_writer import BlobFormatWriter
class FileIO:
- rename_lock = threading.Lock()
-
def __init__(self, path: str, catalog_options: Options):
self.properties = catalog_options
self.logger = logging.getLogger(__name__)
@@ -183,9 +181,8 @@ class FileIO:
)
def _initialize_local_fs(self) -> FileSystem:
- from pyarrow.fs import LocalFileSystem
- return LocalFileSystem()
+ return PaimonLocalFileSystem()
def new_input_stream(self, path: str):
path_str = self.to_filesystem_path(path)
@@ -255,15 +252,7 @@ class FileIO:
self.mkdirs(str(dst_parent))
src_str = self.to_filesystem_path(src)
- if isinstance(self.filesystem, LocalFileSystem):
- if self.exists(dst):
- return False
- with FileIO.rename_lock:
- if self.exists(dst):
- return False
- self.filesystem.move(src_str, dst_str)
- else:
- self.filesystem.move(src_str, dst_str)
+ self.filesystem.move(src_str, dst_str)
return True
except Exception as e:
self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
diff --git a/paimon-python/pypaimon/filesystem/local.py b/paimon-python/pypaimon/filesystem/local.py
new file mode 100644
index 0000000000..c845f8547c
--- /dev/null
+++ b/paimon-python/pypaimon/filesystem/local.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import threading
+import pyarrow.fs
+from pyarrow._fs import LocalFileSystem
+
+
+class PaimonLocalFileSystem(LocalFileSystem):
+
+ rename_lock = threading.Lock()
+
+ def move(self, src, dst):
+ with PaimonLocalFileSystem.rename_lock:
+ file_info = self.get_file_info([dst])[0]
+ result = file_info.type != pyarrow.fs.FileType.NotFound
+            if result:
+ raise Exception("Target file already exists")
+
+        super().move(src, dst)
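A hedged usage sketch of the new class (temp-file paths are illustrative): move succeeds while the destination is absent and raises once it exists, which is the fail-if-exists behavior FileIO.rename relies on:

    import os
    import tempfile

    from pypaimon.filesystem.local import PaimonLocalFileSystem

    fs = PaimonLocalFileSystem()
    work_dir = tempfile.mkdtemp()
    src = os.path.join(work_dir, "a")
    dst = os.path.join(work_dir, "b")

    open(src, "w").close()
    fs.move(src, dst)           # ok: dst did not exist yet

    open(src, "w").close()      # recreate the source file
    try:
        fs.move(src, dst)       # dst now exists
    except Exception as e:
        print(e)                # "Target file already exists"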
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index b47f5d1f67..d65658ef5c 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -28,7 +28,11 @@ import pyarrow as pa
from pypaimon import CatalogFactory, Schema
from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.write.file_store_commit import RetryResult
class AoReaderTest(unittest.TestCase):
@@ -153,6 +157,62 @@ class AoReaderTest(unittest.TestCase):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
+ def test_commit_retry_filter(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.catalog.create_table('default.test_commit_retry_filter', schema, False)
+ table = self.catalog.get_table('default.test_commit_retry_filter')
+ write_builder = table.new_batch_write_builder()
+
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data1 = {
+ 'user_id': [1, 2, 3, 4],
+ 'item_id': [1001, 1002, 1003, 1004],
+ 'behavior': ['a', 'b', 'c', None],
+ 'dt': ['p1', 'p1', 'p2', 'p1'],
+ }
+ pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
+ data2 = {
+ 'user_id': [5, 6, 7, 8],
+ 'item_id': [1005, 1006, 1007, 1008],
+ 'behavior': ['e', 'f', 'g', 'h'],
+ 'dt': ['p2', 'p1', 'p2', 'p2'],
+ }
+ pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)
+
+ table_write.write_arrow(pa_table1)
+ table_write.write_arrow(pa_table2)
+
+ messages = table_write.prepare_commit()
+ table_commit.commit(messages)
+ table_write.close()
+
+ snapshot_manager = SnapshotManager(table)
+ latest_snapshot = snapshot_manager.get_latest_snapshot()
+ commit_entries = []
+ for msg in messages:
+            partition = GenericRow(list(msg.partition), table.partition_keys_fields)
+ for file in msg.new_files:
+ commit_entries.append(ManifestEntry(
+ kind=0,
+ partition=partition,
+ bucket=msg.bucket,
+ total_buckets=table.total_buckets,
+ file=file
+ ))
+ # mock retry
+ success = table_commit.file_store_commit._try_commit_once(
+ RetryResult(None),
+ "APPEND",
+ commit_entries,
+ BATCH_COMMIT_IDENTIFIER,
+ latest_snapshot)
+ self.assertTrue(success.is_success())
+ table_commit.close()
+ read_builder = table.new_read_builder()
+ actual = self._read_test_table(read_builder).sort_by('user_id')
+ self.assertEqual(actual, self.expected)
+
def test_over_1000_cols_read(self):
num_rows = 1
num_cols = 10
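The new test replays _try_commit_once with the same user, identifier, and kind after a successful commit, expecting the retry filter to report success without writing a second snapshot. A minimal standalone sketch of that dedup scan (a hypothetical Snap record standing in for pypaimon's Snapshot; the real logic is _is_duplicate_commit in the next file):

    from dataclasses import dataclass
    from typing import List, Optional

    @dataclass
    class Snap:  # hypothetical stand-in for pypaimon's Snapshot
        id: int
        commit_user: str
        commit_identifier: int
        commit_kind: str

    def is_duplicate(snapshots: List[Snap], last_seen_id: Optional[int],
                     user: str, identifier: int, kind: str) -> bool:
        # Scan only snapshots created after the last one this attempt observed.
        start = 1 if last_seen_id is None else last_seen_id + 1
        return any(s.commit_user == user and
                   s.commit_identifier == identifier and
                   s.commit_kind == kind
                   for s in snapshots if s.id >= start)

    history = [Snap(1, "u1", 9, "APPEND")]
    assert is_duplicate(history, None, "u1", 9, "APPEND")   # retry sees its own earlier commit
    assert not is_duplicate(history, 1, "u1", 9, "APPEND")  # already checked up to id 1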
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index e55e25f7c8..6c65c5173a 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -110,8 +110,8 @@ class FileStoreCommit:
))
self._try_commit(commit_kind="APPEND",
- commit_entries=commit_entries,
- commit_identifier=commit_identifier)
+ commit_identifier=commit_identifier,
+ commit_entries_plan=lambda snapshot: commit_entries)
def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], commit_identifier: int):
"""Commit the given commit messages in overwrite mode."""
@@ -133,16 +133,14 @@ class FileStoreCommit:
raise RuntimeError(f"Trying to overwrite partition
{overwrite_partition}, but the changes "
f"in {msg.partition} does not belong to
this partition")
- self._overwrite_partition_filter = partition_filter
- self._overwrite_commit_messages = commit_messages
-
self._try_commit(
commit_kind="OVERWRITE",
-            commit_entries=None,  # Will be generated in _try_commit based on latest snapshot
- commit_identifier=commit_identifier
+ commit_identifier=commit_identifier,
+            commit_entries_plan=lambda snapshot: self._generate_overwrite_entries(
+ snapshot, partition_filter, commit_messages)
)
- def _try_commit(self, commit_kind, commit_entries, commit_identifier):
+ def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan):
import threading
retry_count = 0
@@ -151,9 +149,7 @@ class FileStoreCommit:
thread_id = threading.current_thread().name
while True:
latest_snapshot = self.snapshot_manager.get_latest_snapshot()
-
- if commit_kind == "OVERWRITE":
- commit_entries = self._generate_overwrite_entries()
+ commit_entries = commit_entries_plan(latest_snapshot)
result = self._try_commit_once(
retry_result=retry_result,
@@ -164,7 +160,7 @@ class FileStoreCommit:
)
if result.is_success():
- logger.warning(
+ logger.info(
f"Thread {thread_id}: commit success {latest_snapshot.id +
1 if latest_snapshot else 1} "
f"after {retry_count} retries"
)
@@ -190,24 +186,9 @@ class FileStoreCommit:
def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str,
                     commit_entries: List[ManifestEntry], commit_identifier: int,
                     latest_snapshot: Optional[Snapshot]) -> CommitResult:
- start_time_ms = int(time.time() * 1000)
-
- if retry_result is not None and latest_snapshot is not None:
- start_check_snapshot_id = 1 # Snapshot.FIRST_SNAPSHOT_ID
- if retry_result.latest_snapshot is not None:
- start_check_snapshot_id = retry_result.latest_snapshot.id + 1
-
-        for snapshot_id in range(start_check_snapshot_id, latest_snapshot.id + 2):
-            snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
- if (snapshot and snapshot.commit_user == self.commit_user and
- snapshot.commit_identifier == commit_identifier and
- snapshot.commit_kind == commit_kind):
- logger.info(
- f"Commit already completed (snapshot {snapshot_id}), "
-                        f"user: {self.commit_user}, identifier: {commit_identifier}"
- )
- return SuccessResult()
-
+        if self._is_duplicate_commit(retry_result, latest_snapshot, commit_identifier, commit_kind):
+ return SuccessResult()
+
unique_id = uuid.uuid4()
base_manifest_list = f"manifest-list-{unique_id}-0"
delta_manifest_list = f"manifest-list-{unique_id}-1"
@@ -242,10 +223,8 @@ class FileStoreCommit:
else:
deleted_file_count += 1
delta_record_count -= entry.file.row_count
-
try:
self.manifest_file_manager.write(new_manifest_file, commit_entries)
-
# TODO: implement noConflictsOrFail logic
partition_columns = list(zip(*(entry.partition.values for entry in commit_entries)))
partition_min_stats = [min(col) for col in partition_columns]
@@ -253,13 +232,10 @@ class FileStoreCommit:
partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns]
if not all(count == 0 for count in partition_null_counts):
raise RuntimeError("Partition value should not be null")
-
manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}"
- file_size = self.table.file_io.get_file_size(manifest_file_path)
-
new_manifest_file_meta = ManifestFileMeta(
file_name=new_manifest_file,
- file_size=file_size,
+ file_size=self.table.file_io.get_file_size(manifest_file_path),
num_added_files=added_file_count,
num_deleted_files=deleted_file_count,
partition_stats=SimpleStats(
@@ -275,7 +251,6 @@ class FileStoreCommit:
),
schema_id=self.table.table_schema.id,
)
-
self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta])
# process existing_manifest
@@ -287,8 +262,8 @@ class FileStoreCommit:
total_record_count += previous_record_count
else:
existing_manifest_files = []
-
self.manifest_list_manager.write(base_manifest_list, existing_manifest_files)
+
total_record_count += delta_record_count
snapshot_data = Snapshot(
version=3,
@@ -307,8 +282,7 @@ class FileStoreCommit:
# Generate partition statistics for the commit
statistics = self._generate_partition_statistics(commit_entries)
except Exception as e:
-            self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list,
-                                              base_manifest_list)
+            self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list)
logger.warning(f"Exception occurs when preparing snapshot: {e}",
exc_info=True)
raise RuntimeError(f"Failed to prepare snapshot: {e}")
@@ -317,16 +291,8 @@ class FileStoreCommit:
with self.snapshot_commit:
success = self.snapshot_commit.commit(snapshot_data, self.table.current_branch(), statistics)
if not success:
-                    # Commit failed, clean up temporary files and retry
-                    commit_time_sec = (int(time.time() * 1000) - start_time_ms) / 1000
-                    logger.warning(
-                        f"Atomic commit failed for snapshot #{new_snapshot_id} "
-                        f"by user {self.commit_user} "
-                        f"with identifier {commit_identifier} and kind {commit_kind} after {commit_time_sec}s. "
-                        f"Clean up and try again."
-                    )
-                    self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list,
-                                                      base_manifest_list)
+                    logger.warning(f"Atomic commit failed for snapshot #{new_snapshot_id}")
+                    self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list)
return RetryResult(latest_snapshot, None)
except Exception as e:
# Commit exception, not sure about the situation and should not clean up the files
@@ -340,14 +306,34 @@ class FileStoreCommit:
)
return SuccessResult()
- def _generate_overwrite_entries(self):
+    def _is_duplicate_commit(self, retry_result, latest_snapshot, commit_identifier, commit_kind) -> bool:
+ if retry_result is not None and latest_snapshot is not None:
+ start_check_snapshot_id = 1 # Snapshot.FIRST_SNAPSHOT_ID
+ if retry_result.latest_snapshot is not None:
+ start_check_snapshot_id = retry_result.latest_snapshot.id + 1
+
+            for snapshot_id in range(start_check_snapshot_id, latest_snapshot.id + 1):
+                snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
+ if (snapshot and snapshot.commit_user == self.commit_user and
+ snapshot.commit_identifier == commit_identifier and
+ snapshot.commit_kind == commit_kind):
+ logger.info(
+ f"Commit already completed (snapshot {snapshot_id}), "
+                        f"user: {self.commit_user}, identifier: {commit_identifier}"
+ )
+ return True
+ return False
+
+    def _generate_overwrite_entries(self, latestSnapshot, partition_filter, commit_messages):
"""Generate commit entries for OVERWRITE mode based on latest
snapshot."""
entries = []
-        current_entries = FullStartingScanner(self.table, self._overwrite_partition_filter, None).plan_files()
+        current_entries = [] if latestSnapshot is None \
+            else (FullStartingScanner(self.table, partition_filter, None).
+                  read_manifest_entries(self.manifest_list_manager.read_all(latestSnapshot)))
for entry in current_entries:
entry.kind = 1 # DELETE
entries.append(entry)
- for msg in self._overwrite_commit_messages:
+ for msg in commit_messages:
partition = GenericRow(list(msg.partition), self.table.partition_keys_fields)
for file in msg.new_files:
entries.append(ManifestEntry(
@@ -377,7 +363,7 @@ class FileStoreCommit:
)
time.sleep(total_wait_ms / 1000.0)
- def _cleanup_preparation_failure(self, manifest_file: Optional[str],
+ def _cleanup_preparation_failure(self,
delta_manifest_list: Optional[str],
base_manifest_list: Optional[str]):
try:
@@ -394,10 +380,6 @@ class FileStoreCommit:
if base_manifest_list:
base_path = f"{manifest_path}/{base_manifest_list}"
self.table.file_io.delete_quietly(base_path)
-
- if manifest_file:
-            manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{manifest_file}"
- self.table.file_io.delete_quietly(manifest_file_path)
except Exception as e:
logger.warning(f"Failed to clean up temporary files during
preparation failure: {e}", exc_info=True)
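The central refactor above replaces a precomputed commit_entries argument with a commit_entries_plan callable that _try_commit re-evaluates against the freshly read latest snapshot on every retry, so OVERWRITE recomputes its DELETE entries against whatever landed in the meantime while APPEND simply ignores the snapshot. A minimal sketch of that retry shape (read_latest and try_once are hypothetical stand-ins, not pypaimon APIs):

    from typing import Callable, List, Optional

    def try_commit(commit_entries_plan: Callable[[Optional[int]], List[str]],
                   read_latest: Callable[[], Optional[int]],
                   try_once: Callable[[Optional[int], List[str]], bool]) -> None:
        while True:
            latest = read_latest()                 # may have advanced since the last attempt
            entries = commit_entries_plan(latest)  # OVERWRITE re-derives its deletes here
            if try_once(latest, entries):
                return

    # APPEND passes a plan that ignores the snapshot; OVERWRITE closes over
    # the partition filter and re-scans the snapshot it is handed:
    append_plan = lambda snapshot: ["data-file-1", "data-file-2"]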