This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 82c3d83ad9 [python] Add conflict detection in shard update (#7630)
82c3d83ad9 is described below
commit 82c3d83ad9959c73a7bdfea0228b7fbcb1ac3dcd
Author: littlecoder04 <[email protected]>
AuthorDate: Sun Apr 12 15:52:13 2026 +0800
[python] Add conflict detection in shard update (#7630)
### Purpose
This PR is a follow-up to #7323. PR #7323 introduced conflict detection
for Python data evolution updates, but the shard-update path was not
covered.
As a result, when shard update and compact run concurrently, the shard
update may commit successfully against a stale scan snapshot instead of
failing fast. The problem only shows up later during read, with the
error:
`All files in a field merge split should have the same row count`. This
PR extends the same conflict-detection coverage to the shard-update
path.
### Tests
run_compact_conflict_test in run_mixed_tests.sh
---
.../test/java/org/apache/paimon/JavaPyE2ETest.java | 69 ++++++++++++++++++++++
paimon-python/dev/run_mixed_tests.sh | 42 ++++++++++++-
paimon-python/pypaimon/read/plan.py | 5 +-
.../pypaimon/read/scanner/file_scanner.py | 22 ++++---
.../pypaimon/read/streaming_table_scan.py | 2 +-
paimon-python/pypaimon/read/table_scan.py | 12 ++--
paimon-python/pypaimon/tests/binary_row_test.py | 8 +--
.../pypaimon/tests/e2e/java_py_read_write_test.py | 59 ++++++++++++++++++
.../pypaimon/tests/reader_append_only_test.py | 45 ++++++++++++++
.../pypaimon/write/commit/commit_scanner.py | 4 +-
paimon-python/pypaimon/write/file_store_commit.py | 2 +-
paimon-python/pypaimon/write/table_update.py | 6 +-
.../pypaimon/write/table_update_by_row_id.py | 5 +-
13 files changed, 254 insertions(+), 27 deletions(-)
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index bd1e006ec7..c09bf34663 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -18,6 +18,8 @@
package org.apache.paimon;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
@@ -48,9 +50,11 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
@@ -937,6 +941,71 @@ public class JavaPyE2ETest {
return GenericRow.ofKind(rowKind, values[0], values[1], values[2]);
}
+ /** Step 1: Write 5 base files for compact conflict test. */
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testCompactConflictWriteBase() throws Exception {
+ Identifier id = identifier("compact_conflict_test");
+ try {
+ catalog.dropTable(id, true);
+ } catch (Exception ignore) {
+ }
+ Schema schema =
+ Schema.newBuilder()
+ .column("f0", DataTypes.INT())
+ .column("f1", DataTypes.STRING())
+ .column("f2", DataTypes.STRING())
+ .option(ROW_TRACKING_ENABLED.key(), "true")
+ .option(DATA_EVOLUTION_ENABLED.key(), "true")
+ .option(BUCKET.key(), "-1")
+ .build();
+ catalog.createTable(id, schema, false);
+
+ RowType fullType = schema.rowType().project(Arrays.asList("f0", "f1"));
+
+ for (int fileIdx = 0; fileIdx < 5; fileIdx++) {
+ int startId = fileIdx * 200;
+ FileStoreTable t = (FileStoreTable) catalog.getTable(id);
+ BatchWriteBuilder builder = t.newBatchWriteBuilder();
+ try (BatchTableWrite w =
builder.newWrite().withWriteType(fullType)) {
+ for (int i = 0; i < 200; i++) {
+ w.write(
+ GenericRow.of(
+ startId + i, BinaryString.fromString("n" +
(startId + i))));
+ }
+ builder.newCommit().commit(w.prepareCommit());
+ }
+ }
+ LOG.info("compact_conflict_test: 5 base files written (200 rows each,
total 1000)");
+ }
+
+ /** Step 3: Run compact on compact_conflict_test table. */
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testCompactConflictRunCompact() throws Exception {
+ Identifier id = identifier("compact_conflict_test");
+ doDataEvolutionCompact((FileStoreTable) catalog.getTable(id));
+ LOG.info("compact_conflict_test: compact done, 5 files merged into 1
(1000 rows)");
+ }
+
+ private void doDataEvolutionCompact(FileStoreTable table) throws Exception
{
+ DataEvolutionCompactCoordinator coordinator =
+ new DataEvolutionCompactCoordinator(table, false, false);
+ List<CommitMessage> messages = new ArrayList<>();
+ try {
+ List<DataEvolutionCompactTask> tasks;
+ while (!(tasks = coordinator.plan()).isEmpty()) {
+ for (DataEvolutionCompactTask task : tasks) {
+ messages.add(task.doCompact(table, "test-compact"));
+ }
+ }
+ } catch (EndOfScanException ignore) {
+ }
+ if (!messages.isEmpty()) {
+ table.newBatchWriteBuilder().newCommit().commit(messages);
+ }
+ }
+
private static String rowToStringWithStruct(InternalRow row, RowType type)
{
StringBuilder build = new StringBuilder();
build.append(row.getRowKind().shortString()).append("[");
diff --git a/paimon-python/dev/run_mixed_tests.sh
b/paimon-python/dev/run_mixed_tests.sh
index 3366a5ff1c..077b5af276 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -290,6 +290,32 @@ run_lumina_vector_test() {
fi
}
+run_compact_conflict_test() {
+ echo -e "${YELLOW}=== Running Compact Conflict Test (Java Write Base,
Python Shard Update + Java Compact) ===${NC}"
+
+ cd "$PROJECT_ROOT"
+
+ # Step 1: Java writes 5 base files
+ echo "Running Maven test for JavaPyE2ETest.testCompactConflictWriteBase..."
+ if mvn test
-Dtest=org.apache.paimon.JavaPyE2ETest#testCompactConflictWriteBase -pl
paimon-core -q -Drun.e2e.tests=true; then
+ echo -e "${GREEN}✓ Java write base files completed successfully${NC}"
+ else
+ echo -e "${RED}✗ Java write base files failed${NC}"
+ return 1
+ fi
+
+ # Step 2-4: Python shard update (scan -> Java compact -> commit conflict
detected)
+ cd "$PAIMON_PYTHON_DIR"
+ echo "Running Python test for
JavaPyReadWriteTest.test_compact_conflict_shard_update..."
+ if python -m pytest
java_py_read_write_test.py::JavaPyReadWriteTest::test_compact_conflict_shard_update
-v; then
+ echo -e "${GREEN}✓ Python compact conflict test completed
successfully${NC}"
+ return 0
+ else
+ echo -e "${RED}✗ Python compact conflict test failed${NC}"
+ return 1
+ fi
+}
+
run_blob_alter_compact_test() {
echo -e "${YELLOW}=== Running Blob Alter+Compact Test (Java
Write+Alter+Compact, Python Read) ===${NC}"
@@ -324,6 +350,7 @@ main() {
local compressed_text_result=0
local tantivy_fulltext_result=0
local lumina_vector_result=0
+ local compact_conflict_result=0
local blob_alter_compact_result=0
# Detect Python version
@@ -407,6 +434,13 @@ main() {
echo ""
+ # Run compact conflict test (Java write+compact, Python read)
+ if ! run_compact_conflict_test; then
+ compact_conflict_result=1
+ fi
+
+ echo ""
+
# Run blob alter+compact test (Java write+alter+compact, Python read)
if ! run_blob_alter_compact_test; then
blob_alter_compact_result=1
@@ -470,6 +504,12 @@ main() {
echo -e "${RED}✗ Lumina Vector Index Test (Java Write, Python Read):
FAILED${NC}"
fi
+ if [[ $compact_conflict_result -eq 0 ]]; then
+ echo -e "${GREEN}✓ Compact Conflict Test (Java Write+Compact, Python
Read): PASSED${NC}"
+ else
+ echo -e "${RED}✗ Compact Conflict Test (Java Write+Compact, Python
Read): FAILED${NC}"
+ fi
+
if [[ $blob_alter_compact_result -eq 0 ]]; then
echo -e "${GREEN}✓ Blob Alter+Compact Test (Java Write+Alter+Compact,
Python Read): PASSED${NC}"
else
@@ -481,7 +521,7 @@ main() {
# Clean up warehouse directory after all tests
cleanup_warehouse
- if [[ $java_write_result -eq 0 && $python_read_result -eq 0 &&
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 &&
$btree_index_result -eq 0 && $compressed_text_result -eq 0 &&
$tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 &&
$blob_alter_compact_result -eq 0 ]]; then
+ if [[ $java_write_result -eq 0 && $python_read_result -eq 0 &&
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 &&
$btree_index_result -eq 0 && $compressed_text_result -eq 0 &&
$tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 &&
$compact_conflict_result -eq 0 && $blob_alter_compact_result -eq 0 ]]; then
echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability
verified.${NC}"
return 0
else
diff --git a/paimon-python/pypaimon/read/plan.py
b/paimon-python/pypaimon/read/plan.py
index 8c69a41a9b..c4ebc2408f 100644
--- a/paimon-python/pypaimon/read/plan.py
+++ b/paimon-python/pypaimon/read/plan.py
@@ -16,8 +16,8 @@
# limitations under the License.
################################################################################
-from dataclasses import dataclass
-from typing import List
+from dataclasses import dataclass, field
+from typing import List, Optional
from pypaimon.read.split import Split
@@ -27,6 +27,7 @@ from pypaimon.read.split import Split
class Plan:
"""Implementation of Plan for native Python reading."""
_splits: List[Split]
+ snapshot_id: Optional[int] = field(default=None)
def splits(self) -> List[Split]:
return self._splits
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py
b/paimon-python/pypaimon/read/scanner/file_scanner.py
index b0293dac41..80bf54b384 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -18,7 +18,7 @@ limitations under the License.
import logging
import os
import time
-from typing import Callable, Dict, List, Optional, Set
+from typing import Callable, Dict, List, Optional, Set, Tuple
logger = logging.getLogger(__name__)
@@ -40,6 +40,7 @@ from pypaimon.read.scanner.data_evolution_split_generator
import \
from pypaimon.read.scanner.primary_key_table_split_generator import \
PrimaryKeyTableSplitGenerator
from pypaimon.read.split import DataSplit
+from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.table.source.deletion_file import DeletionFile
@@ -165,7 +166,7 @@ class FileScanner:
def __init__(
self,
table,
- manifest_scanner: Callable[[], List[ManifestFileMeta]],
+ manifest_scanner: Callable[[], Tuple[List[ManifestFileMeta],
Optional[Snapshot]]],
predicate: Optional[Predicate] = None,
limit: Optional[int] = None
):
@@ -200,6 +201,8 @@ class FileScanner:
self.data_evolution = options.data_evolution_enabled()
self.deletion_vectors_enabled = options.deletion_vectors_enabled()
self._global_index_result = None
+ self._scanned_snapshot = None
+ self._scanned_snapshot_id = None
def schema_fields_func(schema_id: int):
return self.table.schema_manager.get_schema(schema_id).fields
@@ -216,7 +219,8 @@ class FileScanner:
bucket_files = set()
for e in entries:
bucket_files.add((tuple(e.partition.values), e.bucket))
- return
self._scan_dv_index(self.snapshot_manager.get_latest_snapshot(), bucket_files)
+ snapshot = self._scanned_snapshot if self._scanned_snapshot else
self.snapshot_manager.get_latest_snapshot()
+ return self._scan_dv_index(snapshot, bucket_files)
def scan(self) -> Plan:
start_ms = time.time() * 1000
@@ -241,7 +245,7 @@ class FileScanner:
)
if not entries:
- return Plan([])
+ return Plan([], snapshot_id=self._scanned_snapshot_id)
# Configure sharding if needed
if self.idx_of_this_subtask is not None:
@@ -258,7 +262,7 @@ class FileScanner:
"File store scan plan completed in %d ms. Files size: %d",
duration_ms, len(entries)
)
- return Plan(splits)
+ return Plan(splits, snapshot_id=self._scanned_snapshot_id)
def _create_data_evolution_split_generator(self):
row_ranges = None
@@ -272,7 +276,9 @@ class FileScanner:
if row_ranges is None and self.predicate is not None:
row_ranges = _row_ranges_from_predicate(self.predicate)
- manifest_files = self.manifest_scanner()
+ manifest_files, snapshot = self.manifest_scanner()
+ self._scanned_snapshot = snapshot
+ self._scanned_snapshot_id = snapshot.id if snapshot else None
# Filter manifest files by row ranges if available
if row_ranges is not None:
@@ -293,7 +299,9 @@ class FileScanner:
)
def plan_files(self) -> List[ManifestEntry]:
- manifest_files = self.manifest_scanner()
+ manifest_files, snapshot = self.manifest_scanner()
+ self._scanned_snapshot = snapshot
+ self._scanned_snapshot_id = snapshot.id if snapshot else None
if len(manifest_files) == 0:
return []
return self.read_manifest_entries(manifest_files)
diff --git a/paimon-python/pypaimon/read/streaming_table_scan.py
b/paimon-python/pypaimon/read/streaming_table_scan.py
index 835c61dd73..f426d2064f 100644
--- a/paimon-python/pypaimon/read/streaming_table_scan.py
+++ b/paimon-python/pypaimon/read/streaming_table_scan.py
@@ -323,7 +323,7 @@ class AsyncStreamingTableScan:
def _create_initial_plan(self, snapshot: Snapshot) -> Plan:
"""Create a Plan for the initial full scan of the latest snapshot."""
def all_manifests():
- return self._manifest_list_manager.read_all(snapshot)
+ return self._manifest_list_manager.read_all(snapshot), snapshot
starting_scanner = FileScanner(
self.table,
diff --git a/paimon-python/pypaimon/read/table_scan.py
b/paimon-python/pypaimon/read/table_scan.py
index 9579f875cb..c754b50111 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -59,7 +59,7 @@ class TableScan:
earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
latest_snapshot = snapshot_manager.get_latest_snapshot()
if earliest_snapshot is None or latest_snapshot is None:
- return FileScanner(self.table, lambda: [])
+ return FileScanner(self.table, lambda: ([], None))
start_timestamp = int(ts[0])
end_timestamp = int(ts[1])
if start_timestamp >= end_timestamp:
@@ -67,7 +67,7 @@ class TableScan:
"Ending timestamp %s should be >= starting timestamp %s."
% (end_timestamp, start_timestamp))
if (start_timestamp == end_timestamp or start_timestamp >
latest_snapshot.time_millis
or end_timestamp < earliest_snapshot.time_millis):
- return FileScanner(self.table, lambda: [])
+ return FileScanner(self.table, lambda: ([], None))
starting_snapshot =
snapshot_manager.earlier_or_equal_time_mills(start_timestamp)
earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
@@ -84,8 +84,10 @@ class TableScan:
def incremental_manifest():
snapshots_in_range = []
+ end_snapshot = snapshot_manager.get_snapshot_by_id(end_id) if
end_id >= 1 else None
for snapshot_id in range(start_id + 1, end_id + 1):
snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
+ end_snapshot = snapshot
if snapshot.commit_kind == "APPEND":
snapshots_in_range.append(snapshot)
@@ -94,7 +96,7 @@ class TableScan:
for snapshot in snapshots_in_range:
manifest_files = manifest_list_manager.read_delta(snapshot)
manifests.extend(manifest_files)
- return manifests
+ return manifests, end_snapshot
return FileScanner(self.table, incremental_manifest,
self.predicate, self.limit)
elif options.contains(CoreOptions.SCAN_TAG_NAME): # Handle tag-based
reading
@@ -104,7 +106,7 @@ class TableScan:
tag_manager = self.table.tag_manager()
tag = tag_manager.get_or_throw(tag_name)
snapshot = tag.trim_to_snapshot()
- return manifest_list_manager.read_all(snapshot)
+ return manifest_list_manager.read_all(snapshot), snapshot
return FileScanner(
self.table,
@@ -115,7 +117,7 @@ class TableScan:
def all_manifests():
snapshot = snapshot_manager.get_latest_snapshot()
- return manifest_list_manager.read_all(snapshot)
+ return manifest_list_manager.read_all(snapshot), snapshot
return FileScanner(
self.table,
diff --git a/paimon-python/pypaimon/tests/binary_row_test.py
b/paimon-python/pypaimon/tests/binary_row_test.py
index b1a35226eb..607b184444 100644
--- a/paimon-python/pypaimon/tests/binary_row_test.py
+++ b/paimon-python/pypaimon/tests/binary_row_test.py
@@ -117,7 +117,7 @@ class BinaryRowTest(unittest.TestCase):
def test_is_not_null_append(self):
table = self.catalog.get_table('default.test_append')
- file_scanner = FileScanner(table, lambda: [])
+ file_scanner = FileScanner(table, lambda: ([], None))
latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
manifest_files =
file_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries =
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
@@ -254,7 +254,7 @@ class BinaryRowTest(unittest.TestCase):
table_write.close()
table_commit.close()
- file_scanner = FileScanner(table, lambda: [])
+ file_scanner = FileScanner(table, lambda: ([], None))
latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
manifest_files =
file_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries =
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
@@ -293,7 +293,7 @@ class BinaryRowTest(unittest.TestCase):
}
self.assertEqual(expected_data, actual.to_pydict())
- file_scanner = FileScanner(table, lambda: [])
+ file_scanner = FileScanner(table, lambda: ([], None))
latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
manifest_files =
file_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries =
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
@@ -324,7 +324,7 @@ class BinaryRowTest(unittest.TestCase):
trimmed_pk_fields)
def _overwrite_manifest_entry(self, table):
- file_scanner = FileScanner(table, lambda: [])
+ file_scanner = FileScanner(table, lambda: ([], None))
latest_snapshot = file_scanner.snapshot_manager.get_latest_snapshot()
manifest_files =
file_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries =
file_scanner.manifest_file_manager.read(manifest_files[0].file_name)
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 4a61b9a067..3eee324b6c 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -611,3 +611,62 @@ class JavaPyReadWriteTest(unittest.TestCase):
splits = table_scan.plan().splits()
result = table_read.to_arrow(splits)
self.assertEqual(result.num_rows, 200)
+
+ def test_compact_conflict_shard_update(self):
+ """
+ 1. Java writes 5 base files (testCompactConflictWriteBase)
+ 2. pypaimon ShardTableUpdator scans table, prepares evolution
+ 3. Java runs compact (testCompactConflictRunCompact)
+ 4. pypaimon commits stale evolution -> conflict detected, raises
RuntimeError
+ """
+ import subprocess
+
+ table = self.catalog.get_table('default.compact_conflict_test')
+
+ # Step 2: pypaimon shard update - scan and prepare commit
+ wb = table.new_batch_write_builder()
+ update = wb.new_update()
+ update.with_read_projection(['f0'])
+ update.with_update_type(['f2'])
+ upd = update.new_shard_updator(shard_num=0, total_shard_count=3)
+ print(f"Shard 0 row_ranges: {[(r[1].from_, r[1].to) for r in
upd.row_ranges]}")
+
+ reader = upd.arrow_reader()
+ import pyarrow as pa
+ rows_read = 0
+ for batch in iter(reader.read_next_batch, None):
+ n = batch.num_rows
+ rows_read += n
+ upd.update_by_arrow_batch(
+ pa.RecordBatch.from_pydict(
+ {'f2': [f'evo_{i}' for i in range(n)]},
+ schema=pa.schema([('f2', pa.string())])
+ )
+ )
+ print(f"Shard update read {rows_read} rows")
+ stale_commit_msgs = upd.prepare_commit()
+
+ # Step 3: Java compact (compact happening between scan and commit)
+ project_root = os.path.join(self.tempdir, '..', '..', '..', '..')
+ result = subprocess.run(
+ ['mvn', 'test',
+ '-pl', 'paimon-core',
+
'-Dtest=org.apache.paimon.JavaPyE2ETest#testCompactConflictRunCompact',
+ '-Drun.e2e.tests=true',
+ '-Dsurefire.failIfNoSpecifiedTests=false',
+ '-q'],
+ cwd=os.path.abspath(project_root),
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True, timeout=120
+ )
+ self.assertEqual(result.returncode, 0,
+ f"Java compact
failed:\n{result.stdout}\n{result.stderr}")
+ print("Java compact completed")
+
+ # Step 4: pypaimon commits stale evolution -> conflict detected
+ tc = wb.new_commit()
+ with self.assertRaises(RuntimeError) as ctx:
+ tc.commit(stale_commit_msgs)
+ self.assertIn("conflicts", str(ctx.exception))
+ tc.close()
+ print(f"Conflict detected as expected: {ctx.exception}")
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py
b/paimon-python/pypaimon/tests/reader_append_only_test.py
index a8529c14a8..c4763dd99e 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -103,6 +103,51 @@ class AoReaderTest(unittest.TestCase):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
+ def test_plan_snapshot_id_for_empty_and_non_empty_scan(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'])
+ self.catalog.create_table('default.test_plan_snapshot_id', schema,
False)
+ table = self.catalog.get_table('default.test_plan_snapshot_id')
+
+ empty_plan = table.new_read_builder().new_scan().plan()
+ self.assertIsNone(empty_plan.snapshot_id)
+ self.assertEqual(len(empty_plan.splits()), 0)
+
+ self._write_test_table(table)
+
+ plan = table.new_read_builder().new_scan().plan()
+ self.assertEqual(plan.snapshot_id, 2)
+ self.assertGreater(len(plan.splits()), 0)
+
+ def test_incremental_timestamp_empty_range_keeps_end_snapshot_id(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'])
+
self.catalog.create_table('default.test_incremental_empty_range_snapshot',
schema, False)
+ table =
self.catalog.get_table('default.test_incremental_empty_range_snapshot')
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ pa_table = pa.Table.from_pydict({
+ 'user_id': [1],
+ 'item_id': [1001],
+ 'behavior': ['a'],
+ 'dt': ['p1'],
+ }, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ snapshot_manager = SnapshotManager(table)
+ snapshot = snapshot_manager.get_latest_snapshot()
+ table_inc = table.copy({
+ CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP.key():
+ "{},{}".format(snapshot.time_millis, snapshot.time_millis + 1)
+ })
+
+ plan = table_inc.new_read_builder().new_scan().plan()
+ self.assertEqual(plan.snapshot_id, snapshot.id)
+ self.assertEqual(len(plan.splits()), 0)
+
@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python
>= 3.11")
def test_vortex_ao_reader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'], options={'file.format': 'vortex'})
diff --git a/paimon-python/pypaimon/write/commit/commit_scanner.py
b/paimon-python/pypaimon/write/commit/commit_scanner.py
index 6158c86f5c..d758df3675 100644
--- a/paimon-python/pypaimon/write/commit/commit_scanner.py
+++ b/paimon-python/pypaimon/write/commit/commit_scanner.py
@@ -66,7 +66,7 @@ class CommitScanner:
all_manifests = self.manifest_list_manager.read_all(latest_snapshot)
return FileScanner(
- self.table, lambda: [], partition_filter
+ self.table, lambda: ([], None), partition_filter
).read_manifest_entries(all_manifests)
def read_incremental_entries_from_changed_partitions(self, snapshot:
Snapshot,
@@ -92,7 +92,7 @@ class CommitScanner:
partition_filter =
self._build_partition_filter_from_entries(commit_entries)
return FileScanner(
- self.table, lambda: [], partition_filter
+ self.table, lambda: ([], None), partition_filter
).read_manifest_entries(delta_manifests)
def _build_partition_filter_from_entries(self, entries:
List[ManifestEntry]):
diff --git a/paimon-python/pypaimon/write/file_store_commit.py
b/paimon-python/pypaimon/write/file_store_commit.py
index a4099ac1db..8937842680 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -516,7 +516,7 @@ class FileStoreCommit:
"""Generate commit entries for OVERWRITE mode based on latest
snapshot."""
entries = []
current_entries = [] if latest_snapshot is None \
- else (FileScanner(self.table, lambda: [], partition_filter).
+ else (FileScanner(self.table, lambda: ([], None),
partition_filter).
read_manifest_entries(self.manifest_list_manager.read_all(latest_snapshot)))
for entry in current_entries:
entry.kind = 1 # DELETE
diff --git a/paimon-python/pypaimon/write/table_update.py
b/paimon-python/pypaimon/write/table_update.py
index b26194eb55..ec192a98a5 100644
--- a/paimon-python/pypaimon/write/table_update.py
+++ b/paimon-python/pypaimon/write/table_update.py
@@ -171,7 +171,9 @@ class ShardTableUpdator:
self.dict = defaultdict(list)
scanner = self.table.new_read_builder().new_scan()
- splits = scanner.plan().splits()
+ plan = scanner.plan()
+ self.snapshot_id = plan.snapshot_id if plan.snapshot_id is not None
else -1
+ splits = plan.splits()
splits = _filter_by_whole_file_shard(splits, shard_num,
total_shard_count)
self.splits = splits
@@ -197,7 +199,7 @@ class ShardTableUpdator:
def prepare_commit(self) -> List[CommitMessage]:
commit_messages = []
for (partition, files) in self.dict.items():
- commit_messages.append(CommitMessage(partition, 0, files))
+ commit_messages.append(CommitMessage(partition, 0, files,
self.snapshot_id))
return commit_messages
def update_by_arrow_batch(self, data: pa.RecordBatch):
diff --git a/paimon-python/pypaimon/write/table_update_by_row_id.py
b/paimon-python/pypaimon/write/table_update_by_row_id.py
index 089fde047c..2e2efe44f7 100644
--- a/paimon-python/pypaimon/write/table_update_by_row_id.py
+++ b/paimon-python/pypaimon/write/table_update_by_row_id.py
@@ -65,7 +65,8 @@ class TableUpdateByRowId:
read_builder = self.table.new_read_builder()
scan = read_builder.new_scan()
- splits = scan.plan().splits()
+ plan = scan.plan()
+ splits = plan.splits()
for split in splits:
for file in split.files:
@@ -77,7 +78,7 @@ class TableUpdateByRowId:
total_row_count = sum(first_row_id_to_row_count_map.values())
- snapshot_id = self.table.snapshot_manager().get_latest_snapshot().id
+ snapshot_id = plan.snapshot_id if plan.snapshot_id is not None else -1
return (snapshot_id,
sorted(list(set(first_row_ids))),
first_row_id_to_partition_map,