TheR1sing3un commented on code in PR #7813:
URL: https://github.com/apache/paimon/pull/7813#discussion_r3225944786


##########
paimon-python/pypaimon/ray/shuffle.py:
##########
@@ -0,0 +1,157 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Pre-repartition a Ray Dataset by (partition, bucket) before writing
+to a Paimon table.
+
+Without this, Ray's default round-robin block distribution scatters rows
+that share the same (partition, bucket) across many Ray tasks. Each
+task then opens its own writer and emits its own data file, producing
+``partitions × buckets × ray_tasks`` files instead of the
+``partitions × buckets`` the writer would naturally produce.
+
+When ``shuffle=True`` and the table is HASH_FIXED, we group rows by
+``(partition_keys..., bucket)`` so every distinct group lands in a
+single Ray task. ``bucket`` is computed using the same
+``FixedBucketRowKeyExtractor`` the writer uses, so the shuffle-time
+bucket assignment is byte-equivalent to the writer's.
+
+For any other bucket mode the shuffle is a soft no-op with a warning;
+we never raise. ``shuffle=False`` is the default and preserves the
+original Ray round-robin behaviour.
+"""
+
+import logging
+from typing import TYPE_CHECKING, List, Optional, Tuple
+
+import pyarrow as pa
+
+from pypaimon.table.bucket_mode import BucketMode
+
+if TYPE_CHECKING:
+    import ray.data
+
+    from pypaimon.table.table import Table
+
+logger = logging.getLogger(__name__)
+
+# Transient column the helper appends before the groupby and strips
+# afterwards. Sink-side schema is identical to caller-provided schema.
+BUCKET_KEY_COL = "__paimon_bucket__"

Review Comment:
   > `__paimon_bucket__` can be a valid user/table column name. With a fixed 
transient name, `batch.append_column()` creates a duplicate Arrow field when 
the input already has this column; in PyArrow this produces duplicate names and 
`drop_columns([BUCKET_KEY_COL])` then raises `KeyError` (or, depending on the 
dataset backend/version, could remove the user's real column). As a result, 
`write_paimon(..., shuffle=True)` breaks for an otherwise valid table schema. 
Could we generate a temporary bucket column name that is guaranteed not to 
collide with the table/dataset columns (or fail early with a clear error), and 
add a regression test for a table containing `__paimon_bucket__`?
   
   nice catch! thanks! fixed it already.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to