This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ed220745ba [python][ray] Reject unsafe primary-key writes (#8071)
ed220745ba is described below
commit ed220745baea8126253c86bcbaade32753912d34
Author: QuakeWang <[email protected]>
AuthorDate: Tue Jun 2 14:55:23 2026 +0800
[python][ray] Reject unsafe primary-key writes (#8071)
Ray writes create one independent Paimon writer per write task. For
primary-key tables, this is unsafe unless all rows for each `(partition,
bucket)` are routed to one writer.
The previous Ray path only guarded HASH_FIXED primary-key tables in
default/off mode. Default primary-key tables use dynamic buckets
(`bucket = -1`), so they were written as-is. Multiple Ray tasks could
then assign buckets and sequence numbers from independent local state,
producing duplicate keys with overlapping `_SEQUENCE_NUMBER`.
This PR makes Ray primary-key writes fail fast unless the table is
HASH_FIXED and `hash_fixed_precluster="map_groups"` is selected, or the
table uses postpone buckets (`bucket = -2`), which are still written
as-is. Append-only non-HASH_FIXED tables still pass through unchanged.
The PyPaimon Ray docs are updated to match the new behavior.
---
docs/docs/pypaimon/ray-data.md | 17 +++--
paimon-python/pypaimon/ray/ray_paimon.py | 8 +--
paimon-python/pypaimon/ray/shuffle.py | 30 +++++---
.../pypaimon/tests/ray_repartition_test.py | 81 +++++++++++++++++++++-
.../pypaimon/tests/test_ray_shuffle_helper.py | 62 +++++++++++++++--
paimon-python/pypaimon/write/table_write.py | 2 +-
6 files changed, 175 insertions(+), 25 deletions(-)
diff --git a/docs/docs/pypaimon/ray-data.md b/docs/docs/pypaimon/ray-data.md
index e19e987c4b..589a15b9ca 100644
--- a/docs/docs/pypaimon/ray-data.md
+++ b/docs/docs/pypaimon/ray-data.md
@@ -241,7 +241,13 @@ inherits Ray's `map_groups()` memory bound. Large append-only buckets
or hot append-only partitions should use the default mode or
`hash_fixed_precluster="off"`.
-For non-HASH_FIXED tables the dataset is written as-is.
+For non-HASH_FIXED append-only tables, the dataset is written as-is.
+Postpone-bucket primary-key tables (`bucket = -2`) are also written
+as-is to the `bucket-postpone` directory. HASH_DYNAMIC and
+CROSS_PARTITION primary-key Ray writes are not supported and fail fast,
+including the default dynamic-bucket primary-key table (`bucket = -1`).
+Ray write tasks create independent Paimon writers, which can assign
+overlapping buckets or sequence numbers for those modes.
**Parameters:**
- `dataset`: the Ray Dataset to write.
@@ -254,8 +260,10 @@ For non-HASH_FIXED tables the dataset is written as-is.
- `hash_fixed_precluster`: HASH_FIXED pre-clustering mode. `"auto"` and
`"off"` write append-only HASH_FIXED tables directly and reject
HASH_FIXED primary-key tables. `"map_groups"` enables the legacy
- small-file optimization and requires each `(partition_keys..., bucket)`
- group to fit in memory on one Ray node.
+ small-file optimization for HASH_FIXED primary-key tables and requires
+ each `(partition_keys..., bucket)` group to fit in memory on one Ray
+ node. This option does not enable Ray writes for HASH_DYNAMIC or
+ CROSS_PARTITION primary-key tables.
### `TableWrite.write_ray()` (lower-level)
@@ -290,7 +298,8 @@ table_write.write_ray(
# - overwrite: Whether to overwrite existing data (default: False)
# - concurrency: Optional max number of concurrent Ray tasks
# - ray_remote_args: Optional kwargs passed to ray.remote() (e.g., {"num_cpus": 2})
-# - hash_fixed_precluster: Same HASH_FIXED modes as write_paimon()
+# - hash_fixed_precluster: Same HASH_FIXED modes and primary-key safety
+# checks as write_paimon()
# 3. Commit data (required for write_pandas/write_arrow/write_arrow_batch only)
commit_messages = table_write.prepare_commit()
diff --git a/paimon-python/pypaimon/ray/ray_paimon.py b/paimon-python/pypaimon/ray/ray_paimon.py
index e2924dcda6..3acbe91ace 100644
--- a/paimon-python/pypaimon/ray/ray_paimon.py
+++ b/paimon-python/pypaimon/ray/ray_paimon.py
@@ -122,9 +122,9 @@ def write_paimon(
writer. Optional pre-clustering is only a file-count optimization.
The legacy ``map_groups`` pre-clustering mode materializes each
``(partition_keys..., bucket)`` group on one Ray node and should
- only be used when every group fits in memory. HASH_FIXED primary-key
- tables require ``map_groups`` until Ray writes have a bounded
- strategy that preserves per-bucket sequence ordering.
+ only be used when every group fits in memory. HASH_DYNAMIC and
+ CROSS_PARTITION primary-key Ray writes are rejected because Ray
+ write tasks create independent Paimon writers.
Args:
dataset: The Ray Dataset to write.
@@ -137,7 +137,7 @@ def write_paimon(
and ``"off"`` write append-only HASH_FIXED tables directly
and reject HASH_FIXED primary-key tables. ``"map_groups"``
preserves the legacy small-file optimization and its single
- group memory bound.
+ group memory bound for HASH_FIXED primary-key tables.
"""
from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.ray.shuffle import maybe_apply_repartition
diff --git a/paimon-python/pypaimon/ray/shuffle.py b/paimon-python/pypaimon/ray/shuffle.py
index c7a1c5b85b..5079bf6215 100644
--- a/paimon-python/pypaimon/ray/shuffle.py
+++ b/paimon-python/pypaimon/ray/shuffle.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-"""Optional pre-clustering for Ray writes to HASH_FIXED Paimon tables.
+"""Optional pre-clustering and write guards for Ray writes.
The legacy ``map_groups`` strategy groups rows by
``(partition_keys..., bucket)`` so every distinct group lands in a
@@ -24,7 +24,8 @@ single Ray task. This can reduce file count, but Ray requires each
``map_groups`` group to fit in memory on one node. Keep that strategy
behind an explicit opt-in.
-For any other bucket mode the dataset is returned unchanged.
+For append-only tables in any other bucket mode the dataset is returned
+unchanged.
"""
import uuid
@@ -74,10 +75,9 @@ def maybe_apply_repartition(
``auto`` currently behaves like ``off`` for append-only tables
because the old ``map_groups`` strategy materializes each
``(partition, bucket)`` group on one Ray node. For primary-key
- tables, direct writes are rejected because multiple Ray tasks can
- write the same bucket with overlapping sequence numbers. Use
- ``map_groups`` only when both bounds are acceptable for the
- workload.
+ tables, unsafe Ray write plans are rejected because multiple Ray
+ tasks create independent Paimon writers and can assign overlapping
+ sequence numbers.
"""
if hash_fixed_precluster not in HASH_FIXED_PRECLUSTER_MODES:
raise ValueError(
@@ -87,14 +87,28 @@ def maybe_apply_repartition(
)
)
- if table.bucket_mode() != BucketMode.HASH_FIXED:
+ bucket_mode = table.bucket_mode()
+ is_primary_key_table = getattr(table, "is_primary_key_table", False)
+
+ if bucket_mode != BucketMode.HASH_FIXED:
+ if is_primary_key_table and bucket_mode in (
+ BucketMode.HASH_DYNAMIC,
+ BucketMode.CROSS_PARTITION,
+ ):
+ raise ValueError(
+ "{} primary-key Ray writes are not supported. Multiple "
+ "Ray tasks create independent Paimon writers, which can "
+ "assign overlapping buckets or sequence numbers.".format(
+ bucket_mode.name
+ )
+ )
return dataset
if hash_fixed_precluster in (
HASH_FIXED_PRECLUSTER_AUTO,
HASH_FIXED_PRECLUSTER_OFF,
):
- if getattr(table, "is_primary_key_table", False):
+ if is_primary_key_table:
raise ValueError(
"HASH_FIXED primary-key Ray writes require "
"hash_fixed_precluster='map_groups'. Direct writes can "
diff --git a/paimon-python/pypaimon/tests/ray_repartition_test.py b/paimon-python/pypaimon/tests/ray_repartition_test.py
index 01048e850b..0d7dd568c0 100644
--- a/paimon-python/pypaimon/tests/ray_repartition_test.py
+++ b/paimon-python/pypaimon/tests/ray_repartition_test.py
@@ -32,7 +32,9 @@ explicitly selected. These tests cover:
(partition, bucket) on the small test dataset.
* regression: a table whose schema already contains a column named
``__paimon_bucket__`` still works (collision-safe column name).
- * non-HASH_FIXED tables (BUCKET_UNAWARE etc.) pass through unchanged.
+ * non-HASH_FIXED append-only tables pass through unchanged.
+ * dynamic-bucket primary-key tables fail fast, while postpone-bucket
+ primary-key tables pass through.
"""
import glob
@@ -190,6 +192,83 @@ class RayShuffleTest(unittest.TestCase):
finally:
writer.close()
+ def test_primary_key_dynamic_bucket_default_fails_fast(self):
+ from pypaimon.ray import write_paimon
+
+ pa_schema = pa.schema([
+ pa.field('id', pa.int32(), nullable=False),
+ ('name', pa.string()),
+ ])
+ table_name = 'test_pk_dynamic_bucket_default_fails_fast'
+ identifier = self._make_table(
+ table_name, pa_schema, primary_keys=['id'],
+ )
+
+ rows = pa.Table.from_pydict(
+ {'id': list(range(40)), 'name': [f'v{i}' for i in range(40)]},
+ schema=pa_schema,
+ )
+ ds = ray.data.from_arrow(rows).repartition(4)
+
+ with self.assertRaisesRegex(ValueError, "HASH_DYNAMIC primary-key"):
+ write_paimon(ds, identifier, self.catalog_options)
+
+ def test_table_write_ray_primary_key_dynamic_bucket_default_fails_fast(self):
+ pa_schema = pa.schema([
+ pa.field('id', pa.int32(), nullable=False),
+ ('name', pa.string()),
+ ])
+ table_name = 'test_table_write_ray_pk_dynamic_default_fails_fast'
+ identifier = self._make_table(
+ table_name, pa_schema, primary_keys=['id'],
+ )
+
+ rows = pa.Table.from_pydict(
+ {'id': list(range(40)), 'name': [f'v{i}' for i in range(40)]},
+ schema=pa_schema,
+ )
+ ds = ray.data.from_arrow(rows).repartition(4)
+
+ catalog = CatalogFactory.create(self.catalog_options)
+ table = catalog.get_table(identifier)
+ writer = table.new_batch_write_builder().new_write()
+ try:
+ with self.assertRaisesRegex(ValueError, "HASH_DYNAMIC primary-key"):
+ writer.write_ray(ds)
+ finally:
+ writer.close()
+
+ def test_primary_key_postpone_bucket_roundtrip_to_postpone_files(self):
+ from pypaimon.ray import write_paimon
+
+ pa_schema = pa.schema([
+ pa.field('id', pa.int32(), nullable=False),
+ ('dt', pa.string()),
+ ('value', pa.int64()),
+ ])
+ table_name = 'test_pk_postpone_bucket_ray_write'
+ identifier = self._make_table(
+ table_name, pa_schema,
+ primary_keys=['id', 'dt'], partition_keys=['dt'],
+ options={'bucket': '-2'},
+ )
+
+ rows = pa.Table.from_pydict({
+ 'id': list(range(10)),
+ 'dt': ['2026-01-01'] * 5 + ['2026-01-02'] * 5,
+ 'value': list(range(10)),
+ }, schema=pa_schema)
+ write_paimon(
+ ray.data.from_arrow(rows).repartition(2),
+ identifier,
+ self.catalog_options,
+ )
+
+ files = self._count_data_files(table_name)
+ self.assertGreater(len(files), 0)
+ self.assertTrue(all('/bucket-postpone/' in path for path in files))
+ self.assertEqual(len(self._read_table(identifier)), 0)
+
def test_partitioned_fixed_bucket_roundtrip(self):
"""Partitioned table — confirms the post-groupby schema does not
end up with duplicated partition-key or bucket columns."""
diff --git a/paimon-python/pypaimon/tests/test_ray_shuffle_helper.py b/paimon-python/pypaimon/tests/test_ray_shuffle_helper.py
index eb974dd79a..6cfa5ea5f6 100644
--- a/paimon-python/pypaimon/tests/test_ray_shuffle_helper.py
+++ b/paimon-python/pypaimon/tests/test_ray_shuffle_helper.py
@@ -25,16 +25,29 @@ large-type coercion, and the bucket-mode dispatch in
in ``pypaimon/tests/ray_repartition_test.py``.
"""
+import importlib.util
+from pathlib import Path
import unittest
from unittest.mock import MagicMock
import pyarrow as pa
-from pypaimon.ray.shuffle import (BUCKET_KEY_COL, _coerce_large_string_types,
- _make_bucket_udf, _pick_bucket_col_name,
- maybe_apply_repartition)
from pypaimon.table.bucket_mode import BucketMode
+_SHUFFLE_PATH = (
+ Path(__file__).resolve().parents[1] / "ray" / "shuffle.py"
+)
+_SHUFFLE_SPEC = importlib.util.spec_from_file_location(
+ "pypaimon_ray_shuffle_under_test", _SHUFFLE_PATH)
+_SHUFFLE = importlib.util.module_from_spec(_SHUFFLE_SPEC)
+_SHUFFLE_SPEC.loader.exec_module(_SHUFFLE)
+
+BUCKET_KEY_COL = _SHUFFLE.BUCKET_KEY_COL
+_coerce_large_string_types = _SHUFFLE._coerce_large_string_types
+_make_bucket_udf = _SHUFFLE._make_bucket_udf
+_pick_bucket_col_name = _SHUFFLE._pick_bucket_col_name
+maybe_apply_repartition = _SHUFFLE.maybe_apply_repartition
+
class BucketUdfTest(unittest.TestCase):
"""The bucket-key UDF appends a deterministic int32 column."""
@@ -135,13 +148,13 @@ class CoerceLargeStringTypesTest(unittest.TestCase):
class BucketModeDispatchTest(unittest.TestCase):
- """``maybe_apply_repartition`` only clusters HASH_FIXED tables when
- the legacy ``map_groups`` mode is explicitly selected."""
+ """``maybe_apply_repartition`` clusters only supported HASH_FIXED
+ writes and rejects unsafe primary-key Ray writes."""
- def _make_table(self, bucket_mode):
+ def _make_table(self, bucket_mode, is_primary_key_table=False):
table = MagicMock()
table.bucket_mode.return_value = bucket_mode
- table.is_primary_key_table = False
+ table.is_primary_key_table = is_primary_key_table
return table
def test_bucket_unaware_returns_dataset_unchanged(self):
@@ -156,12 +169,47 @@ class BucketModeDispatchTest(unittest.TestCase):
self.assertIs(maybe_apply_repartition(dataset, table), dataset)
+ def test_hash_dynamic_primary_key_raises_value_error(self):
+ dataset = MagicMock(name="dataset")
+ table = self._make_table(
+ BucketMode.HASH_DYNAMIC, is_primary_key_table=True)
+
+ with self.assertRaisesRegex(ValueError, "HASH_DYNAMIC primary-key"):
+ maybe_apply_repartition(dataset, table)
+ dataset.map_batches.assert_not_called()
+
+ def test_hash_dynamic_primary_key_map_groups_raises_value_error(self):
+ dataset = MagicMock(name="dataset")
+ table = self._make_table(
+ BucketMode.HASH_DYNAMIC, is_primary_key_table=True)
+
+ with self.assertRaisesRegex(ValueError, "HASH_DYNAMIC primary-key"):
+ maybe_apply_repartition(dataset, table, "map_groups")
+ dataset.map_batches.assert_not_called()
+
def test_cross_partition_returns_dataset_unchanged(self):
dataset = object()
table = self._make_table(BucketMode.CROSS_PARTITION)
self.assertIs(maybe_apply_repartition(dataset, table), dataset)
+ def test_cross_partition_primary_key_raises_value_error(self):
+ dataset = MagicMock(name="dataset")
+ table = self._make_table(
+ BucketMode.CROSS_PARTITION, is_primary_key_table=True)
+
+ with self.assertRaisesRegex(ValueError, "CROSS_PARTITION primary-key"):
+ maybe_apply_repartition(dataset, table)
+ dataset.map_batches.assert_not_called()
+
+ def test_postpone_primary_key_returns_dataset_unchanged(self):
+ dataset = MagicMock(name="dataset")
+ table = self._make_table(
+ BucketMode.POSTPONE_MODE, is_primary_key_table=True)
+
+ self.assertIs(maybe_apply_repartition(dataset, table), dataset)
+ dataset.map_batches.assert_not_called()
+
def test_hash_fixed_default_returns_dataset_unchanged(self):
dataset = MagicMock(name="dataset")
table = MagicMock()
diff --git a/paimon-python/pypaimon/write/table_write.py b/paimon-python/pypaimon/write/table_write.py
index 9336fff028..1eb63793d0 100644
--- a/paimon-python/pypaimon/write/table_write.py
+++ b/paimon-python/pypaimon/write/table_write.py
@@ -94,7 +94,7 @@ class TableWrite:
and ``"off"`` write append-only HASH_FIXED tables directly
and reject HASH_FIXED primary-key tables. ``"map_groups"``
preserves the legacy small-file optimization and its single
- group memory bound.
+ group memory bound for HASH_FIXED primary-key tables.
"""
from pypaimon.ray.shuffle import maybe_apply_repartition
from pypaimon.write.ray_datasink import PaimonDatasink