This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ed220745ba [python][ray] Reject unsafe primary-key writes (#8071)
ed220745ba is described below

commit ed220745baea8126253c86bcbaade32753912d34
Author: QuakeWang <[email protected]>
AuthorDate: Tue Jun 2 14:55:23 2026 +0800

    [python][ray] Reject unsafe primary-key writes (#8071)
    
    Ray writes create one independent Paimon writer per write task. For
    primary-key tables, this is unsafe unless all rows for each `(partition,
    bucket)` are routed to one writer.
    
    The previous Ray path only guarded HASH_FIXED primary-key tables in
    default/off mode. Default primary-key tables use dynamic buckets
    (`bucket = -1`), so they were written as-is. Multiple Ray tasks could
    then assign buckets and sequence numbers from independent local state,
    producing duplicate keys with overlapping `_SEQUENCE_NUMBER`.
    
    This PR makes Ray primary-key writes fail fast unless the table is
    HASH_FIXED and `hash_fixed_precluster="map_groups"` is selected.
    Append-only non-HASH_FIXED tables still pass through unchanged. The
    PyPaimon Ray docs are updated to match the new behavior.
---
 docs/docs/pypaimon/ray-data.md                     | 17 +++--
 paimon-python/pypaimon/ray/ray_paimon.py           |  8 +--
 paimon-python/pypaimon/ray/shuffle.py              | 30 +++++---
 .../pypaimon/tests/ray_repartition_test.py         | 81 +++++++++++++++++++++-
 .../pypaimon/tests/test_ray_shuffle_helper.py      | 62 +++++++++++++++--
 paimon-python/pypaimon/write/table_write.py        |  2 +-
 6 files changed, 175 insertions(+), 25 deletions(-)

diff --git a/docs/docs/pypaimon/ray-data.md b/docs/docs/pypaimon/ray-data.md
index e19e987c4b..589a15b9ca 100644
--- a/docs/docs/pypaimon/ray-data.md
+++ b/docs/docs/pypaimon/ray-data.md
@@ -241,7 +241,13 @@ inherits Ray's `map_groups()` memory bound. Large append-only buckets
 or hot append-only partitions should use the default mode or
 `hash_fixed_precluster="off"`.
 
-For non-HASH_FIXED tables the dataset is written as-is.
+For non-HASH_FIXED append-only tables, the dataset is written as-is.
+Postpone-bucket primary-key tables (`bucket = -2`) are also written
+as-is to the `bucket-postpone` directory. HASH_DYNAMIC and
+CROSS_PARTITION primary-key Ray writes are not supported and fail fast,
+including the default dynamic-bucket primary-key table (`bucket = -1`).
+Ray write tasks create independent Paimon writers, which can assign
+overlapping buckets or sequence numbers for those modes.
 
 **Parameters:**
 - `dataset`: the Ray Dataset to write.
@@ -254,8 +260,10 @@ For non-HASH_FIXED tables the dataset is written as-is.
 - `hash_fixed_precluster`: HASH_FIXED pre-clustering mode. `"auto"` and
   `"off"` write append-only HASH_FIXED tables directly and reject
   HASH_FIXED primary-key tables. `"map_groups"` enables the legacy
-  small-file optimization and requires each `(partition_keys..., bucket)`
-  group to fit in memory on one Ray node.
+  small-file optimization for HASH_FIXED primary-key tables and requires
+  each `(partition_keys..., bucket)` group to fit in memory on one Ray
+  node. This option does not enable Ray writes for HASH_DYNAMIC or
+  CROSS_PARTITION primary-key tables.
 
 ### `TableWrite.write_ray()` (lower-level)
 
@@ -290,7 +298,8 @@ table_write.write_ray(
 #   - overwrite: Whether to overwrite existing data (default: False)
 #   - concurrency: Optional max number of concurrent Ray tasks
 #   - ray_remote_args: Optional kwargs passed to ray.remote() (e.g., {"num_cpus": 2})
-#   - hash_fixed_precluster: Same HASH_FIXED modes as write_paimon()
+#   - hash_fixed_precluster: Same HASH_FIXED modes and primary-key safety
+#     checks as write_paimon()
 
 # 3. Commit data (required for write_pandas/write_arrow/write_arrow_batch only)
 commit_messages = table_write.prepare_commit()
diff --git a/paimon-python/pypaimon/ray/ray_paimon.py b/paimon-python/pypaimon/ray/ray_paimon.py
index e2924dcda6..3acbe91ace 100644
--- a/paimon-python/pypaimon/ray/ray_paimon.py
+++ b/paimon-python/pypaimon/ray/ray_paimon.py
@@ -122,9 +122,9 @@ def write_paimon(
     writer. Optional pre-clustering is only a file-count optimization.
     The legacy ``map_groups`` pre-clustering mode materializes each
     ``(partition_keys..., bucket)`` group on one Ray node and should
-    only be used when every group fits in memory. HASH_FIXED primary-key
-    tables require ``map_groups`` until Ray writes have a bounded
-    strategy that preserves per-bucket sequence ordering.
+    only be used when every group fits in memory. HASH_DYNAMIC and
+    CROSS_PARTITION primary-key Ray writes are rejected because Ray
+    write tasks create independent Paimon writers.
 
     Args:
         dataset: The Ray Dataset to write.
@@ -137,7 +137,7 @@ def write_paimon(
             and ``"off"`` write append-only HASH_FIXED tables directly
             and reject HASH_FIXED primary-key tables. ``"map_groups"``
             preserves the legacy small-file optimization and its single
-            group memory bound.
+            group memory bound for HASH_FIXED primary-key tables.
     """
     from pypaimon.catalog.catalog_factory import CatalogFactory
     from pypaimon.ray.shuffle import maybe_apply_repartition
diff --git a/paimon-python/pypaimon/ray/shuffle.py b/paimon-python/pypaimon/ray/shuffle.py
index c7a1c5b85b..5079bf6215 100644
--- a/paimon-python/pypaimon/ray/shuffle.py
+++ b/paimon-python/pypaimon/ray/shuffle.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-"""Optional pre-clustering for Ray writes to HASH_FIXED Paimon tables.
+"""Optional pre-clustering and write guards for Ray writes.
 
 The legacy ``map_groups`` strategy groups rows by
 ``(partition_keys..., bucket)`` so every distinct group lands in a
@@ -24,7 +24,8 @@ single Ray task. This can reduce file count, but Ray requires each
 ``map_groups`` group to fit in memory on one node. Keep that strategy
 behind an explicit opt-in.
 
-For any other bucket mode the dataset is returned unchanged.
+For append-only tables in any other bucket mode the dataset is returned
+unchanged.
 """
 
 import uuid
@@ -74,10 +75,9 @@ def maybe_apply_repartition(
     ``auto`` currently behaves like ``off`` for append-only tables
     because the old ``map_groups`` strategy materializes each
     ``(partition, bucket)`` group on one Ray node. For primary-key
-    tables, direct writes are rejected because multiple Ray tasks can
-    write the same bucket with overlapping sequence numbers. Use
-    ``map_groups`` only when both bounds are acceptable for the
-    workload.
+    tables, unsafe Ray write plans are rejected because multiple Ray
+    tasks create independent Paimon writers and can assign overlapping
+    sequence numbers.
     """
     if hash_fixed_precluster not in HASH_FIXED_PRECLUSTER_MODES:
         raise ValueError(
@@ -87,14 +87,28 @@ def maybe_apply_repartition(
             )
         )
 
-    if table.bucket_mode() != BucketMode.HASH_FIXED:
+    bucket_mode = table.bucket_mode()
+    is_primary_key_table = getattr(table, "is_primary_key_table", False)
+
+    if bucket_mode != BucketMode.HASH_FIXED:
+        if is_primary_key_table and bucket_mode in (
+                BucketMode.HASH_DYNAMIC,
+                BucketMode.CROSS_PARTITION,
+        ):
+            raise ValueError(
+                "{} primary-key Ray writes are not supported. Multiple "
+                "Ray tasks create independent Paimon writers, which can "
+                "assign overlapping buckets or sequence numbers.".format(
+                    bucket_mode.name
+                )
+            )
         return dataset
 
     if hash_fixed_precluster in (
             HASH_FIXED_PRECLUSTER_AUTO,
             HASH_FIXED_PRECLUSTER_OFF,
     ):
-        if getattr(table, "is_primary_key_table", False):
+        if is_primary_key_table:
             raise ValueError(
                 "HASH_FIXED primary-key Ray writes require "
                 "hash_fixed_precluster='map_groups'. Direct writes can "
diff --git a/paimon-python/pypaimon/tests/ray_repartition_test.py b/paimon-python/pypaimon/tests/ray_repartition_test.py
index 01048e850b..0d7dd568c0 100644
--- a/paimon-python/pypaimon/tests/ray_repartition_test.py
+++ b/paimon-python/pypaimon/tests/ray_repartition_test.py
@@ -32,7 +32,9 @@ explicitly selected. These tests cover:
     (partition, bucket) on the small test dataset.
   * regression: a table whose schema already contains a column named
     ``__paimon_bucket__`` still works (collision-safe column name).
-  * non-HASH_FIXED tables (BUCKET_UNAWARE etc.) pass through unchanged.
+  * non-HASH_FIXED append-only tables pass through unchanged.
+  * dynamic-bucket primary-key tables fail fast, while postpone-bucket
+    primary-key tables pass through.
 """
 
 import glob
@@ -190,6 +192,83 @@ class RayShuffleTest(unittest.TestCase):
         finally:
             writer.close()
 
+    def test_primary_key_dynamic_bucket_default_fails_fast(self):
+        from pypaimon.ray import write_paimon
+
+        pa_schema = pa.schema([
+            pa.field('id', pa.int32(), nullable=False),
+            ('name', pa.string()),
+        ])
+        table_name = 'test_pk_dynamic_bucket_default_fails_fast'
+        identifier = self._make_table(
+            table_name, pa_schema, primary_keys=['id'],
+        )
+
+        rows = pa.Table.from_pydict(
+            {'id': list(range(40)), 'name': [f'v{i}' for i in range(40)]},
+            schema=pa_schema,
+        )
+        ds = ray.data.from_arrow(rows).repartition(4)
+
+        with self.assertRaisesRegex(ValueError, "HASH_DYNAMIC primary-key"):
+            write_paimon(ds, identifier, self.catalog_options)
+
+    def test_table_write_ray_primary_key_dynamic_bucket_default_fails_fast(self):
+        pa_schema = pa.schema([
+            pa.field('id', pa.int32(), nullable=False),
+            ('name', pa.string()),
+        ])
+        table_name = 'test_table_write_ray_pk_dynamic_default_fails_fast'
+        identifier = self._make_table(
+            table_name, pa_schema, primary_keys=['id'],
+        )
+
+        rows = pa.Table.from_pydict(
+            {'id': list(range(40)), 'name': [f'v{i}' for i in range(40)]},
+            schema=pa_schema,
+        )
+        ds = ray.data.from_arrow(rows).repartition(4)
+
+        catalog = CatalogFactory.create(self.catalog_options)
+        table = catalog.get_table(identifier)
+        writer = table.new_batch_write_builder().new_write()
+        try:
+            with self.assertRaisesRegex(ValueError, "HASH_DYNAMIC primary-key"):
+                writer.write_ray(ds)
+        finally:
+            writer.close()
+
+    def test_primary_key_postpone_bucket_roundtrip_to_postpone_files(self):
+        from pypaimon.ray import write_paimon
+
+        pa_schema = pa.schema([
+            pa.field('id', pa.int32(), nullable=False),
+            ('dt', pa.string()),
+            ('value', pa.int64()),
+        ])
+        table_name = 'test_pk_postpone_bucket_ray_write'
+        identifier = self._make_table(
+            table_name, pa_schema,
+            primary_keys=['id', 'dt'], partition_keys=['dt'],
+            options={'bucket': '-2'},
+        )
+
+        rows = pa.Table.from_pydict({
+            'id': list(range(10)),
+            'dt': ['2026-01-01'] * 5 + ['2026-01-02'] * 5,
+            'value': list(range(10)),
+        }, schema=pa_schema)
+        write_paimon(
+            ray.data.from_arrow(rows).repartition(2),
+            identifier,
+            self.catalog_options,
+        )
+
+        files = self._count_data_files(table_name)
+        self.assertGreater(len(files), 0)
+        self.assertTrue(all('/bucket-postpone/' in path for path in files))
+        self.assertEqual(len(self._read_table(identifier)), 0)
+
     def test_partitioned_fixed_bucket_roundtrip(self):
         """Partitioned table — confirms the post-groupby schema does not
         end up with duplicated partition-key or bucket columns."""
diff --git a/paimon-python/pypaimon/tests/test_ray_shuffle_helper.py b/paimon-python/pypaimon/tests/test_ray_shuffle_helper.py
index eb974dd79a..6cfa5ea5f6 100644
--- a/paimon-python/pypaimon/tests/test_ray_shuffle_helper.py
+++ b/paimon-python/pypaimon/tests/test_ray_shuffle_helper.py
@@ -25,16 +25,29 @@ large-type coercion, and the bucket-mode dispatch in
 in ``pypaimon/tests/ray_repartition_test.py``.
 """
 
+import importlib.util
+from pathlib import Path
 import unittest
 from unittest.mock import MagicMock
 
 import pyarrow as pa
 
-from pypaimon.ray.shuffle import (BUCKET_KEY_COL, _coerce_large_string_types,
-                                  _make_bucket_udf, _pick_bucket_col_name,
-                                  maybe_apply_repartition)
 from pypaimon.table.bucket_mode import BucketMode
 
+_SHUFFLE_PATH = (
+    Path(__file__).resolve().parents[1] / "ray" / "shuffle.py"
+)
+_SHUFFLE_SPEC = importlib.util.spec_from_file_location(
+    "pypaimon_ray_shuffle_under_test", _SHUFFLE_PATH)
+_SHUFFLE = importlib.util.module_from_spec(_SHUFFLE_SPEC)
+_SHUFFLE_SPEC.loader.exec_module(_SHUFFLE)
+
+BUCKET_KEY_COL = _SHUFFLE.BUCKET_KEY_COL
+_coerce_large_string_types = _SHUFFLE._coerce_large_string_types
+_make_bucket_udf = _SHUFFLE._make_bucket_udf
+_pick_bucket_col_name = _SHUFFLE._pick_bucket_col_name
+maybe_apply_repartition = _SHUFFLE.maybe_apply_repartition
+
 
 class BucketUdfTest(unittest.TestCase):
     """The bucket-key UDF appends a deterministic int32 column."""
@@ -135,13 +148,13 @@ class CoerceLargeStringTypesTest(unittest.TestCase):
 
 
 class BucketModeDispatchTest(unittest.TestCase):
-    """``maybe_apply_repartition`` only clusters HASH_FIXED tables when
-    the legacy ``map_groups`` mode is explicitly selected."""
+    """``maybe_apply_repartition`` clusters only supported HASH_FIXED
+    writes and rejects unsafe primary-key Ray writes."""
 
-    def _make_table(self, bucket_mode):
+    def _make_table(self, bucket_mode, is_primary_key_table=False):
         table = MagicMock()
         table.bucket_mode.return_value = bucket_mode
-        table.is_primary_key_table = False
+        table.is_primary_key_table = is_primary_key_table
         return table
 
     def test_bucket_unaware_returns_dataset_unchanged(self):
@@ -156,12 +169,47 @@ class BucketModeDispatchTest(unittest.TestCase):
 
         self.assertIs(maybe_apply_repartition(dataset, table), dataset)
 
+    def test_hash_dynamic_primary_key_raises_value_error(self):
+        dataset = MagicMock(name="dataset")
+        table = self._make_table(
+            BucketMode.HASH_DYNAMIC, is_primary_key_table=True)
+
+        with self.assertRaisesRegex(ValueError, "HASH_DYNAMIC primary-key"):
+            maybe_apply_repartition(dataset, table)
+        dataset.map_batches.assert_not_called()
+
+    def test_hash_dynamic_primary_key_map_groups_raises_value_error(self):
+        dataset = MagicMock(name="dataset")
+        table = self._make_table(
+            BucketMode.HASH_DYNAMIC, is_primary_key_table=True)
+
+        with self.assertRaisesRegex(ValueError, "HASH_DYNAMIC primary-key"):
+            maybe_apply_repartition(dataset, table, "map_groups")
+        dataset.map_batches.assert_not_called()
+
     def test_cross_partition_returns_dataset_unchanged(self):
         dataset = object()
         table = self._make_table(BucketMode.CROSS_PARTITION)
 
         self.assertIs(maybe_apply_repartition(dataset, table), dataset)
 
+    def test_cross_partition_primary_key_raises_value_error(self):
+        dataset = MagicMock(name="dataset")
+        table = self._make_table(
+            BucketMode.CROSS_PARTITION, is_primary_key_table=True)
+
+        with self.assertRaisesRegex(ValueError, "CROSS_PARTITION primary-key"):
+            maybe_apply_repartition(dataset, table)
+        dataset.map_batches.assert_not_called()
+
+    def test_postpone_primary_key_returns_dataset_unchanged(self):
+        dataset = MagicMock(name="dataset")
+        table = self._make_table(
+            BucketMode.POSTPONE_MODE, is_primary_key_table=True)
+
+        self.assertIs(maybe_apply_repartition(dataset, table), dataset)
+        dataset.map_batches.assert_not_called()
+
     def test_hash_fixed_default_returns_dataset_unchanged(self):
         dataset = MagicMock(name="dataset")
         table = MagicMock()
diff --git a/paimon-python/pypaimon/write/table_write.py b/paimon-python/pypaimon/write/table_write.py
index 9336fff028..1eb63793d0 100644
--- a/paimon-python/pypaimon/write/table_write.py
+++ b/paimon-python/pypaimon/write/table_write.py
@@ -94,7 +94,7 @@ class TableWrite:
                 and ``"off"`` write append-only HASH_FIXED tables directly
                 and reject HASH_FIXED primary-key tables. ``"map_groups"``
                 preserves the legacy small-file optimization and its single
-                group memory bound.
+                group memory bound for HASH_FIXED primary-key tables.
         """
         from pypaimon.ray.shuffle import maybe_apply_repartition
         from pypaimon.write.ray_datasink import PaimonDatasink

Reply via email to