wuchong commented on code in PR #1784:
URL: https://github.com/apache/fluss/pull/1784#discussion_r2685419785


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputer.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.bucketing.BucketingFunction;
+import org.apache.fluss.client.table.getter.PartitionGetter;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.flink.row.RowWithOp;
+import org.apache.fluss.flink.sink.ChannelComputer;
+import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema;
+import org.apache.fluss.flink.sink.serializer.SerializerInitContextImpl;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.KeyEncoder;
+import org.apache.fluss.types.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * {@link ChannelComputer} for {@link StatisticsOrRecord} which changes the shuffle based on
+ * partition statistics.
+ */
+@Internal
+public class StatisticsOrRecordChannelComputer<InputT>
+        implements ChannelComputer<StatisticsOrRecord<InputT>> {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(StatisticsOrRecordChannelComputer.class);
+
+    private final @Nullable DataLakeFormat lakeFormat;
+    private final RowType flussRowType;
+    private final List<String> bucketKeys;
+    private final List<String> partitionKeys;
+    private final FlussSerializationSchema<InputT> serializationSchema;
+    private final int bucketNum;
+
+    private transient int downstreamNumChannels;
+    private transient KeyEncoder bucketKeyEncoder;
+    private transient PartitionGetter partitionGetter;
+    private transient MapPartitioner delegatePartitioner;
+    private transient AtomicLong roundRobinCounter;
+    private transient BucketingFunction bucketingFunction;
+    private transient Random random;
+
+    public StatisticsOrRecordChannelComputer(
+            RowType flussRowType,
+            List<String> bucketKeys,
+            List<String> partitionKeys,
+            int bucketNum,
+            @Nullable DataLakeFormat lakeFormat,
+            FlussSerializationSchema<InputT> serializationSchema) {
+        checkArgument(
+                partitionKeys != null && !partitionKeys.isEmpty(),
+                "Partition keys cannot be empty.");
+        this.flussRowType = flussRowType;
+        this.bucketKeys = bucketKeys;
+        this.partitionKeys = partitionKeys;
+        this.bucketNum = bucketNum;
+        this.lakeFormat = lakeFormat;
+        this.serializationSchema = serializationSchema;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        LOG.info("Setting up with {} downstream channels", numChannels);
+        this.downstreamNumChannels = numChannels;
+        this.bucketingFunction = BucketingFunction.of(lakeFormat);
+        this.bucketKeyEncoder = KeyEncoder.of(flussRowType, bucketKeys, 
lakeFormat);
+        this.partitionGetter = new PartitionGetter(flussRowType, 
partitionKeys);
+        try {
+            this.serializationSchema.open(new 
SerializerInitContextImpl(flussRowType));
+        } catch (Exception e) {
+            throw new FlussRuntimeException(e);
+        }
+        this.random = ThreadLocalRandom.current();
+    }
+
+    @Override
+    public int channel(StatisticsOrRecord<InputT> wrapper) {
+        if (wrapper == null) {
+            throw new FlussRuntimeException("StatisticsOrRecord wrapper must 
not be null");
+        }
+
+        try {
+            if (wrapper.hasStatistics()) {
+                this.delegatePartitioner = 
delegatePartitioner(wrapper.statistics());
+                return (int)
+                        
(roundRobinCounter(downstreamNumChannels).getAndIncrement()
+                                % downstreamNumChannels);
+            } else {
+                if (delegatePartitioner == null) {
+                    delegatePartitioner = delegatePartitioner(null);
+                }
+
+                RowWithOp rowWithOp = 
serializationSchema.serialize(wrapper.record());
+                InternalRow row = rowWithOp.getRow();
+                String partitionName = partitionGetter.getPartition(row);
+                return delegatePartitioner.select(partitionName, row, 
downstreamNumChannels);
+            }
+        } catch (Exception e) {
+            throw new FlussRuntimeException(
+                    String.format(
+                            "Failed to serialize statistics or record of type '%s' in StatisticsOrRecordChannelComputer: %s",
+                            wrapper != null ? wrapper.getClass().getName() : "null",
+                            e.getMessage()),
+                    e);
+        }
+    }
+
+    private boolean hasBucketKey() {
+        return bucketKeys != null && !bucketKeys.isEmpty();
+    }
+
+    private AtomicLong roundRobinCounter(int numPartitions) {
+        if (roundRobinCounter == null) {
+            // randomize the starting point to avoid synchronization across 
subtasks
+            this.roundRobinCounter = new AtomicLong(new 
Random().nextInt(numPartitions));
+        }
+
+        return roundRobinCounter;
+    }
+
+    private MapPartitioner delegatePartitioner(@Nullable DataStatistics 
statistics) {
+        return statistics == null
+                ? new MapPartitioner()
+                : new MapPartitioner(assignment(downstreamNumChannels, 
statistics.result()));
+    }
+
+    Map<String, PartitionAssignment> assignment(
+            int downstreamParallelism, Map<String, Long> statistics) {
+        statistics.forEach(
+                (key, value) ->
+                        checkArgument(
+                                value > 0, "Invalid statistics: weight is 0 
for key %s", key));
+
+        long totalWeight = statistics.values().stream().mapToLong(l -> 
l).sum();
+        long targetWeightPerSubtask =
+                (long) Math.ceil(((double) totalWeight) / 
downstreamParallelism);
+
+        return buildAssignment(
+                downstreamParallelism,
+                statistics,
+                targetWeightPerSubtask,
+                hasBucketKey(),
+                bucketNum);
+    }
+
+    Map<String, PartitionAssignment> buildAssignment(
+            int downstreamParallelism,
+            Map<String, Long> dataStatistics,
+            long targetWeightPerSubtask,
+            boolean hasBucketKey,
+            int bucketNum) {
+        Map<String, PartitionAssignment> assignmentMap = new 
HashMap<>(dataStatistics.size());
+        Iterator<String> mapKeyIterator = dataStatistics.keySet().iterator();
+        int subtaskId = 0;
+        String currentKey = null;
+        long keyRemainingWeight = 0L;
+        long subtaskRemainingWeight = targetWeightPerSubtask;
+        // todo: compute the list of assigned subtasks and the weight assigned to each subtask
+        List<Integer> assignedSubtasks = new ArrayList<>();
+        List<Long> subtaskWeights = new ArrayList<>();
+        while (mapKeyIterator.hasNext() || currentKey != null) {
+            // This should never happen because target weight is calculated using ceil function.
+            // todo: numPartitions refers to all downstream subtask ids
+            if (subtaskId >= downstreamParallelism) {
+                LOG.error(
+                        "Internal algorithm error: exhausted subtasks with 
unassigned keys left. number of partitions: {}, "
+                                + "target weight per subtask: {}, data 
statistics: {}",
+                        downstreamParallelism,
+                        targetWeightPerSubtask,
+                        dataStatistics);
+                throw new IllegalStateException(
+                        "Internal algorithm error: exhausted subtasks with 
unassigned keys left");
+            }
+
+            if (currentKey == null) {
+                currentKey = mapKeyIterator.next();
+                keyRemainingWeight = dataStatistics.get(currentKey);
+            }
+
+            assignedSubtasks.add(subtaskId);
+            if (keyRemainingWeight < subtaskRemainingWeight) {
+                // assign the remaining weight of the key to the current 
subtask
+                subtaskWeights.add(keyRemainingWeight);
+                subtaskRemainingWeight -= keyRemainingWeight;
+                keyRemainingWeight = 0L;
+            } else {
+                // filled up the current subtask
+                long assignedWeight = subtaskRemainingWeight;
+                keyRemainingWeight -= subtaskRemainingWeight;
+
+                subtaskWeights.add(assignedWeight);
+                // move on to the next subtask
+                subtaskId += 1;
+                subtaskRemainingWeight = targetWeightPerSubtask;
+            }
+
+            checkState(
+                    assignedSubtasks.size() == subtaskWeights.size(),
+                    "List size mismatch: assigned subtasks = %s, subtask 
weights = %s",
+                    assignedSubtasks,
+                    subtaskWeights);
+
+            if (keyRemainingWeight == 0) {
+                // finishing up the assignment for the current key
+                PartitionAssignment keyAssignment =
+                        hasBucketKey
+                                ? new WeightedBucketIdAssignment(
+                                        assignedSubtasks,
+                                        subtaskWeights,
+                                        bucketNum,
+                                        bucketKeyEncoder,
+                                        bucketingFunction,
+                                        random)
+                                : new WeightedRandomAssignment(
+                                        assignedSubtasks, subtaskWeights, 
random);
+                assignmentMap.put(currentKey, keyAssignment);
+                assignedSubtasks = new ArrayList<>();
+                subtaskWeights = new ArrayList<>();
+                currentKey = null;
+            }
+        }
+
+        LOG.info("Assignment map: {}", assignmentMap);

Review Comment:
   Use `debug` log level here, otherwise this produces too many logs.
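   For illustration, a minimal sketch of the guarded debug logging this suggests, reusing the `LOG` and `assignmentMap` from the diff above:
   ```java
   // Only emit the potentially large assignment map when debug logging is enabled.
   if (LOG.isDebugEnabled()) {
       LOG.debug("Assignment map: {}", assignmentMap);
   }
   ```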



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsOperator.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.client.table.getter.PartitionGetter;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.flink.row.RowWithOp;
+import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema;
+import org.apache.fluss.flink.sink.serializer.SerializerInitContextImpl;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.List;
+
+import static 
org.apache.fluss.flink.adapter.RuntimeContextAdapter.getIndexOfThisSubtask;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * Data statistics operator which collects local data statistics and sends them to the coordinator
+ * for global aggregation, then forwards the global data statistics to the partitioner.
+ *
+ * @param <InputT> the type of the input records
+ */
+@Internal
+public class DataStatisticsOperator<InputT>
+        extends AbstractStreamOperator<StatisticsOrRecord<InputT>>
+        implements OneInputStreamOperator<InputT, StatisticsOrRecord<InputT>>,
+                OperatorEventHandler {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String operatorName;
+    private final RowType rowType;
+    private final List<String> partitionKeys;
+    private final FlussSerializationSchema<InputT> flussSerializationSchema;
+    private final OperatorEventGateway operatorEventGateway;
+
+    private transient int subtaskIndex;
+    private transient volatile DataStatistics localStatistics;
+    private transient PartitionGetter partitionGetter;
+    private transient TypeSerializer<DataStatistics> statisticsSerializer;
+
+    DataStatisticsOperator(
+            StreamOperatorParameters<StatisticsOrRecord<InputT>> parameters,
+            String operatorName,
+            RowType rowType,
+            List<String> partitionKeys,
+            OperatorEventGateway operatorEventGateway,
+            FlussSerializationSchema<InputT> flussSerializationSchema) {
+        super();
+        checkArgument(
+                partitionKeys != null && !partitionKeys.isEmpty(),
+                "Partition keys cannot be empty.");
+        this.operatorName = operatorName;
+        this.operatorEventGateway = operatorEventGateway;
+        this.flussSerializationSchema = flussSerializationSchema;
+        this.rowType = rowType;
+        this.partitionKeys = partitionKeys;
+        this.setup(
+                parameters.getContainingTask(),
+                parameters.getStreamConfig(),
+                parameters.getOutput());
+    }
+
+    @Override
+    public void open() throws Exception {
+        this.partitionGetter = new PartitionGetter(rowType, partitionKeys);
+        this.statisticsSerializer = new DataStatisticsSerializer();
+        try {
+            this.flussSerializationSchema.open(new 
SerializerInitContextImpl(rowType));
+        } catch (Exception e) {
+            throw new FlussRuntimeException(e);
+        }
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        this.subtaskIndex = getIndexOfThisSubtask(getRuntimeContext());
+        this.localStatistics = StatisticsUtil.createDataStatistics();
+    }
+
+    @Override
+    public void handleOperatorEvent(OperatorEvent event) {
+        checkArgument(
+                event instanceof StatisticsEvent,
+                String.format(
+                        "Operator %s subtask %s received unexpected operator 
event %s",
+                        operatorName, subtaskIndex, event.getClass()));
+        StatisticsEvent statisticsEvent = (StatisticsEvent) event;
+        LOG.info(
+                "Operator {} subtask {} received global data event from 
coordinator checkpoint {}",
+                operatorName,
+                subtaskIndex,
+                statisticsEvent.checkpointId());
+        DataStatistics globalStatistics =
+                StatisticsUtil.deserializeDataStatistics(
+                        statisticsEvent.statisticsBytes(), 
statisticsSerializer);
+        LOG.info("Global data statistics: {}", globalStatistics);

Review Comment:
   `globalStatistics` can be very large; use the `debug` log level here.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.shaded.guava32.com.google.common.collect.Iterables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Coordinator for collecting global data statistics.
+ *
+ * <p>NOTE: This class is inspired by the Iceberg project.
+ */
+@Internal
+class DataStatisticsCoordinator implements OperatorCoordinator {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+    private final String operatorName;
+    private final OperatorCoordinator.Context context;
+    private final ExecutorService coordinatorExecutor;
+    private final SubtaskGateways subtaskGateways;
+    private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
+    private final TypeSerializer<DataStatistics> statisticsSerializer;
+
+    private transient boolean started;
+    private transient AggregatedStatisticsTracker aggregatedStatisticsTracker;
+
+    DataStatisticsCoordinator(String operatorName, OperatorCoordinator.Context 
context) {
+        this.operatorName = operatorName;
+        this.context = context;
+        this.coordinatorThreadFactory =
+                new CoordinatorExecutorThreadFactory(
+                        "DataStatisticsCoordinator-" + operatorName,
+                        context.getUserCodeClassloader());
+        this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+        this.subtaskGateways = new SubtaskGateways(operatorName, 
context.currentParallelism());
+        this.statisticsSerializer = new DataStatisticsSerializer();
+    }
+
+    @Override
+    public void start() throws Exception {
+        LOG.debug("Starting data statistics coordinator: {}.", operatorName);
+        this.started = true;
+
+        // statistics are restored already in resetToCheckpoint() before 
start() called
+        this.aggregatedStatisticsTracker =
+                new AggregatedStatisticsTracker(operatorName, 
context.currentParallelism());
+    }
+
+    @Override
+    public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+        runInCoordinatorThread(
+                () -> {
+                    LOG.debug(
+                            "Handling event from subtask {} (#{}) of {}: {}",
+                            subtask,
+                            attemptNumber,
+                            operatorName,
+                            event);
+                    if (event instanceof StatisticsEvent) {
+                        handleDataStatisticRequest(subtask, ((StatisticsEvent) 
event));
+                    } else {
+                        throw new IllegalArgumentException(
+                                "Invalid operator event type: "
+                                        + event.getClass().getCanonicalName());
+                    }
+                },
+                String.format(
+                        Locale.ROOT,
+                        "handling operator event %s from subtask %d (#%d)",
+                        event.getClass(),
+                        subtask,
+                        attemptNumber));
+    }
+
+    @Override
+    public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture)
+            throws Exception {
+        resultFuture.complete(new byte[0]);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {}
+
+    @Override
+    public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
+        checkState(
+                !started,
+                "The coordinator %s can only be reset if it was not yet 
started",
+                operatorName);
+    }
+
+    @Override
+    public void subtaskReset(int subtask, long checkpointId) {
+        runInCoordinatorThread(
+                () -> {
+                    LOG.info(
+                            "Operator {} subtask {} is reset to checkpoint {}",
+                            operatorName,
+                            subtask,
+                            checkpointId);
+                    
checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+                    subtaskGateways.reset(subtask);
+                },
+                String.format(
+                        Locale.ROOT,
+                        "handling subtask %d recovery to checkpoint %d",
+                        subtask,
+                        checkpointId));
+    }
+
+    @Override
+    public void executionAttemptFailed(int subtask, int attemptNumber, 
@Nullable Throwable reason) {
+        runInCoordinatorThread(
+                () -> {
+                    LOG.info(
+                            "Unregistering gateway after failure for subtask 
{} (#{}) of data statistics {}",
+                            subtask,
+                            attemptNumber,
+                            operatorName);
+                    
checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+                    subtaskGateways.unregisterSubtaskGateway(subtask, 
attemptNumber);
+                },
+                String.format(
+                        Locale.ROOT, "handling subtask %d (#%d) failure", 
subtask, attemptNumber));
+    }
+
+    @Override
+    public void executionAttemptReady(int subtask, int attemptNumber, 
SubtaskGateway gateway) {
+        checkArgument(subtask == gateway.getSubtask());
+        checkArgument(attemptNumber == 
gateway.getExecution().getAttemptNumber());
+        runInCoordinatorThread(
+                () -> {
+                    
checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+                    subtaskGateways.registerSubtaskGateway(gateway);
+                },
+                String.format(
+                        Locale.ROOT,
+                        "making event gateway to subtask %d (#%d) available",
+                        subtask,
+                        attemptNumber));
+    }
+
+    @Override
+    public void close() throws Exception {
+        coordinatorExecutor.shutdown();
+        this.aggregatedStatisticsTracker = null;
+        this.started = false;
+        LOG.info("Closed data statistics coordinator: {}.", operatorName);
+    }
+
+    private void runInCoordinatorThread(Runnable runnable) {
+        this.coordinatorExecutor.execute(
+                new ThrowableCatchingRunnable(
+                        throwable ->
+                                
this.coordinatorThreadFactory.uncaughtException(
+                                        Thread.currentThread(), throwable),
+                        runnable));
+    }
+
+    private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, 
String actionString) {
+        ensureStarted();
+        runInCoordinatorThread(
+                () -> {
+                    try {
+                        action.run();
+                    } catch (Throwable t) {
+                        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                        LOG.error(
+                                "Uncaught exception in the data statistics 
coordinator: {} while {}. Triggering job failover",
+                                operatorName,
+                                actionString,
+                                t);
+                        context.failJob(t);
+                    }
+                });
+    }
+
+    private void ensureStarted() {
+        checkState(started, "The coordinator of %s has not started yet.", 
operatorName);
+    }
+
+    private void handleDataStatisticRequest(int subtask, StatisticsEvent 
event) {
+        DataStatistics maybeCompletedStatistics =
+                aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, 
event);
+        if (maybeCompletedStatistics != null) {
+            if (maybeCompletedStatistics.isEmpty()) {
+                LOG.debug(
+                        "Skip aggregated statistics for checkpoint {} as it is 
empty.",
+                        event.checkpointId());
+            } else {
+                LOG.debug(
+                        "Completed statistics aggregation for checkpoint {}", 
event.checkpointId());
+                sendGlobalStatisticsToSubtasks(maybeCompletedStatistics, 
event.checkpointId());
+            }
+        }
+    }
+
+    private void sendGlobalStatisticsToSubtasks(DataStatistics statistics, 
long checkpointId) {
+        runInCoordinatorThread(
+                () -> {
+                    LOG.info(
+                            "Broadcast latest global statistics from 
checkpoint {} to all subtasks",
+                            checkpointId);
+                    // applyImmediately is set to false so that operator 
subtasks can
+                    // apply the change at checkpoint boundary
+                    StatisticsEvent statisticsEvent =
+                            StatisticsEvent.createStatisticsEvent(
+                                    checkpointId, statistics, 
statisticsSerializer);
+                    for (int i = 0; i < context.currentParallelism(); ++i) {
+                        // Ignore future return value for potential error 
(e.g. subtask down).
+                        // Upon restart, subtasks send request to coordinator 
to refresh statistics
+                        // if there is any difference
+                        
subtaskGateways.getSubtaskGateway(i).sendEvent(statisticsEvent);
+                    }
+                },
+                String.format(
+                        Locale.ROOT,
+                        "Failed to send operator %s coordinator global data 
statistics for checkpoint %d",
+                        operatorName,
+                        checkpointId));
+    }
+
+    @VisibleForTesting
+    void callInCoordinatorThread(Callable<Void> callable, String errorMessage) 
{
+        ensureStarted();
+        // Ensure the task is done by the coordinator executor.
+        if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+            try {
+                Callable<Void> guardedCallable =
+                        () -> {
+                            try {
+                                return callable.call();
+                            } catch (Throwable t) {
+                                LOG.error(
+                                        "Uncaught Exception in data statistics 
coordinator: {} executor",
+                                        operatorName,
+                                        t);
+                                ExceptionUtils.rethrowException(t);
+                                return null;
+                            }
+                        };
+
+                coordinatorExecutor.submit(guardedCallable).get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new FlinkRuntimeException(errorMessage, e);
+            }
+        } else {
+            try {
+                callable.call();
+            } catch (Throwable t) {
+                LOG.error(
+                        "Uncaught Exception in data statistics coordinator: {} 
executor",
+                        operatorName,
+                        t);
+                throw new FlinkRuntimeException(errorMessage, t);
+            }
+        }
+    }
+
+    @VisibleForTesting
+    public SubtaskGateways getSubtaskGateways() {
+        return subtaskGateways;
+    }
+
+    static class SubtaskGateways {
+        private final String operatorName;
+        private final Map<Integer, SubtaskGateway>[] gateways;
+
+        @SuppressWarnings("unchecked")
+        private SubtaskGateways(String operatorName, int parallelism) {
+            this.operatorName = operatorName;
+            gateways = new Map[parallelism];
+
+            for (int i = 0; i < parallelism; ++i) {
+                gateways[i] = new HashMap<>();
+            }
+        }
+
+        private void registerSubtaskGateway(OperatorCoordinator.SubtaskGateway 
gateway) {
+            int subtaskIndex = gateway.getSubtask();
+            int attemptNumber = gateway.getExecution().getAttemptNumber();
+            checkState(
+                    !gateways[subtaskIndex].containsKey(attemptNumber),
+                    "Coordinator of %s already has a subtask gateway for %d 
(#%d)",
+                    operatorName,
+                    subtaskIndex,
+                    attemptNumber);
+            LOG.debug(
+                    "Coordinator of {} registers gateway for subtask {} 
attempt {}",
+                    operatorName,
+                    subtaskIndex,
+                    attemptNumber);
+            gateways[subtaskIndex].put(attemptNumber, gateway);
+        }
+
+        private void unregisterSubtaskGateway(int subtaskIndex, int 
attemptNumber) {
+            LOG.debug(
+                    "Coordinator of {} unregisters gateway for subtask {} 
attempt {}",
+                    operatorName,
+                    subtaskIndex,
+                    attemptNumber);
+            gateways[subtaskIndex].remove(attemptNumber);
+        }
+
+        protected OperatorCoordinator.SubtaskGateway getSubtaskGateway(int 
subtaskIndex) {
+            checkState(
+                    !gateways[subtaskIndex].isEmpty(),
+                    "Coordinator of %s subtask %d is not ready yet to receive 
events",
+                    operatorName,
+                    subtaskIndex);
+            return Iterables.getOnlyElement(gateways[subtaskIndex].values());
+        }
+
+        private void reset(int subtaskIndex) {
+            gateways[subtaskIndex].clear();
+        }
+    }
+
+    private static class CoordinatorExecutorThreadFactory
+            implements ThreadFactory, Thread.UncaughtExceptionHandler {
+
+        private final String coordinatorThreadName;
+        private final ClassLoader classLoader;
+        private final Thread.UncaughtExceptionHandler errorHandler;
+
+        @javax.annotation.Nullable private Thread thread;

Review Comment:
   Remove the unnecessary `javax.annotation` qualification; `Nullable` is already imported at the top of the file.
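   A minimal sketch of the change, relying on the `javax.annotation.Nullable` import already present in this file:
   ```java
   // Use the imported annotation instead of the fully-qualified form.
   @Nullable private Thread thread;
   ```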



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java:
##########
@@ -123,20 +134,76 @@ public AppendSinkWriter<InputT> 
createWriter(MailboxExecutor mailboxExecutor) {
 
         @Override
         public DataStream<InputT> addPreWriteTopology(DataStream<InputT> 
input) {
-            // For append only sink, we will do bucket shuffle only if bucket 
keys are not empty.
-            if (!bucketKeys.isEmpty() && shuffleByBucketId) {
-                return partition(
-                        input,
-                        new FlinkRowDataChannelComputer<>(
-                                toFlussRowType(tableRowType),
-                                bucketKeys,
-                                partitionKeys,
-                                lakeFormat,
-                                numBucket,
-                                flussSerializationSchema),
-                        input.getParallelism());
-            } else {
-                return input;
+            switch (shuffleMode) {
+                case BUCKET_SHUFFLE:
+                    if (!bucketKeys.isEmpty()) {
+                        return partition(
+                                input,
+                                new FlinkRowDataChannelComputer<>(
+                                        toFlussRowType(tableRowType),
+                                        bucketKeys,
+                                        partitionKeys,
+                                        lakeFormat,
+                                        numBucket,
+                                        flussSerializationSchema),
+                                input.getParallelism());
+                    }
+                    return input;
+                case NONE:
+                    return input;
+                case DYNAMIC_SHUFFLE:
+                    if (partitionKeys.isEmpty()) {
+                        throw new UnsupportedOperationException(
+                                "DYNAMIC_SHUFFLE is only supported for 
partition tables");
+                    }
+
+                    if (rowTypeInformation == null) {
+                        throw new UnsupportedOperationException(
+                                "RowTypeInformation is required for 
DYNAMIC_SHUFFLE mode.");
+                    }
+                    TypeInformation<StatisticsOrRecord<InputT>> 
statisticsOrRecordTypeInformation =
+                            new 
StatisticsOrRecordTypeInformation<>(rowTypeInformation);
+                    SingleOutputStreamOperator<StatisticsOrRecord<InputT>> 
shuffleStream =
+                            input.transform(
+                                            "Dynamic shuffle data statistics",
+                                            statisticsOrRecordTypeInformation,
+                                            new 
DataStatisticsOperatorFactory<>(
+                                                    
toFlussRowType(tableRowType),
+                                                    partitionKeys,
+                                                    flussSerializationSchema))
+                                    .uid("Dynamic shuffle data statistics" + 
tablePath)

Review Comment:
   ```suggestion
                                       .uid("Dynamic shuffle data statistics " 
+ tablePath)
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecord.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Objects;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * Either a record or statistics, but never both.
+ *
+ * @param <InputT> the type of the record
+ */
+@Internal
+public class StatisticsOrRecord<InputT> {
+
+    private DataStatistics statistics;
+    private InputT record;
+
+    private StatisticsOrRecord(DataStatistics statistics, InputT record) {
+        checkArgument(
+                record != null ^ statistics != null,
+                "DataStatistics or record, not neither or both");
+        this.statistics = statistics;
+        this.record = record;
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> fromRecord(InputT 
record) {
+        return new StatisticsOrRecord<>(null, record);
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> 
fromStatistics(DataStatistics statistics) {
+        return new StatisticsOrRecord<>(statistics, null);
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> reuseRecord(
+            StatisticsOrRecord<InputT> reuse, TypeSerializer<InputT> 
recordSerializer) {
+        if (reuse.hasRecord()) {
+            return reuse;
+        } else {
+            // not reusable
+            return 
StatisticsOrRecord.fromRecord(recordSerializer.createInstance());
+        }
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> reuseStatistics(
+            StatisticsOrRecord<InputT> reuse, TypeSerializer<DataStatistics> 
statisticsSerializer) {
+        if (reuse.hasStatistics()) {
+            return reuse;
+        } else {
+            // not reusable
+            return 
StatisticsOrRecord.fromStatistics(statisticsSerializer.createInstance());
+        }
+    }
+
+    boolean hasStatistics() {
+        return statistics != null;
+    }
+
+    public boolean hasRecord() {

Review Comment:
   nit: rename to `isRecord`.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java:
##########
@@ -188,6 +194,22 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context 
context) {
         }
 
         FlinkSink<RowData> flinkSink = getFlinkSink(targetColumnIndexes);
+        if (distributionMode == DistributionMode.DYNAMIC_SHUFFLE) {
+            return new DataStreamSinkProvider() {
+                @Override
+                public DataStreamSink<?> consumeDataStream(
+                        ProviderContext providerContext, DataStream<RowData> 
dataStream) {
+                    String sinkName =
+                            "sink"
+                                    + "-"
+                                    + tablePath.getTableName()
+                                    + "-"
+                                    + tablePath.getDatabaseName();
+                    return 
dataStream.sinkTo(flinkSink).uid(sinkName).name(sinkName);

Review Comment:
   Do we really need to assign a uid to the sink operator? We don't have state in the sink for now.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java:
##########
@@ -123,20 +134,76 @@ public AppendSinkWriter<InputT> 
createWriter(MailboxExecutor mailboxExecutor) {
 
         @Override
         public DataStream<InputT> addPreWriteTopology(DataStream<InputT> 
input) {
-            // For append only sink, we will do bucket shuffle only if bucket 
keys are not empty.
-            if (!bucketKeys.isEmpty() && shuffleByBucketId) {
-                return partition(
-                        input,
-                        new FlinkRowDataChannelComputer<>(
-                                toFlussRowType(tableRowType),
-                                bucketKeys,
-                                partitionKeys,
-                                lakeFormat,
-                                numBucket,
-                                flussSerializationSchema),
-                        input.getParallelism());
-            } else {
-                return input;
+            switch (shuffleMode) {
+                case BUCKET_SHUFFLE:
+                    if (!bucketKeys.isEmpty()) {
+                        return partition(
+                                input,
+                                new FlinkRowDataChannelComputer<>(
+                                        toFlussRowType(tableRowType),
+                                        bucketKeys,
+                                        partitionKeys,
+                                        lakeFormat,
+                                        numBucket,
+                                        flussSerializationSchema),
+                                input.getParallelism());
+                    }
+                    return input;
+                case NONE:
+                    return input;
+                case DYNAMIC_SHUFFLE:
+                    if (partitionKeys.isEmpty()) {
+                        throw new UnsupportedOperationException(
+                                "DYNAMIC_SHUFFLE is only supported for 
partition tables");
+                    }
+
+                    if (rowTypeInformation == null) {
+                        throw new UnsupportedOperationException(
+                                "RowTypeInformation is required for 
DYNAMIC_SHUFFLE mode.");
+                    }
+                    TypeInformation<StatisticsOrRecord<InputT>> 
statisticsOrRecordTypeInformation =
+                            new 
StatisticsOrRecordTypeInformation<>(rowTypeInformation);

Review Comment:
   Use `new StatisticsOrRecordTypeInformation<>(input.getType());` so we don't need to introduce `rowTypeInformation`? This would also be safer.
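   For illustration, a hedged sketch of this alternative, assuming `input` is the upstream `DataStream<InputT>` from the diff above:
   ```java
   // Derive the element type directly from the input stream instead of passing rowTypeInformation separately.
   TypeInformation<StatisticsOrRecord<InputT>> statisticsOrRecordTypeInformation =
           new StatisticsOrRecordTypeInformation<>(input.getType());
   ```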



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java:
##########
@@ -100,7 +102,10 @@ void before() {
         // open a catalog so that we can get table from the catalog
         String bootstrapServers = 
FLUSS_CLUSTER_EXTENSION.getBootstrapServers();
         // create table environment
-        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        org.apache.flink.configuration.Configuration config =
+                new org.apache.flink.configuration.Configuration();
+        config.setString("taskmanager.numberOfTaskSlots", "3");

Review Comment:
   Is this change necessary? The default number of task slots is already 4, as defined in `AbstractTestBase`.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatistics.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/** Data statistics for a partition name and its frequency. */
+@Internal
+public class DataStatistics {
+
+    private final Map<String, Long> partitionFrequency;
+
+    public DataStatistics() {
+        this.partitionFrequency = new HashMap<>();
+    }
+
+    DataStatistics(Map<String, Long> partitionFrequency) {
+        this.partitionFrequency = partitionFrequency;
+    }
+
+    public boolean isEmpty() {
+        return partitionFrequency.isEmpty();
+    }
+
+    public void add(String partition, long value) {
+        if (partitionFrequency.containsKey(partition)) {
+            partitionFrequency.merge(partition, value, Long::sum);
+        } else {
+            // clone the sort key before adding to map because input sortKey 
object can be reused
+            partitionFrequency.put(partition, value);

Review Comment:
   You can just use `partitionFrequency.merge(partition, value, Long::sum);` here, which is simpler and performs better.
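   A minimal sketch of the simplified method, assuming the `partitionFrequency` map from the diff above:
   ```java
   public void add(String partition, long value) {
       // merge() covers both the absent-key and present-key cases in a single call.
       partitionFrequency.merge(partition, value, Long::sum);
   }
   ```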



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java:
##########
@@ -159,6 +168,66 @@ public RowWithOp serialize(RowData value) throws Exception 
{
         return new RowWithOp(row, opType);
     }
 
+    @Override
+    public long size(RowData value, RowType rowType) {
+        if (value instanceof BinaryFormat) {
+            return ((BinaryFormat) value).getSizeInBytes();
+        }
+
+        long size = 0;
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            DataField field = rowType.getFields().get(i);
+            DataTypeRoot typeRoot = field.getType().getTypeRoot();
+            if (value.isNullAt(i)) {
+                continue;
+            }
+            switch (typeRoot) {
+                case CHAR:
+                    size += ((CharType) (field.getType())).getLength();
+                    break;
+                case STRING:
+                    StringData stringData = value.getString(i);
+                    if (stringData instanceof BinaryStringData) {
+                        size += ((BinaryStringData) 
stringData).getSizeInBytes();
+                    } else {
+                        size += converter.getString(i).getSizeInBytes();
+                    }
+                    break;
+                case BINARY:
+                    size += ((BinaryType) (field.getType())).getLength();
+                    break;
+                case BYTES:
+                    size += converter.getBytes(i).length;

Review Comment:
   ditto



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java:
##########
@@ -159,6 +168,66 @@ public RowWithOp serialize(RowData value) throws Exception 
{
         return new RowWithOp(row, opType);
     }
 
+    @Override
+    public long size(RowData value, RowType rowType) {
+        if (value instanceof BinaryFormat) {
+            return ((BinaryFormat) value).getSizeInBytes();
+        }
+
+        long size = 0;
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            DataField field = rowType.getFields().get(i);
+            DataTypeRoot typeRoot = field.getType().getTypeRoot();
+            if (value.isNullAt(i)) {
+                continue;
+            }
+            switch (typeRoot) {
+                case CHAR:
+                    size += ((CharType) (field.getType())).getLength();
+                    break;
+                case STRING:
+                    StringData stringData = value.getString(i);
+                    if (stringData instanceof BinaryStringData) {
+                        size += ((BinaryStringData) 
stringData).getSizeInBytes();
+                    } else {
+                        size += converter.getString(i).getSizeInBytes();

Review Comment:
   Better to use `size += stringData.toBytes().length`. It's confusing what `converter` refers to here, and what its underlying row is.
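   A possible shape for the suggested change, extracted into a hypothetical helper for clarity (assumes Flink's `StringData#toBytes()`):
   ```java
   // Hypothetical helper: byte-size estimate for a string field without going through `converter`.
   private static long stringFieldSize(StringData stringData) {
       if (stringData instanceof BinaryStringData) {
           // binary-backed strings already track their serialized size
           return ((BinaryStringData) stringData).getSizeInBytes();
       }
       // otherwise fall back to the UTF-8 byte length of the string
       return stringData.toBytes().length;
   }
   ```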



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/WeightedRandomAssignment.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.row.InternalRow;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Partition assignment strategy that randomly distributes records to subtasks 
based on configured
+ * weights.
+ *
+ * <p>This assignment strategy enables weighted random distribution of records 
to subtasks, allowing
+ * for more balanced load distribution across downstream subtasks. The 
assignment uses a weighted
+ * random algorithm where subtasks with higher weights have a proportionally 
higher probability of
+ * being selected.
+ *
+ * <p>NOTE: This class is inspired by the Iceberg project.
+ */
+@Internal
+public class WeightedRandomAssignment implements PartitionAssignment {
+    protected final List<Integer> assignedSubtasks;
+    protected final List<Long> subtaskWeights;
+    protected final long keyWeight;
+    protected final double[] cumulativeWeights;
+    private final Random random;
+
+    /**
+     * @param assignedSubtasks assigned subtasks for this key. It could be a 
single subtask. It
+     *     could also be multiple subtasks if the key has heavy weight that 
should be handled by
+     *     multiple subtasks.
+     * @param subtaskWeights assigned weight for each subtask. E.g., if the 
keyWeight is 27 and the
+     *     key is assigned to 3 subtasks, subtaskWeights could contain values 
as [10, 10, 7] for
+     *     target weight of 10 per subtask.
+     */
+    WeightedRandomAssignment(
+            List<Integer> assignedSubtasks, List<Long> subtaskWeights, Random 
random) {
+        checkArgument(
+                assignedSubtasks != null && !assignedSubtasks.isEmpty(),
+                "Invalid assigned subtasks: null or empty");
+        checkArgument(
+                subtaskWeights != null && !subtaskWeights.isEmpty(),
+                "Invalid assigned subtasks weights: null or empty");
+        checkArgument(
+                assignedSubtasks.size() == subtaskWeights.size(),
+                "Invalid assignment: size mismatch (tasks length = %s, weights 
length = %s)",
+                assignedSubtasks.size(),
+                subtaskWeights.size());
+
+        this.assignedSubtasks = assignedSubtasks;
+        this.subtaskWeights = subtaskWeights;
+        this.keyWeight = 
subtaskWeights.stream().mapToLong(Long::longValue).sum();
+        this.cumulativeWeights = new double[subtaskWeights.size()];
+        long cumulativeWeight = 0;
+        for (int i = 0; i < subtaskWeights.size(); ++i) {
+            cumulativeWeight += subtaskWeights.get(i);
+            cumulativeWeights[i] = cumulativeWeight;
+        }
+        this.random = random;
+    }
+
+    /**
+     * Select a subtask for the key. If a bucket key exists, the same key will be assigned to the
+     * same subtask.
+     *
+     * @return subtask id
+     */
+    @Override
+    public int select(InternalRow row) {
+        if (assignedSubtasks.size() == 1) {
+            // only choice. no need to run random number generator.
+            return assignedSubtasks.get(0);
+        } else {
+            double randomNumber = nextDouble(0, keyWeight);
+            int index = Arrays.binarySearch(cumulativeWeights, randomNumber);
+            // choose the subtask where randomNumber < cumulativeWeights[pos].
+            // this works regardless whether index is negative or not.
+            int position = Math.abs(index + 1);
+            checkState(
+                    position < assignedSubtasks.size(),
+                    "Invalid selected position: out of range. key weight = %s, 
random number = %s, cumulative weights array = %s",
+                    keyWeight,
+                    randomNumber,
+                    Arrays.toString(cumulativeWeights));
+            return assignedSubtasks.get(position);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        WeightedRandomAssignment that = (WeightedRandomAssignment) o;
+        return keyWeight == that.keyWeight
+                && Objects.equals(assignedSubtasks, that.assignedSubtasks)
+                && Objects.equals(subtaskWeights, that.subtaskWeights)
+                && Objects.deepEquals(cumulativeWeights, 
that.cumulativeWeights);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                assignedSubtasks, subtaskWeights, keyWeight, 
Arrays.hashCode(cumulativeWeights));
+    }
+
+    @Override
+    public String toString() {
+        return "KeyAssignment{"
+                + "assignedSubtasks="
+                + assignedSubtasks
+                + ", subtaskWeights="
+                + subtaskWeights
+                + ", keyWeight="
+                + keyWeight
+                + ", cumulativeWeights="
+                + Arrays.toString(cumulativeWeights)
+                + '}';
+    }
+
+    protected double nextDouble(double origin, double bound) {

Review Comment:
   Add a comment explaining why we don't directly use `ThreadLocalRandom.nextDouble(...)` here.
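   One possible shape for such a comment and the helper, assuming the injected `random` field exists mainly so tests can pass a seeded instance (this rationale is an assumption, not confirmed by the PR):
   ```java
   /**
    * Returns a random double in [origin, bound). Uses the injected {@link Random} instead of
    * ThreadLocalRandom.nextDouble(...) so that tests can supply a seeded Random and make the
    * weighted selection deterministic.
    */
   protected double nextDouble(double origin, double bound) {
       return origin + random.nextDouble() * (bound - origin);
   }
   ```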



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.shaded.guava32.com.google.common.collect.Iterables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Coordinator for collecting global data statistics.
+ *
+ * <p>NOTE: This class is inspired from Iceberg project.
+ */
+@Internal
+class DataStatisticsCoordinator implements OperatorCoordinator {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+    private final String operatorName;
+    private final OperatorCoordinator.Context context;
+    private final ExecutorService coordinatorExecutor;
+    private final SubtaskGateways subtaskGateways;
+    private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
+    private final TypeSerializer<DataStatistics> statisticsSerializer;
+
+    private transient boolean started;
+    private transient AggregatedStatisticsTracker aggregatedStatisticsTracker;
+
+    DataStatisticsCoordinator(String operatorName, OperatorCoordinator.Context 
context) {
+        this.operatorName = operatorName;
+        this.context = context;
+        this.coordinatorThreadFactory =
+                new CoordinatorExecutorThreadFactory(
+                        "DataStatisticsCoordinator-" + operatorName,
+                        context.getUserCodeClassloader());
+        this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+        this.subtaskGateways = new SubtaskGateways(operatorName, 
context.currentParallelism());
+        this.statisticsSerializer = new DataStatisticsSerializer();
+    }
+
+    @Override
+    public void start() throws Exception {
+        LOG.debug("Starting data statistics coordinator: {}.", operatorName);
+        this.started = true;
+
+        // statistics are restored already in resetToCheckpoint() before 
start() called
+        this.aggregatedStatisticsTracker =
+                new AggregatedStatisticsTracker(operatorName, 
context.currentParallelism());
+    }
+
+    @Override
+    public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+        runInCoordinatorThread(
+                () -> {
+                    LOG.debug(
+                            "Handling event from subtask {} (#{}) of {}: {}",
+                            subtask,
+                            attemptNumber,
+                            operatorName,
+                            event);
+                    if (event instanceof StatisticsEvent) {
+                        handleDataStatisticRequest(subtask, ((StatisticsEvent) 
event));
+                    } else {
+                        throw new IllegalArgumentException(
+                                "Invalid operator event type: "
+                                        + event.getClass().getCanonicalName());
+                    }
+                },
+                String.format(
+                        Locale.ROOT,
+                        "handling operator event %s from subtask %d (#%d)",
+                        event.getClass(),
+                        subtask,
+                        attemptNumber));
+    }
+
+    @Override
+    public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture)
+            throws Exception {
+        resultFuture.complete(new byte[0]);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {}
+
+    @Override
+    public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
+        checkState(
+                !started,
+                "The coordinator %s can only be reset if it was not yet 
started",
+                operatorName);
+    }
+
+    @Override
+    public void subtaskReset(int subtask, long checkpointId) {
+        runInCoordinatorThread(
+                () -> {
+                    LOG.info(
+                            "Operator {} subtask {} is reset to checkpoint {}",
+                            operatorName,
+                            subtask,
+                            checkpointId);
+                    
checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+                    subtaskGateways.reset(subtask);
+                },
+                String.format(
+                        Locale.ROOT,
+                        "handling subtask %d recovery to checkpoint %d",
+                        subtask,
+                        checkpointId));
+    }
+
+    @Override
+    public void executionAttemptFailed(int subtask, int attemptNumber, 
@Nullable Throwable reason) {
+        runInCoordinatorThread(
+                () -> {
+                    LOG.info(
+                            "Unregistering gateway after failure for subtask 
{} (#{}) of data statistics {}",
+                            subtask,
+                            attemptNumber,
+                            operatorName);
+                    
checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+                    subtaskGateways.unregisterSubtaskGateway(subtask, 
attemptNumber);
+                },
+                String.format(
+                        Locale.ROOT, "handling subtask %d (#%d) failure", 
subtask, attemptNumber));
+    }
+
+    @Override
+    public void executionAttemptReady(int subtask, int attemptNumber, 
SubtaskGateway gateway) {
+        checkArgument(subtask == gateway.getSubtask());
+        checkArgument(attemptNumber == 
gateway.getExecution().getAttemptNumber());
+        runInCoordinatorThread(
+                () -> {
+                    
checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+                    subtaskGateways.registerSubtaskGateway(gateway);
+                },
+                String.format(
+                        Locale.ROOT,
+                        "making event gateway to subtask %d (#%d) available",
+                        subtask,
+                        attemptNumber));
+    }
+
+    @Override
+    public void close() throws Exception {
+        coordinatorExecutor.shutdown();
+        this.aggregatedStatisticsTracker = null;
+        this.started = false;
+        LOG.info("Closed data statistics coordinator: {}.", operatorName);
+    }
+
+    private void runInCoordinatorThread(Runnable runnable) {
+        this.coordinatorExecutor.execute(
+                new ThrowableCatchingRunnable(
+                        throwable ->
+                                
this.coordinatorThreadFactory.uncaughtException(
+                                        Thread.currentThread(), throwable),
+                        runnable));
+    }
+
+    private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, 
String actionString) {
+        ensureStarted();
+        runInCoordinatorThread(
+                () -> {
+                    try {
+                        action.run();
+                    } catch (Throwable t) {
+                        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                        LOG.error(
+                                "Uncaught exception in the data statistics 
coordinator: {} while {}. Triggering job failover",
+                                operatorName,
+                                actionString,
+                                t);
+                        context.failJob(t);
+                    }
+                });
+    }
+
+    private void ensureStarted() {
+        checkState(started, "The coordinator of %s has not started yet.", 
operatorName);
+    }
+
+    private void handleDataStatisticRequest(int subtask, StatisticsEvent 
event) {
+        DataStatistics maybeCompletedStatistics =
+                aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, 
event);
+        if (maybeCompletedStatistics != null) {
+            if (maybeCompletedStatistics.isEmpty()) {
+                LOG.debug(
+                        "Skip aggregated statistics for checkpoint {} as it is 
empty.",
+                        event.checkpointId());
+            } else {
+                LOG.debug(
+                        "Completed statistics aggregation for checkpoint {}", 
event.checkpointId());
+                sendGlobalStatisticsToSubtasks(maybeCompletedStatistics, 
event.checkpointId());
+            }
+        }
+    }
+
+    private void sendGlobalStatisticsToSubtasks(DataStatistics statistics, 
long checkpointId) {
+        runInCoordinatorThread(
+                () -> {
+                    LOG.info(
+                            "Broadcast latest global statistics from 
checkpoint {} to all subtasks",
+                            checkpointId);
+                    // applyImmediately is set to false so that operator 
subtasks can
+                    // apply the change at checkpoint boundary
+                    StatisticsEvent statisticsEvent =
+                            StatisticsEvent.createStatisticsEvent(
+                                    checkpointId, statistics, 
statisticsSerializer);
+                    for (int i = 0; i < context.currentParallelism(); ++i) {
+                        // Ignore future return value for potential error 
(e.g. subtask down).
+                        // Upon restart, subtasks send request to coordinator 
to refresh statistics
+                        // if there is any difference
+                        
subtaskGateways.getSubtaskGateway(i).sendEvent(statisticsEvent);
+                    }
+                },
+                String.format(
+                        Locale.ROOT,
+                        "Failed to send operator %s coordinator global data 
statistics for checkpoint %d",
+                        operatorName,
+                        checkpointId));
+    }
+
+    @VisibleForTesting
+    void callInCoordinatorThread(Callable<Void> callable, String errorMessage) 
{
+        ensureStarted();
+        // Ensure the task is done by the coordinator executor.
+        if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+            try {
+                Callable<Void> guardedCallable =
+                        () -> {
+                            try {
+                                return callable.call();
+                            } catch (Throwable t) {
+                                LOG.error(
+                                        "Uncaught Exception in data statistics 
coordinator: {} executor",
+                                        operatorName,
+                                        t);
+                                ExceptionUtils.rethrowException(t);
+                                return null;
+                            }
+                        };
+
+                coordinatorExecutor.submit(guardedCallable).get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new FlinkRuntimeException(errorMessage, e);
+            }
+        } else {
+            try {
+                callable.call();
+            } catch (Throwable t) {
+                LOG.error(
+                        "Uncaught Exception in data statistics coordinator: {} 
executor",
+                        operatorName,
+                        t);
+                throw new FlinkRuntimeException(errorMessage, t);
+            }
+        }
+    }
+
+    @VisibleForTesting
+    public SubtaskGateways getSubtaskGateways() {
+        return subtaskGateways;
+    }
+
+    static class SubtaskGateways {
+        private final String operatorName;
+        private final Map<Integer, SubtaskGateway>[] gateways;
+
+        @SuppressWarnings("unchecked")
+        private SubtaskGateways(String operatorName, int parallelism) {
+            this.operatorName = operatorName;
+            gateways = new Map[parallelism];
+
+            for (int i = 0; i < parallelism; ++i) {
+                gateways[i] = new HashMap<>();
+            }
+        }
+
+        private void registerSubtaskGateway(OperatorCoordinator.SubtaskGateway 
gateway) {
+            int subtaskIndex = gateway.getSubtask();
+            int attemptNumber = gateway.getExecution().getAttemptNumber();
+            checkState(
+                    !gateways[subtaskIndex].containsKey(attemptNumber),
+                    "Coordinator of %s already has a subtask gateway for %d 
(#%d)",
+                    operatorName,
+                    subtaskIndex,
+                    attemptNumber);
+            LOG.debug(
+                    "Coordinator of {} registers gateway for subtask {} 
attempt {}",
+                    operatorName,
+                    subtaskIndex,
+                    attemptNumber);
+            gateways[subtaskIndex].put(attemptNumber, gateway);
+        }
+
+        private void unregisterSubtaskGateway(int subtaskIndex, int 
attemptNumber) {
+            LOG.debug(
+                    "Coordinator of {} unregisters gateway for subtask {} 
attempt {}",
+                    operatorName,
+                    subtaskIndex,
+                    attemptNumber);
+            gateways[subtaskIndex].remove(attemptNumber);
+        }
+
+        protected OperatorCoordinator.SubtaskGateway getSubtaskGateway(int 
subtaskIndex) {
+            checkState(
+                    !gateways[subtaskIndex].isEmpty(),
+                    "Coordinator of %s subtask %d is not ready yet to receive 
events",
+                    operatorName,
+                    subtaskIndex);
+            return Iterables.getOnlyElement(gateways[subtaskIndex].values());
+        }
+
+        private void reset(int subtaskIndex) {
+            gateways[subtaskIndex].clear();
+        }
+    }
+
+    private static class CoordinatorExecutorThreadFactory
+            implements ThreadFactory, Thread.UncaughtExceptionHandler {
+
+        private final String coordinatorThreadName;
+        private final ClassLoader classLoader;
+        private final Thread.UncaughtExceptionHandler errorHandler;
+
+        @javax.annotation.Nullable private Thread thread;
+
+        CoordinatorExecutorThreadFactory(
+                final String coordinatorThreadName, final ClassLoader 
contextClassLoader) {
+            this(coordinatorThreadName, contextClassLoader, 
FatalExitExceptionHandler.INSTANCE);
+        }
+
+        @org.apache.flink.annotation.VisibleForTesting

Review Comment:
   Use Fluss `@VisibleForTesting`.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputer.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.bucketing.BucketingFunction;
+import org.apache.fluss.client.table.getter.PartitionGetter;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.flink.row.RowWithOp;
+import org.apache.fluss.flink.sink.ChannelComputer;
+import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema;
+import org.apache.fluss.flink.sink.serializer.SerializerInitContextImpl;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.KeyEncoder;
+import org.apache.fluss.types.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * {@link ChannelComputer} for {@link StatisticsOrRecord} which will change 
shuffle based on
+ * partition statistic.
+ */
+@Internal
+public class StatisticsOrRecordChannelComputer<InputT>
+        implements ChannelComputer<StatisticsOrRecord<InputT>> {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(StatisticsOrRecordChannelComputer.class);
+
+    private final @Nullable DataLakeFormat lakeFormat;
+    private final RowType flussRowType;
+    private final List<String> bucketKeys;
+    private final List<String> partitionKeys;
+    private final FlussSerializationSchema<InputT> serializationSchema;
+    private final int bucketNum;
+
+    private transient int downstreamNumChannels;
+    private transient KeyEncoder bucketKeyEncoder;
+    private transient PartitionGetter partitionGetter;
+    private transient MapPartitioner delegatePartitioner;
+    private transient AtomicLong roundRobinCounter;
+    private transient BucketingFunction bucketingFunction;
+    private transient Random random;
+
+    public StatisticsOrRecordChannelComputer(
+            RowType flussRowType,
+            List<String> bucketKeys,
+            List<String> partitionKeys,
+            int bucketNum,
+            @Nullable DataLakeFormat lakeFormat,
+            FlussSerializationSchema<InputT> serializationSchema) {
+        checkArgument(
+                partitionKeys != null && !partitionKeys.isEmpty(),
+                "Partition keys cannot be empty.");
+        this.flussRowType = flussRowType;
+        this.bucketKeys = bucketKeys;
+        this.partitionKeys = partitionKeys;
+        this.bucketNum = bucketNum;
+        this.lakeFormat = lakeFormat;
+        this.serializationSchema = serializationSchema;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        LOG.info("Setting up with {} downstream channels", numChannels);
+        this.downstreamNumChannels = numChannels;
+        this.bucketingFunction = BucketingFunction.of(lakeFormat);
+        this.bucketKeyEncoder = KeyEncoder.of(flussRowType, bucketKeys, 
lakeFormat);
+        this.partitionGetter = new PartitionGetter(flussRowType, 
partitionKeys);
+        try {
+            this.serializationSchema.open(new 
SerializerInitContextImpl(flussRowType));
+        } catch (Exception e) {
+            throw new FlussRuntimeException(e);
+        }
+        this.random = ThreadLocalRandom.current();
+    }
+
+    @Override
+    public int channel(StatisticsOrRecord<InputT> wrapper) {
+        if (wrapper == null) {
+            throw new FlussRuntimeException("StatisticsOrRecord wrapper must 
not be null");
+        }
+
+        try {
+            if (wrapper.hasStatistics()) {
+                this.delegatePartitioner = 
delegatePartitioner(wrapper.statistics());
+                return (int)
+                        
(roundRobinCounter(downstreamNumChannels).getAndIncrement()
+                                % downstreamNumChannels);
+            } else {
+                if (delegatePartitioner == null) {
+                    delegatePartitioner = delegatePartitioner(null);
+                }
+
+                RowWithOp rowWithOp = 
serializationSchema.serialize(wrapper.record());
+                InternalRow row = rowWithOp.getRow();
+                String partitionName = partitionGetter.getPartition(row);
+                return delegatePartitioner.select(partitionName, row, 
downstreamNumChannels);
+            }
+        } catch (Exception e) {
+            throw new FlussRuntimeException(
+                    String.format(
+                            "Failed to serialize static or record of type '%s' 
in FlinkRowDataChannelComputer: %s",
+                            wrapper != null ? wrapper.getClass().getName() : 
"null",

Review Comment:
   `wrapper` is never null at this point (the null check above has already thrown), so the `wrapper != null ? ... : "null"` conditional in the error message is unnecessary.
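   
   As a rough illustration of the simplification this implies (the message text and the assumption that `FlussRuntimeException` keeps a `(message, cause)` constructor are mine, not the final code):
   
   ```java
   // wrapper was already null-checked at the top of channel(), so the
   // `wrapper != null ? ... : "null"` conditional can be dropped here.
   throw new FlussRuntimeException(
           String.format(
                   "Failed to serialize statistics or record of type '%s': %s",
                   wrapper.getClass().getName(), e.getMessage()),
           e);
   ```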



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecord.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Objects;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * Either a record or statistics, never both.
+ *
+ * @param <InputT>
+ */
+@Internal
+public class StatisticsOrRecord<InputT> {
+
+    private DataStatistics statistics;
+    private InputT record;
+
+    private StatisticsOrRecord(DataStatistics statistics, InputT record) {
+        checkArgument(
+                record != null ^ statistics != null,
+                "DataStatistics or record, not neither or both");
+        this.statistics = statistics;
+        this.record = record;
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> fromRecord(InputT 
record) {
+        return new StatisticsOrRecord<>(null, record);
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> 
fromStatistics(DataStatistics statistics) {
+        return new StatisticsOrRecord<>(statistics, null);
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> reuseRecord(
+            StatisticsOrRecord<InputT> reuse, TypeSerializer<InputT> 
recordSerializer) {
+        if (reuse.hasRecord()) {
+            return reuse;
+        } else {
+            // not reusable
+            return 
StatisticsOrRecord.fromRecord(recordSerializer.createInstance());
+        }
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> reuseStatistics(
+            StatisticsOrRecord<InputT> reuse, TypeSerializer<DataStatistics> 
statisticsSerializer) {
+        if (reuse.hasStatistics()) {
+            return reuse;
+        } else {
+            // not reusable
+            return 
StatisticsOrRecord.fromStatistics(statisticsSerializer.createInstance());
+        }
+    }
+
+    boolean hasStatistics() {

Review Comment:
   nit: rename to `isStatistics()`



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecord.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Objects;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * Either a record or statistics, never both.
+ *
+ * @param <InputT>
+ */
+@Internal
+public class StatisticsOrRecord<InputT> {
+
+    private DataStatistics statistics;
+    private InputT record;
+
+    private StatisticsOrRecord(DataStatistics statistics, InputT record) {
+        checkArgument(
+                record != null ^ statistics != null,
+                "DataStatistics or record, not neither or both");
+        this.statistics = statistics;
+        this.record = record;
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> fromRecord(InputT 
record) {
+        return new StatisticsOrRecord<>(null, record);
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> 
fromStatistics(DataStatistics statistics) {
+        return new StatisticsOrRecord<>(statistics, null);
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> reuseRecord(
+            StatisticsOrRecord<InputT> reuse, TypeSerializer<InputT> 
recordSerializer) {
+        if (reuse.hasRecord()) {
+            return reuse;
+        } else {
+            // not reusable
+            return 
StatisticsOrRecord.fromRecord(recordSerializer.createInstance());
+        }
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> reuseStatistics(
+            StatisticsOrRecord<InputT> reuse, TypeSerializer<DataStatistics> 
statisticsSerializer) {
+        if (reuse.hasStatistics()) {
+            return reuse;
+        } else {
+            // not reusable
+            return 
StatisticsOrRecord.fromStatistics(statisticsSerializer.createInstance());
+        }
+    }
+
+    boolean hasStatistics() {
+        return statistics != null;
+    }
+
+    public boolean hasRecord() {
+        return record != null;
+    }
+
+    public DataStatistics statistics() {
+        return statistics;
+    }
+
+    public void statistics(DataStatistics newStatistics) {

Review Comment:
   rename to `setStatistics`



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java:
##########
@@ -123,20 +134,76 @@ public AppendSinkWriter<InputT> 
createWriter(MailboxExecutor mailboxExecutor) {
 
         @Override
         public DataStream<InputT> addPreWriteTopology(DataStream<InputT> 
input) {
-            // For append only sink, we will do bucket shuffle only if bucket 
keys are not empty.
-            if (!bucketKeys.isEmpty() && shuffleByBucketId) {
-                return partition(
-                        input,
-                        new FlinkRowDataChannelComputer<>(
-                                toFlussRowType(tableRowType),
-                                bucketKeys,
-                                partitionKeys,
-                                lakeFormat,
-                                numBucket,
-                                flussSerializationSchema),
-                        input.getParallelism());
-            } else {
-                return input;
+            switch (shuffleMode) {
+                case BUCKET_SHUFFLE:
+                    if (!bucketKeys.isEmpty()) {
+                        return partition(
+                                input,
+                                new FlinkRowDataChannelComputer<>(
+                                        toFlussRowType(tableRowType),
+                                        bucketKeys,
+                                        partitionKeys,
+                                        lakeFormat,
+                                        numBucket,
+                                        flussSerializationSchema),
+                                input.getParallelism());
+                    }
+                    return input;
+                case NONE:

Review Comment:
   Move the `NONE` case to the top of the switch.
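   
   A rough sketch of the reordered switch, for illustration only (the `bucketShuffle`/`dynamicShuffle` helpers are hypothetical placeholders for the existing branches):
   
   ```java
   switch (shuffleMode) {
       case NONE:
           return input;
       case BUCKET_SHUFFLE:
           // existing bucket-shuffle branch, unchanged
           return bucketShuffle(input);
       case DYNAMIC_SHUFFLE:
           // existing dynamic-shuffle branch, unchanged
           return dynamicShuffle(input);
       default:
           throw new IllegalArgumentException("Unsupported shuffle mode: " + shuffleMode);
   }
   ```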



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java:
##########
@@ -123,20 +134,76 @@ public AppendSinkWriter<InputT> 
createWriter(MailboxExecutor mailboxExecutor) {
 
         @Override
         public DataStream<InputT> addPreWriteTopology(DataStream<InputT> 
input) {
-            // For append only sink, we will do bucket shuffle only if bucket 
keys are not empty.
-            if (!bucketKeys.isEmpty() && shuffleByBucketId) {
-                return partition(
-                        input,
-                        new FlinkRowDataChannelComputer<>(
-                                toFlussRowType(tableRowType),
-                                bucketKeys,
-                                partitionKeys,
-                                lakeFormat,
-                                numBucket,
-                                flussSerializationSchema),
-                        input.getParallelism());
-            } else {
-                return input;
+            switch (shuffleMode) {
+                case BUCKET_SHUFFLE:
+                    if (!bucketKeys.isEmpty()) {
+                        return partition(
+                                input,
+                                new FlinkRowDataChannelComputer<>(
+                                        toFlussRowType(tableRowType),
+                                        bucketKeys,
+                                        partitionKeys,
+                                        lakeFormat,
+                                        numBucket,
+                                        flussSerializationSchema),
+                                input.getParallelism());
+                    }
+                    return input;
+                case NONE:
+                    return input;
+                case DYNAMIC_SHUFFLE:
+                    if (partitionKeys.isEmpty()) {
+                        throw new UnsupportedOperationException(
+                                "DYNAMIC_SHUFFLE is only supported for 
partition tables");
+                    }
+
+                    if (rowTypeInformation == null) {
+                        throw new UnsupportedOperationException(
+                                "RowTypeInformation is required for 
DYNAMIC_SHUFFLE mode.");
+                    }
+                    TypeInformation<StatisticsOrRecord<InputT>> 
statisticsOrRecordTypeInformation =
+                            new 
StatisticsOrRecordTypeInformation<>(rowTypeInformation);
+                    SingleOutputStreamOperator<StatisticsOrRecord<InputT>> 
shuffleStream =
+                            input.transform(
+                                            "Dynamic shuffle data statistics",
+                                            statisticsOrRecordTypeInformation,
+                                            new 
DataStatisticsOperatorFactory<>(
+                                                    
toFlussRowType(tableRowType),
+                                                    partitionKeys,
+                                                    flussSerializationSchema))
+                                    .uid("Dynamic shuffle data statistics" + 
tablePath)
+                                    // Set the parallelism same as input 
operator to encourage
+                                    // chaining
+                                    .setParallelism(input.getParallelism());
+
+                    return partition(
+                                    shuffleStream,
+                                    new StatisticsOrRecordChannelComputer<>(
+                                            toFlussRowType(tableRowType),
+                                            bucketKeys,
+                                            partitionKeys,
+                                            numBucket,
+                                            lakeFormat,
+                                            flussSerializationSchema),
+                                    input.getParallelism())
+                            .flatMap(
+                                    
(FlatMapFunction<StatisticsOrRecord<InputT>, InputT>)
+                                            (statisticsOrRecord, out) -> {
+                                                if 
(statisticsOrRecord.hasRecord()) {
+                                                    
out.collect(statisticsOrRecord.record());
+                                                }
+                                            })
+                            .uid("flat map" + tablePath)

Review Comment:
   ```suggestion
                               .uid("flat map " + tablePath)
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/WeightedRandomAssignment.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.row.InternalRow;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Partition assignment strategy that randomly distributes records to subtasks 
based on configured
+ * weights.
+ *
+ * <p>This assignment strategy enables weighted random distribution of records 
to subtasks, allowing
+ * for more balanced load distribution across downstream subtasks. The 
assignment uses a weighted
+ * random algorithm where subtasks with higher weights have a proportionally 
higher probability of
+ * being selected.
+ *
+ * <p>NOTE: This class is inspired from Iceberg project.
+ */
+@Internal
+public class WeightedRandomAssignment implements PartitionAssignment {
+    protected final List<Integer> assignedSubtasks;
+    protected final List<Long> subtaskWeights;
+    protected final long keyWeight;
+    protected final double[] cumulativeWeights;
+    private final Random random;
+
+    /**
+     * @param assignedSubtasks assigned subtasks for this key. It could be a 
single subtask. It
+     *     could also be multiple subtasks if the key has heavy weight that 
should be handled by
+     *     multiple subtasks.
+     * @param subtaskWeights assigned weight for each subtask. E.g., if the 
keyWeight is 27 and the
+     *     key is assigned to 3 subtasks, subtaskWeights could contain values 
as [10, 10, 7] for
+     *     target weight of 10 per subtask.
+     */
+    WeightedRandomAssignment(
+            List<Integer> assignedSubtasks, List<Long> subtaskWeights, Random 
random) {
+        checkArgument(
+                assignedSubtasks != null && !assignedSubtasks.isEmpty(),
+                "Invalid assigned subtasks: null or empty");
+        checkArgument(
+                subtaskWeights != null && !subtaskWeights.isEmpty(),
+                "Invalid assigned subtasks weights: null or empty");
+        checkArgument(
+                assignedSubtasks.size() == subtaskWeights.size(),
+                "Invalid assignment: size mismatch (tasks length = %s, weights 
length = %s)",
+                assignedSubtasks.size(),
+                subtaskWeights.size());
+
+        this.assignedSubtasks = assignedSubtasks;
+        this.subtaskWeights = subtaskWeights;
+        this.keyWeight = 
subtaskWeights.stream().mapToLong(Long::longValue).sum();
+        this.cumulativeWeights = new double[subtaskWeights.size()];
+        long cumulativeWeight = 0;
+        for (int i = 0; i < subtaskWeights.size(); ++i) {
+            cumulativeWeight += subtaskWeights.get(i);
+            cumulativeWeights[i] = cumulativeWeight;
+        }
+        this.random = random;
+    }
+
+    /**
+     * Select a subtask for the key. If a bucket key exists, the same key will be assigned to the
+     * same subtask.
+     *
+     * @return subtask id
+     */
+    @Override
+    public int select(InternalRow row) {
+        if (assignedSubtasks.size() == 1) {
+            // only choice. no need to run random number generator.
+            return assignedSubtasks.get(0);
+        } else {
+            double randomNumber = nextDouble(0, keyWeight);
+            int index = Arrays.binarySearch(cumulativeWeights, randomNumber);
+            // choose the subtask where randomNumber < cumulativeWeights[pos].
+            // this works regardless whether index is negative or not.
+            int position = Math.abs(index + 1);
+            checkState(
+                    position < assignedSubtasks.size(),
+                    "Invalid selected position: out of range. key weight = %s, 
random number = %s, cumulative weights array = %s",
+                    keyWeight,
+                    randomNumber,
+                    Arrays.toString(cumulativeWeights));

Review Comment:
   Guard this with an `if` condition; otherwise `Arrays.toString(cumulativeWeights)` is evaluated on every call, even when the check passes.
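   
   A minimal sketch of the guarded check, so the array string is only built when the check actually fails (illustrative, not the final code):
   
   ```java
   if (position >= assignedSubtasks.size()) {
       throw new IllegalStateException(
               String.format(
                       "Invalid selected position: out of range. key weight = %s, random number = %s, cumulative weights array = %s",
                       keyWeight, randomNumber, Arrays.toString(cumulativeWeights)));
   }
   return assignedSubtasks.get(position);
   ```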



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecord.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Objects;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * Either a record or statistics, never both.
+ *
+ * @param <InputT>
+ */
+@Internal
+public class StatisticsOrRecord<InputT> {
+
+    private DataStatistics statistics;
+    private InputT record;
+
+    private StatisticsOrRecord(DataStatistics statistics, InputT record) {
+        checkArgument(
+                record != null ^ statistics != null,
+                "DataStatistics or record, not neither or both");
+        this.statistics = statistics;
+        this.record = record;
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> fromRecord(InputT 
record) {
+        return new StatisticsOrRecord<>(null, record);
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> 
fromStatistics(DataStatistics statistics) {
+        return new StatisticsOrRecord<>(statistics, null);
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> reuseRecord(
+            StatisticsOrRecord<InputT> reuse, TypeSerializer<InputT> 
recordSerializer) {
+        if (reuse.hasRecord()) {
+            return reuse;
+        } else {
+            // not reusable
+            return 
StatisticsOrRecord.fromRecord(recordSerializer.createInstance());
+        }
+    }
+
+    public static <InputT> StatisticsOrRecord<InputT> reuseStatistics(
+            StatisticsOrRecord<InputT> reuse, TypeSerializer<DataStatistics> 
statisticsSerializer) {
+        if (reuse.hasStatistics()) {
+            return reuse;
+        } else {
+            // not reusable
+            return 
StatisticsOrRecord.fromStatistics(statisticsSerializer.createInstance());
+        }
+    }
+
+    boolean hasStatistics() {
+        return statistics != null;
+    }
+
+    public boolean hasRecord() {
+        return record != null;
+    }
+
+    public DataStatistics statistics() {
+        return statistics;
+    }
+
+    public void statistics(DataStatistics newStatistics) {
+        this.statistics = newStatistics;
+    }
+
+    public InputT record() {
+        return record;
+    }
+
+    public void record(InputT newRecord) {

Review Comment:
   rename to `setRecord`.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinatorTest.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.flink.sink.serializer.RowDataSerializationSchema;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.fluss.flink.sink.shuffle.StatisticsEvent.createStatisticsEvent;
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DataStatisticsCoordinator}. */
+public class DataStatisticsCoordinatorTest {
+    private static final DataStatisticsSerializer dataStatisticsSerializer =
+            new DataStatisticsSerializer();
+
+    @Test
+    void testStatisticsEvent() throws Exception {
+        try (DataStatisticsCoordinator coordinator = 
getDataStatisticsCoordinator()) {
+            coordinator.start();
+            DataStatisticsCoordinator.SubtaskGateways subtaskGateways =
+                    coordinator.getSubtaskGateways();
+            coordinator.executionAttemptReady(0, 1, new MockGateway(0, 1));
+            coordinator.executionAttemptReady(1, 1, new MockGateway(1, 1));
+            coordinator.handleEventFromOperator(
+                    0,
+                    1,
+                    createStatisticsEvent(
+                            0,
+                            new 
DataStatistics(Collections.singletonMap("partition 1", 10000L)),
+                            dataStatisticsSerializer));
+            coordinator.handleEventFromOperator(
+                    1,
+                    1,
+                    createStatisticsEvent(
+                            1,
+                            new 
DataStatistics(Collections.singletonMap("partition 1", 20000L)),
+                            dataStatisticsSerializer));
+            coordinator.callInCoordinatorThread(
+                    () -> null, "wait until all pending tasks in coordinator 
are finished ");
+            assertThat(((MockGateway) 
subtaskGateways.getSubtaskGateway(0)).events).isEmpty();
+            assertThat(((MockGateway) 
subtaskGateways.getSubtaskGateway(1)).events).isEmpty();
+
+            Map<String, Long> partitionFrequencies = new HashMap<>();
+            partitionFrequencies.put("partition 1", 10000L);
+            partitionFrequencies.put("partition 2", 20000L);
+            coordinator.handleEventFromOperator(
+                    0,
+                    1,
+                    createStatisticsEvent(
+                            1, new DataStatistics(partitionFrequencies), 
dataStatisticsSerializer));
+
+            Thread.sleep(1000);

Review Comment:
   Do we need to sleep here? The following `coordinator.callInCoordinatorThread` already waits for the pending events to be processed.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java:
##########
@@ -125,6 +126,23 @@ public class FlinkConnectorOptions {
                                     + BUCKET_KEY.key()
                                     + "' is defined. For Primary Key table, it 
is enabled by default.");
 
+    public static final ConfigOption<DistributionMode> SINK_DISTRIBUTION_MODE =
+            ConfigOptions.key("sink.distribution-mode")
+                    .enumType(DistributionMode.class)
+                    .defaultValue(DistributionMode.BUCKET_SHUFFLE)
+                    .withDescription(
+                            "Defines the distribution mode for writing data to 
the sink. Available options are: \n"
+                                    + "- NONE: No specific distribution 
strategy. Data is forwarded as is.\n"
+                                    + "- BUCKET_SHUFFLE: Shuffle data by 
bucket ID before writing to sink. "
+                                    + "Shuffling the data with the same bucket 
ID to be processed by the same task "
+                                    + "can improve the efficiency of client 
processing and reduce resource consumption. "
+                                    + "For Log Table, bucket shuffle will only 
take effect when the '"
+                                    + BUCKET_KEY.key()
+                                    + "' is defined. For Primary Key table, it 
is enabled by default.\n"
+                                    + "- DYNAMIC_SHUFFLE: Dynamically adjust 
shuffle strategy based on partition key traffic patterns. "
+                                    + "This mode monitors data distribution 
and adjusts the shuffle behavior to balance the load. "

Review Comment:
   Mention when DYNAMIC_SHUFFLE should be used and what the cost is.
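   
   For example, the description could be extended along these lines (wording is only a suggestion, not final text):
   
   ```java
   + "- DYNAMIC_SHUFFLE: Dynamically adjusts the shuffle strategy based on partition traffic. "
   + "Intended for partitioned tables with skewed partition traffic; it adds a statistics "
   + "collection operator and an extra shuffle stage, trading some overhead for better load balance. "
   ```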



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java:
##########
@@ -204,8 +272,13 @@ public DataStream<InputT> 
addPreWriteTopology(DataStream<InputT> input) {
                                     lakeFormat,
                                     numBucket,
                                     flussSerializationSchema),
-                            input.getParallelism())
-                    : input;
+                            input.getParallelism());
+                case NONE:

Review Comment:
   Move the `NONE` case to the top of the switch.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java:
##########
@@ -125,6 +126,23 @@ public class FlinkConnectorOptions {
                                     + BUCKET_KEY.key()
                                     + "' is defined. For Primary Key table, it 
is enabled by default.");
 
+    public static final ConfigOption<DistributionMode> SINK_DISTRIBUTION_MODE =
+            ConfigOptions.key("sink.distribution-mode")
+                    .enumType(DistributionMode.class)
+                    .defaultValue(DistributionMode.BUCKET_SHUFFLE)
+                    .withDescription(
+                            "Defines the distribution mode for writing data to 
the sink. Available options are: \n"
+                                    + "- NONE: No specific distribution 
strategy. Data is forwarded as is.\n"
+                                    + "- BUCKET_SHUFFLE: Shuffle data by 
bucket ID before writing to sink. "
+                                    + "Shuffling the data with the same bucket 
ID to be processed by the same task "
+                                    + "can improve the efficiency of client 
processing and reduce resource consumption. "

Review Comment:
   Mention this can improve throughput for primary key tables.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java:
##########
@@ -79,7 +85,7 @@ public class FlinkTableSink
     private final DeleteBehavior tableDeleteBehavior;
     private final int numBucket;
     private final List<String> bucketKeys;
-    private final boolean shuffleByBucketId;
+    private DistributionMode distributionMode;

Review Comment:
   `final`



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinatorTest.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.flink.sink.serializer.RowDataSerializationSchema;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.fluss.flink.sink.shuffle.StatisticsEvent.createStatisticsEvent;
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DataStatisticsCoordinator}. */
+public class DataStatisticsCoordinatorTest {
+    private static final DataStatisticsSerializer dataStatisticsSerializer =
+            new DataStatisticsSerializer();
+
+    @Test
+    void testStatisticsEvent() throws Exception {
+        try (DataStatisticsCoordinator coordinator = 
getDataStatisticsCoordinator()) {
+            coordinator.start();
+            DataStatisticsCoordinator.SubtaskGateways subtaskGateways =
+                    coordinator.getSubtaskGateways();
+            coordinator.executionAttemptReady(0, 1, new MockGateway(0, 1));
+            coordinator.executionAttemptReady(1, 1, new MockGateway(1, 1));
+            coordinator.handleEventFromOperator(
+                    0,
+                    1,
+                    createStatisticsEvent(
+                            0,
+                            new 
DataStatistics(Collections.singletonMap("partition 1", 10000L)),
+                            dataStatisticsSerializer));
+            coordinator.handleEventFromOperator(
+                    1,
+                    1,
+                    createStatisticsEvent(
+                            1,
+                            new 
DataStatistics(Collections.singletonMap("partition 1", 20000L)),
+                            dataStatisticsSerializer));
+            coordinator.callInCoordinatorThread(
+                    () -> null, "wait until all pending tasks in coordinator 
are finished ");
+            assertThat(((MockGateway) 
subtaskGateways.getSubtaskGateway(0)).events).isEmpty();
+            assertThat(((MockGateway) 
subtaskGateways.getSubtaskGateway(1)).events).isEmpty();
+
+            Map<String, Long> partitionFrequencies = new HashMap<>();
+            partitionFrequencies.put("partition 1", 10000L);
+            partitionFrequencies.put("partition 2", 20000L);
+            coordinator.handleEventFromOperator(
+                    0,
+                    1,
+                    createStatisticsEvent(
+                            1, new DataStatistics(partitionFrequencies), 
dataStatisticsSerializer));
+
+            Thread.sleep(1000);
+            Map<String, Long> expectedPartitionFrequencies = new HashMap<>();
+            expectedPartitionFrequencies.put("partition 1", 30000L);
+            expectedPartitionFrequencies.put("partition 2", 20000L);
+            StatisticsEvent expectedStatisticsEvent =
+                    createStatisticsEvent(
+                            1,
+                            new DataStatistics(expectedPartitionFrequencies),
+                            dataStatisticsSerializer);
+
+            coordinator.callInCoordinatorThread(

Review Comment:
   It is a bit hacky that the introduced `callInCoordinatorThread` exists only for testing, and its semantics differ from `runInCoordinatorThread` (`callInCoordinatorThread` blocks until the callable finishes), yet by name the two look similar.
   
   I suggest not introducing `callInCoordinatorThread`; instead, use `runInCoordinatorThread` to execute a `CompletableFuture.complete(null)` and wait on `CompletableFuture.get()` in the test.
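   
   A minimal sketch of that test-side wait, assuming `runInCoordinatorThread(Runnable)` is made reachable from the test (e.g. via a `@VisibleForTesting` hook):
   
   ```java
   // Submit a no-op that completes a future; once it runs, every event submitted
   // before it has already been processed by the single-threaded coordinator executor.
   CompletableFuture<Void> pendingDrained = new CompletableFuture<>();
   coordinator.runInCoordinatorThread(() -> pendingDrained.complete(null));
   pendingDrained.get(10, TimeUnit.SECONDS);
   ```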



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java:
##########
@@ -351,6 +363,59 @@ void testAppendLogWithMultiBatch() throws Exception {
         assertResultsIgnoreOrder(rowIter, expectedRows, true);
     }
 
+    @ParameterizedTest
+    @EnumSource(value = DistributionMode.class)
+    void testAppendLogPartitionTable(DistributionMode distributionMode) throws Exception {
+        tEnv.executeSql(
+                String.format(
+                        "create table sink_test (a int not null, b bigint, c 
string) "
+                                + " partitioned by (c) "
+                                + "with ('bucket.num' = '3', 
'sink.distribution-mode'= '%s')",
+                        distributionMode));
+        String insertSql =
+                "INSERT INTO sink_test(a, b, c) "
+                        + "VALUES (1, 3501, 'Tim'), "
+                        + "(2, 3502, 'Fabian'), "
+                        + "(3, 3503, 'Tim'), "
+                        + "(4, 3504, 'jerry'), "
+                        + "(5, 3505, 'piggy'), "
+                        + "(7, 3507, 'Fabian'), "
+                        + "(8, 3508, 'stave'), "
+                        + "(9, 3509, 'Tim'), "
+                        + "(10, 3510, 'coco'), "
+                        + "(11, 3511, 'stave'), "
+                        + "(12, 3512, 'Tim')";
+        String insertPlan = tEnv.explainSql(insertSql, ExplainDetail.JSON_EXECUTION_PLAN);
+        if (distributionMode == DistributionMode.DYNAMIC_SHUFFLE) {
+            assertThat(insertPlan)
+                    .contains(String.format("\"ship_strategy\" : \"%s\"", distributionMode.name()));
+        } else {
+            assertThat(insertPlan).contains("\"ship_strategy\" : \"FORWARD\"");
+        }
+        tEnv.executeSql(insertSql).await();
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql("select * from sink_test").collect();
+        //noinspection ArraysAsListWithZeroOrOneArgument
+        List<List<String>> expectedGroups =
+                Arrays.asList(
+                        Arrays.asList(
+                                "+I[1, 3501, Tim]",
+                                "+I[3, 3503, Tim]",
+                                "+I[9, 3509, Tim]",
+                                "+I[12, 3512, Tim]"),
+                        Arrays.asList("+I[2, 3502, Fabian]", "+I[7, 3507, 
Fabian]"),
+                        Arrays.asList("+I[4, 3504, jerry]"),
+                        Arrays.asList("+I[5, 3505, piggy]"),
+                        Arrays.asList("+I[8, 3508, stave]", "+I[11, 3511, 
stave]"),
+                        Arrays.asList("+I[10, 3510, coco]"));
+
+        List<String> expectedRows =
+                expectedGroups.stream().flatMap(List::stream).collect(Collectors.toList());
+
+        List<String> actual = collectRowsWithTimeout(rowIter, expectedRows.size());
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRows);
+    }
+

Review Comment:
   Please also update `testPut(boolean sinkBucketShuffle)` to use the distribution mode.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java:
##########
@@ -351,6 +363,59 @@ void testAppendLogWithMultiBatch() throws Exception {
         assertResultsIgnoreOrder(rowIter, expectedRows, true);
     }
 
+    @ParameterizedTest
+    @EnumSource(value = DistributionMode.class)
+    void testAppendLogPartitionTable(DistributionMode distributionMode) throws Exception {
+        tEnv.executeSql(
+                String.format(
+                        "create table sink_test (a int not null, b bigint, c 
string) "
+                                + " partitioned by (c) "
+                                + "with ('bucket.num' = '3', 
'sink.distribution-mode'= '%s')",
+                        distributionMode));
+        String insertSql =
+                "INSERT INTO sink_test(a, b, c) "
+                        + "VALUES (1, 3501, 'Tim'), "
+                        + "(2, 3502, 'Fabian'), "
+                        + "(3, 3503, 'Tim'), "
+                        + "(4, 3504, 'jerry'), "
+                        + "(5, 3505, 'piggy'), "
+                        + "(7, 3507, 'Fabian'), "
+                        + "(8, 3508, 'stave'), "
+                        + "(9, 3509, 'Tim'), "
+                        + "(10, 3510, 'coco'), "
+                        + "(11, 3511, 'stave'), "
+                        + "(12, 3512, 'Tim')";
+        String insertPlan = tEnv.explainSql(insertSql, ExplainDetail.JSON_EXECUTION_PLAN);
+        if (distributionMode == DistributionMode.DYNAMIC_SHUFFLE) {
+            assertThat(insertPlan)
+                    .contains(String.format("\"ship_strategy\" : \"%s\"", distributionMode.name()));
+        } else {

Review Comment:
   We should throw an exception when the mode is BUCKET shuffle, as the table doesn't have a bucket key (after introducing the AUTO default mode).
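   
   For illustration, a hypothetical validation sketch (`ValidationException` and the field names here are assumptions standing in for whatever the connector already uses, not the actual Fluss code):
   
   ```
   // Hypothetical check during sink creation: BUCKET shuffle needs a bucket key.
   if (distributionMode == DistributionMode.BUCKET_SHUFFLE && bucketKeys.isEmpty()) {
       throw new ValidationException(
               "'sink.distribution-mode' = 'BUCKET_SHUFFLE' requires the table to define a bucket key.");
   }
   ```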



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DistributionMode.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import java.util.Locale;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/** Distribution mode for sink shuffling. */
+public enum DistributionMode {
+    NONE("NONE"),
+    BUCKET_SHUFFLE("BUCKET_SHUFFLE"),
+    DYNAMIC_SHUFFLE("DYNAMIC_SHUFFLE");
+
+    private final String modeName;
+
+    DistributionMode(String modeName) {
+        this.modeName = modeName;
+    }
+
+    public String modeName() {

Review Comment:
   We don't need `modeName`; use `name()` directly.
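   
   For example, a sketch of the simplified enum (the `fromName` method can stay as it is):
   
   ```
   /** Distribution mode for sink shuffling. */
   public enum DistributionMode {
       NONE,
       BUCKET_SHUFFLE,
       DYNAMIC_SHUFFLE;
       // Callers that previously used modeName() can call the built-in name() instead,
       // which returns the constant's identifier, e.g. "DYNAMIC_SHUFFLE".
   }
   ```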



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java:
##########
@@ -125,6 +126,23 @@ public class FlinkConnectorOptions {
                                     + BUCKET_KEY.key()
                                     + "' is defined. For Primary Key table, it 
is enabled by default.");

Review Comment:
   Add a deprecation annotation and comments to the config option.
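   
   For example (the existing option declaration is elided; the field name and type below are only placeholders, not the actual identifiers in the file):
   
   ```
   /** @deprecated Use {@link #SINK_DISTRIBUTION_MODE} instead. */
   @Deprecated
   public static final ConfigOption<Boolean> SINK_BUCKET_SHUFFLE = ...;
   ```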



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java:
##########
@@ -125,6 +126,23 @@ public class FlinkConnectorOptions {
                                     + BUCKET_KEY.key()
                                     + "' is defined. For Primary Key table, it 
is enabled by default.");
 
+    public static final ConfigOption<DistributionMode> SINK_DISTRIBUTION_MODE =
+            ConfigOptions.key("sink.distribution-mode")
+                    .enumType(DistributionMode.class)
+                    .defaultValue(DistributionMode.BUCKET_SHUFFLE)

Review Comment:
   `sink.distribution-mode`
   
   - Default: AUTO
   
   - AUTO: Automatically chooses the best mode based on the table type. Uses 
BUCKET mode for Primary Key Tables and NONE for Log Tables to maximize 
throughput.
   
   - NONE: Direct write without reshuffling.
   
   - BUCKET: ..
   
   - PARTITION_DYNAMIC: ...
   
   



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java:
##########
@@ -51,6 +51,10 @@ public interface FlussSerializationSchema<T> extends Serializable {
      */
     RowWithOp serialize(T value) throws Exception;
 
+    default long size(T value, RowType rowType) {

Review Comment:
   This method feels like a bit of a hack to me, and it is confusing what the relationship and ordering between this method and the `serialize` method are.
   
   I suggest adding a `getEstimatedSizeInBytes` method to the `RowWithOp` class, plus an additional constructor that accepts an `estimatedSizeInBytes` parameter, so we can fold the size estimation into the `serialize` method.
   
   ```
   @Nullable
   public Long getEstimatedSizeInBytes() {
       ...
   }
   ```
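   
   A rough sketch of what those additions to `RowWithOp` could look like (the field and type names `row`, `opType`, and `OperationType` are assumptions inferred from the constructor call in `serialize`, not the actual class members):
   
   ```
   // New optional field carrying the size estimated while serializing.
   private final @Nullable Long estimatedSizeInBytes;
   
   // Existing two-arg constructor keeps working, just without a size estimate.
   public RowWithOp(InternalRow row, OperationType opType) {
       this(row, opType, null);
   }
   
   public RowWithOp(InternalRow row, OperationType opType, @Nullable Long estimatedSizeInBytes) {
       this.row = row;
       this.opType = opType;
       this.estimatedSizeInBytes = estimatedSizeInBytes;
   }
   
   /** Returns the estimated serialized size in bytes, or null if it was not provided. */
   @Nullable
   public Long getEstimatedSizeInBytes() {
       return estimatedSizeInBytes;
   }
   ```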



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DistributionMode.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import java.util.Locale;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/** Distribution mode for sink shuffling. */
+public enum DistributionMode {
+    NONE("NONE"),
+    BUCKET_SHUFFLE("BUCKET_SHUFFLE"),
+    DYNAMIC_SHUFFLE("DYNAMIC_SHUFFLE");
+
+    private final String modeName;
+
+    DistributionMode(String modeName) {
+        this.modeName = modeName;
+    }
+
+    public String modeName() {
+        return modeName;
+    }
+
+    public static DistributionMode fromName(String modeName) {
+        checkArgument(null != modeName, "Invalid distribution mode: null");
+        try {
+            return DistributionMode.valueOf(modeName.toUpperCase(Locale.ENGLISH));

Review Comment:
   nit: isn't `.toUpperCase()` enough? All the other places just use `.toUpperCase()`.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java:
##########
@@ -159,6 +168,66 @@ public RowWithOp serialize(RowData value) throws Exception {
         return new RowWithOp(row, opType);
     }
 
+    @Override
+    public long size(RowData value, RowType rowType) {
+        if (value instanceof BinaryFormat) {
+            return ((BinaryFormat) value).getSizeInBytes();
+        }
+
+        long size = 0;
+        for (int i = 0; i < rowType.getFieldCount(); i++) {

Review Comment:
   We should pre-compute the fixed-length part size for better performance, so that only the var-length parts need to be computed at runtime.
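   
   For illustration, one way to structure that (the helper `estimateVarLengthFieldSize` is a placeholder for the existing per-field logic, not an existing Fluss utility):
   
   ```
   // Computed once, e.g. when the schema is opened: the summed size of all fixed-length
   // fields, and the indexes of the variable-length fields.
   private long fixedLengthPartSize;
   private int[] varLengthFieldIndexes;
   
   @Override
   public long size(RowData value, RowType rowType) {
       if (value instanceof BinaryFormat) {
           return ((BinaryFormat) value).getSizeInBytes();
       }
       // Only the variable-length fields (strings, bytes, nested rows, ...) need to be
       // inspected per record; the fixed-length part is already known.
       long size = fixedLengthPartSize;
       for (int i : varLengthFieldIndexes) {
           size += estimateVarLengthFieldSize(value, i);
       }
       return size;
   }
   ```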


