Copilot commented on code in PR #1784:
URL: https://github.com/apache/fluss/pull/1784#discussion_r2648274494


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java:
##########
@@ -159,6 +168,66 @@ public RowWithOp serialize(RowData value) throws Exception {
         return new RowWithOp(row, opType);
     }
 
+    @Override
+    public long size(RowData value, RowType rowType) {
+        if (value instanceof BinaryFormat) {
+            return ((BinaryFormat) value).getSizeInBytes();
+        }
+
+        long size = 0;
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            DataField field = rowType.getFields().get(i);
+            DataTypeRoot typeRoot = field.getType().getTypeRoot();
+            if (value.isNullAt(i)) {
+                continue;
+            }
+            switch (typeRoot) {
+                case CHAR:
+                    size += ((CharType) (field.getType())).getLength();
+                    break;
+                case STRING:
+                    StringData stringData = value.getString(i);
+                    if (stringData instanceof BinaryStringData) {
+                        size += ((BinaryStringData) stringData).getSizeInBytes();
+                    } else {
+                        size += converter.getString(i).getSizeInBytes();
+                    }
+                    break;
+                case BINARY:
+                    size += ((BinaryType) (field.getType())).getLength();
+                    break;
+                case BYTES:
+                    size += converter.getBytes(i).length;
+                    break;
+                case DECIMAL:
+                    size += ((DecimalType) (field.getType())).getPrecision();
+                    break;
+                case BOOLEAN:
+                case TINYINT:
+                    size += 1;
+                    break;
+                case SMALLINT:
+                    size += 2;
+                    break;
+                case INTEGER:
+                case DATE:
+                case TIME_WITHOUT_TIME_ZONE:
+                    size += 4;
+                    break;
+                case BIGINT:
+                case FLOAT:
+                case DOUBLE:
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                    size += 8;

Review Comment:
   The size calculation is incorrect for FLOAT. Lines 218-222 assign 8 bytes to 
FLOAT, but a FLOAT value occupies 4 bytes; only DOUBLE occupies 8. Moving FLOAT 
into the 4-byte group avoids skewing the statistics and weight calculations in 
the dynamic shuffle algorithm.
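
As a rough sketch of the grouping this comment points toward, the widths below come from the JDK's `BYTES` constants. The `FixedWidths` class and its string type names are hypothetical stand-ins for the `DataTypeRoot` switch, not code from the PR.

```java
/** Hypothetical helper: fixed widths per type name, based on the JDK's BYTES constants. */
final class FixedWidths {
    private FixedWidths() {}

    /** Returns the width in bytes, or -1 for a type this sketch does not cover. */
    static int widthOf(String typeRoot) {
        switch (typeRoot) {
            case "BOOLEAN":
            case "TINYINT":
                return Byte.BYTES; // 1
            case "SMALLINT":
                return Short.BYTES; // 2
            case "INTEGER":
                return Integer.BYTES; // 4
            case "FLOAT":
                return Float.BYTES; // 4: single precision, same width as INTEGER
            case "BIGINT":
                return Long.BYTES; // 8
            case "DOUBLE":
                return Double.BYTES; // 8
            default:
                return -1; // variable-length and other types are out of scope here
        }
    }
}
```

Reading the widths from `Float.BYTES` and `Double.BYTES` rather than literals makes a FLOAT/DOUBLE mix-up harder to introduce.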



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java:
##########
@@ -188,6 +194,23 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         }
 
         FlinkSink<RowData> flinkSink = getFlinkSink(targetColumnIndexes);
+        if (distributionMode == DistributionMode.DYNAMIC_SHUFFLE) {
+            return new DataStreamSinkProvider() {
+                @Override
+                public DataStreamSink<?> consumeDataStream(
+                        ProviderContext providerContext, DataStream<RowData> dataStream) {
+                    // todo: add prefix to the sink to avoid conflict in same flink job.
+                    String defaultSuffix =
+                            "sink"

Review Comment:
   Resolve this TODO or convert it into an issue/ticket. It calls for a prefix 
that avoids sink conflicts within the same Flink job, which is an important 
consideration for production code.
   ```suggestion
                       // Use a connector-specific prefix to reduce the risk of sink name/UID conflicts.
                       String defaultSuffix =
                               "fluss-sink"
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatistics.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/** Data statistics for a partition name and its frequency. */
+@Internal
+public class DataStatistics {

Review Comment:
   Class DataStatistics overrides `hashCode` but not `equals`.
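
A minimal sketch of a matching `equals`/`hashCode` pair follows. `PartitionCounts` and its `counts` field are hypothetical and only assume that the class wraps a partition-name-to-frequency map, as its Javadoc suggests; the real fields of `DataStatistics` may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for a statistics holder keyed by partition name. */
final class PartitionCounts {
    private final Map<String, Long> counts = new HashMap<>();

    /** Adds weight to the running total of a partition. */
    void add(String partition, long weight) {
        counts.merge(partition, weight, Long::sum);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PartitionCounts)) {
            return false;
        }
        // Compare exactly the state that hashCode() hashes.
        return counts.equals(((PartitionCounts) o).counts);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(counts);
    }
}
```

Keeping both methods on the same fields preserves the contract that equal objects have equal hash codes.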



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java:
##########
@@ -92,9 +95,19 @@ public FlussSinkBuilder<InputT> setTable(String table) {
         return this;
     }
 
-    /** Set shuffle by bucket id. */
+    /**
+     * Set shuffle by bucket id. Deprecated use {@link
+     * FlussSinkBuilder#setShuffleMode(DistributionMode) } instead.
+     */
+    @Deprecated
     public FlussSinkBuilder<InputT> setShuffleByBucketId(boolean shuffleByBucketId) {
-        this.shuffleByBucketId = shuffleByBucketId;
+        this.shuffleMode =
+                shuffleByBucketId ? DistributionMode.BUCKET_SHUFFLE : DistributionMode.NONE;
+        return this;
+    }
+
+    public FlussSinkBuilder<InputT> setShuffleMode(DistributionMode shuffleByBucketId) {
+        this.shuffleMode = shuffleByBucketId;

Review Comment:
   The parameter name 'shuffleByBucketId' is misleading. The parameter should 
be named 'distributionMode' or 'shuffleMode' to accurately reflect that it 
accepts a DistributionMode enum value, not a boolean.
   ```suggestion
       public FlussSinkBuilder<InputT> setShuffleMode(DistributionMode shuffleMode) {
           this.shuffleMode = shuffleMode;
   ```



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/shuffle/DataStatisticOperatorTest.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.flink.sink.serializer.RowDataSerializationSchema;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.flink.sink.shuffle.StatisticsEvent.createStatisticsEvent;
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DataStatisticsOperator}. */
+public class DataStatisticOperatorTest {
+
+    @Test
+    void testProcessElement() throws Exception {
+        DataStatisticsOperatorFactory<RowData> factory =
+                new DataStatisticsOperatorFactory<>(
+                        DATA1_ROW_TYPE,
+                        Collections.singletonList("b"),
+                        new RowDataSerializationSchema(false, false));
+        List<StreamRecord<RowData>> inputRecords =
+                Arrays.asList(
+                        new StreamRecord<>(GenericRowData.of(1, StringData.fromString("a"))),
+                        new StreamRecord<>(GenericRowData.of(2, StringData.fromString("a"))),
+                        new StreamRecord<>(GenericRowData.of(3, StringData.fromString("b"))));
+
+        List<StreamRecord<StatisticsOrRecord<RowData>>> expectedOutput = new ArrayList<>();
+
+        try (DataStatisticOperatorTestHarness testHarness =
+                new DataStatisticOperatorTestHarness(factory, 1, 1, 0)) {
+            testHarness.open();
+            assertThat(testHarness.getLocalStatistics()).isEmpty();
+
+            //  process five records

Review Comment:
   The test comment says "process five records" (line 64) but only three 
records are being processed in the loop. The comment should be corrected to say 
"process three records".
   ```suggestion
               //  process three records
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/WeightedRandomAssignment.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.row.InternalRow;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Partition assignment strategy that randomly distributes records to subtasks based on configured
+ * weights.
+ *
+ * <p>This assignment strategy enables weighted random distribution of records to subtasks, allowing
+ * for more balanced load distribution across downstream subtasks. The assignment uses a weighted
+ * random algorithm where subtasks with higher weights have a proportionally higher probability of
+ * being selected.
+ *
+ * <p>NOTE: This class is inspired from Iceberg project.
+ */
+@Internal
+public class WeightedRandomAssignment implements PartitionAssignment {

Review Comment:
   Class WeightedRandomAssignment overrides `hashCode` but not `equals`.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Objects;
+
+/**
+ * The type information for StatisticsOrRecord.
+ *
+ * @param <InputT>
+ */
+@Internal
+public class StatisticsOrRecordTypeInformation<InputT>
+        extends TypeInformation<StatisticsOrRecord<InputT>> {
+
+    private final TypeInformation<InputT> rowTypeInformation;
+    private final DataStatisticsSerializer globalStatisticsSerializer;
+
+    public StatisticsOrRecordTypeInformation(TypeInformation<InputT> rowTypeInformation) {
+        this.rowTypeInformation = rowTypeInformation;
+        this.globalStatisticsSerializer = new DataStatisticsSerializer();
+    }
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    public int getArity() {
+        return 1;
+    }
+
+    @Override
+    public int getTotalFields() {
+        return 1;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Class<StatisticsOrRecord<InputT>> getTypeClass() {
+        return (Class<StatisticsOrRecord<InputT>>) (Class<?>) StatisticsOrRecord.class;
+    }
+
+    @Override
+    public boolean isKeyType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<StatisticsOrRecord<InputT>> createSerializer(SerializerConfig config) {
+        TypeSerializer<InputT> recordSerializer = rowTypeInformation.createSerializer(config);
+        return new StatisticsOrRecordSerializer<>(globalStatisticsSerializer, recordSerializer);
+    }
+
+    @Override
+    @Deprecated
+    public TypeSerializer<StatisticsOrRecord<InputT>> createSerializer(ExecutionConfig config) {
+        TypeSerializer<InputT> recordSerializer = rowTypeInformation.createSerializer(config);
+        return new StatisticsOrRecordSerializer<>(globalStatisticsSerializer, recordSerializer);
+    }
+
+    @Override
+    public String toString() {
+        return "StatisticsOrRecord";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        } else if (o != null && this.getClass() == o.getClass()) {
+            StatisticsOrRecordTypeInformation<InputT> that =
+                    (StatisticsOrRecordTypeInformation<InputT>) o;
+            return that.rowTypeInformation.equals(rowTypeInformation)
+                    && that.globalStatisticsSerializer.equals(globalStatisticsSerializer);
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rowTypeInformation, globalStatisticsSerializer);
+    }
+
+    @Override
+    public boolean canEqual(Object obj) {

Review Comment:
   Method StatisticsOrRecordTypeInformation.canEqual(..) could be confused with 
overloaded method `TypeInformation.canEqual`, since dispatch depends on 
static types.
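
To illustrate the general pitfall this warning describes, here is a small sketch with hypothetical classes (`BaseInfo`, `DerivedInfo`, `DispatchDemo`); none of them are from the PR. Whether the warning applies to `canEqual` depends on the actual signatures involved. When a subclass method has the same name but a different parameter type, it is an overload, and the compiler picks the target from the static type of the receiver.

```java
/** Hypothetical base type with a method taking Object. */
class BaseInfo {
    String describe(Object o) {
        return "BaseInfo(Object)";
    }
}

/** Hypothetical subtype that adds an overload instead of overriding. */
class DerivedInfo extends BaseInfo {
    // Different parameter type: this does NOT override describe(Object).
    String describe(String s) {
        return "DerivedInfo(String)";
    }
}

final class DispatchDemo {
    private DispatchDemo() {}

    /** Static receiver type is BaseInfo, so only describe(Object) is a candidate. */
    static String viaBase(BaseInfo info, String arg) {
        return info.describe(arg);
    }

    /** Static receiver type is DerivedInfo, so the more specific describe(String) wins. */
    static String viaDerived(DerivedInfo info, String arg) {
        return info.describe(arg);
    }
}
```

Calling `DispatchDemo.viaBase(new DerivedInfo(), "x")` runs the base method even though the runtime object is a `DerivedInfo`. Declaring the subclass method with the same parameter type and `@Override` removes the ambiguity.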



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordSerializer.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * The serializer for {@link StatisticsOrRecord}.
+ *
+ * @param <InputT>
+ */
+@Internal
+public class StatisticsOrRecordSerializer<InputT>
+        extends TypeSerializer<StatisticsOrRecord<InputT>> {
+    private final TypeSerializer<DataStatistics> statisticsSerializer;
+    private final TypeSerializer<InputT> recordSerializer;
+
+    StatisticsOrRecordSerializer(
+            TypeSerializer<DataStatistics> statisticsSerializer,
+            TypeSerializer<InputT> recordSerializer) {
+        this.statisticsSerializer = statisticsSerializer;
+        this.recordSerializer = recordSerializer;
+    }
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @SuppressWarnings("ReferenceEquality")
+    @Override
+    public TypeSerializer<StatisticsOrRecord<InputT>> duplicate() {
+        TypeSerializer<DataStatistics> duplicateStatisticsSerializer =
+                statisticsSerializer.duplicate();
+        TypeSerializer<InputT> duplicateRowDataSerializer = recordSerializer.duplicate();
+        if ((statisticsSerializer != duplicateStatisticsSerializer)
+                || (recordSerializer != duplicateRowDataSerializer)) {
+            return new StatisticsOrRecordSerializer<>(
+                    duplicateStatisticsSerializer, duplicateRowDataSerializer);
+        } else {
+            return this;
+        }
+    }
+
+    @Override
+    public StatisticsOrRecord<InputT> createInstance() {
+        // arbitrarily always create RowData value instance
+        return StatisticsOrRecord.fromRecord(recordSerializer.createInstance());
+    }
+
+    @Override
+    public StatisticsOrRecord<InputT> copy(StatisticsOrRecord<InputT> from) {
+        if (from.hasRecord()) {
+            return StatisticsOrRecord.fromRecord(recordSerializer.copy(from.record()));
+        } else {
+            return StatisticsOrRecord.fromStatistics(statisticsSerializer.copy(from.statistics()));
+        }
+    }
+
+    @Override
+    public StatisticsOrRecord<InputT> copy(
+            StatisticsOrRecord<InputT> from, StatisticsOrRecord<InputT> reuse) {
+        StatisticsOrRecord<InputT> to;
+        if (from.hasRecord()) {
+            to = StatisticsOrRecord.reuseRecord(reuse, recordSerializer);
+            InputT record = recordSerializer.copy(from.record(), to.record());
+            to.record(record);
+        } else {
+            to = StatisticsOrRecord.reuseStatistics(reuse, statisticsSerializer);
+            DataStatistics statistics =
+                    statisticsSerializer.copy(from.statistics(), to.statistics());
+            to.statistics(statistics);
+        }
+
+        return to;
+    }
+
+    @Override
+    public int getLength() {
+        return -1;
+    }
+
+    @Override
+    public void serialize(StatisticsOrRecord<InputT> statisticsOrRecord, DataOutputView target)
+            throws IOException {
+        if (statisticsOrRecord.hasRecord()) {
+            target.writeBoolean(true);
+            recordSerializer.serialize(statisticsOrRecord.record(), target);
+        } else {
+            target.writeBoolean(false);
+            statisticsSerializer.serialize(statisticsOrRecord.statistics(), target);
+        }
+    }
+
+    @Override
+    public StatisticsOrRecord<InputT> deserialize(DataInputView source) throws IOException {
+        boolean isRecord = source.readBoolean();
+        if (isRecord) {
+            return StatisticsOrRecord.fromRecord(recordSerializer.deserialize(source));
+        } else {
+            return StatisticsOrRecord.fromStatistics(statisticsSerializer.deserialize(source));
+        }
+    }
+
+    @Override
+    public StatisticsOrRecord<InputT> deserialize(
+            StatisticsOrRecord<InputT> reuse, DataInputView source) throws IOException {
+        StatisticsOrRecord<InputT> to;
+        boolean isRecord = source.readBoolean();
+        if (isRecord) {
+            to = StatisticsOrRecord.reuseRecord(reuse, recordSerializer);
+            InputT record = recordSerializer.deserialize(to.record(), source);
+            to.record(record);
+        } else {
+            to = StatisticsOrRecord.reuseStatistics(reuse, statisticsSerializer);
+            DataStatistics statistics = statisticsSerializer.deserialize(to.statistics(), source);
+            to.statistics(statistics);
+        }
+
+        return to;
+    }
+
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        boolean hasRecord = source.readBoolean();
+        target.writeBoolean(hasRecord);
+        if (hasRecord) {
+            recordSerializer.copy(source, target);
+        } else {
+            statisticsSerializer.copy(source, target);
+        }
+    }
+
+    @Override
+    public boolean equals(Object obj) {

Review Comment:
   Method StatisticsOrRecordSerializer.equals(..) could be confused with 
overloaded method `TypeSerializer.equals`, since dispatch depends on static 
types.
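
The sketch below shows why an overloaded `equals` matters in practice, using hypothetical key classes that are not from the PR. Collections call `equals(Object)`, so an `equals` declared with a narrower parameter type is ignored there. Whether the warning applies to this serializer depends on the signatures in the PR.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical key whose equals is an overload (parameter is not Object). */
final class OverloadedKey {
    final int id;

    OverloadedKey(int id) {
        this.id = id;
    }

    // Overload: Object.equals(Object) is still the inherited identity check.
    public boolean equals(OverloadedKey other) {
        return other != null && other.id == id;
    }
}

/** Hypothetical key that overrides equals(Object) and hashCode() together. */
final class OverriddenKey {
    final int id;

    OverriddenKey(int id) {
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof OverriddenKey && ((OverriddenKey) o).id == id;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(id);
    }
}

final class EqualsDispatchDemo {
    private EqualsDispatchDemo() {}

    /** List.contains calls equals(Object), so the overload is never consulted. */
    static boolean overloadedFound() {
        List<OverloadedKey> keys = Arrays.asList(new OverloadedKey(1));
        return keys.contains(new OverloadedKey(1));
    }

    /** With a true override, contains finds the logically equal key. */
    static boolean overriddenFound() {
        List<OverriddenKey> keys = Arrays.asList(new OverriddenKey(1));
        return keys.contains(new OverriddenKey(1));
    }
}
```

An `@Override` annotation on `equals(Object)` lets the compiler reject an accidental overload.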



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsSerializer.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Map;
+
+/** Serializer for {@link DataStatistics}. */
+@Internal
+public class DataStatisticsSerializer extends TypeSerializer<DataStatistics> {

Review Comment:
   Class DataStatisticsSerializer overrides `hashCode` but not `equals`.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java:
##########
@@ -133,20 +144,76 @@ public AppendSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) {
 
         @Override
         public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
-            // For append only sink, we will do bucket shuffle only if bucket keys are not empty.
-            if (!bucketKeys.isEmpty() && shuffleByBucketId) {
-                return partition(
-                        input,
-                        new FlinkRowDataChannelComputer<>(
-                                toFlussRowType(tableRowType),
-                                bucketKeys,
-                                partitionKeys,
-                                lakeFormat,
-                                numBucket,
-                                flussSerializationSchema),
-                        input.getParallelism());
-            } else {
-                return input;
+            switch (shuffleMode) {
+                case BUCKET_SHUFFLE:
+                    if (!bucketKeys.isEmpty()) {
+                        return partition(
+                                input,
+                                new FlinkRowDataChannelComputer<>(
+                                        toFlussRowType(tableRowType),
+                                        bucketKeys,
+                                        partitionKeys,
+                                        lakeFormat,
+                                        numBucket,
+                                        flussSerializationSchema),
+                                input.getParallelism());
+                    }
+                    return input;
+                case NONE:
+                    return input;
+                case DYNAMIC_SHUFFLE:
+                    if (partitionKeys.isEmpty()) {
+                        throw new UnsupportedOperationException(
+                                "DYNAMIC_SHUFFLE is only supported for partition tables");
+                    }
+
+                    if (rowTypeInformation == null) {
+                        throw new UnsupportedOperationException(
+                                "RowTypeInformation is required for DYNAMIC_SHUFFLE mode.");
+                    }
+                    TypeInformation<StatisticsOrRecord<InputT>> statisticsOrRecordTypeInformation =
+                            new StatisticsOrRecordTypeInformation<>(rowTypeInformation);
+                    SingleOutputStreamOperator<StatisticsOrRecord<InputT>> shuffleStream =
+                            input.transform(
+                                            "Range shuffle Collector",
+                                            statisticsOrRecordTypeInformation,
+                                            new DataStatisticsOperatorFactory<>(
+                                                    toFlussRowType(tableRowType),
+                                                    partitionKeys,
+                                                    flussSerializationSchema))
+                                    .uid("Range shuffle Collector" + tablePath)

Review Comment:
   The naming "Range shuffle Collector" is inconsistent with "DYNAMIC_SHUFFLE" 
terminology used throughout the codebase. The operator name should use "Dynamic 
shuffle" or "Data statistics" to match the feature name and maintain 
consistency.
   ```suggestion
                                               "Dynamic shuffle data statistics",
                                               statisticsOrRecordTypeInformation,
                                               new DataStatisticsOperatorFactory<>(
                                                       toFlussRowType(tableRowType),
                                                       partitionKeys,
                                                       flussSerializationSchema))
                                       .uid("Dynamic shuffle data statistics" + tablePath)
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecord.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Objects;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * Either a record or a statistics.
+ *
+ * @param <InputT>
+ */
+@Internal
+public class StatisticsOrRecord<InputT> {

Review Comment:
   Class StatisticsOrRecord overrides `hashCode` but not `equals`.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java:
##########
@@ -159,6 +168,66 @@ public RowWithOp serialize(RowData value) throws Exception {
         return new RowWithOp(row, opType);
     }
 
+    @Override
+    public long size(RowData value, RowType rowType) {
+        if (value instanceof BinaryFormat) {
+            return ((BinaryFormat) value).getSizeInBytes();
+        }
+
+        long size = 0;
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            DataField field = rowType.getFields().get(i);
+            DataTypeRoot typeRoot = field.getType().getTypeRoot();
+            if (value.isNullAt(i)) {
+                continue;
+            }
+            switch (typeRoot) {
+                case CHAR:
+                    size += ((CharType) (field.getType())).getLength();
+                    break;
+                case STRING:
+                    StringData stringData = value.getString(i);
+                    if (stringData instanceof BinaryStringData) {
+                        size += ((BinaryStringData) stringData).getSizeInBytes();
+                    } else {
+                        size += converter.getString(i).getSizeInBytes();
+                    }
+                    break;
+                case BINARY:
+                    size += ((BinaryType) (field.getType())).getLength();
+                    break;
+                case BYTES:
+                    size += converter.getBytes(i).length;
+                    break;
+                case DECIMAL:
+                    size += ((DecimalType) (field.getType())).getPrecision();

Review Comment:
   The DECIMAL type size calculation is potentially incorrect. Line 203 uses 
precision as the size, but the actual size should be calculated based on the 
storage format (which depends on both precision and scale). For example, a 
DECIMAL(5,2) and DECIMAL(38,18) should have different sizes, but using 
precision alone doesn't accurately reflect the actual storage size.
   ```suggestion
                       DecimalType decimalType = (DecimalType) field.getType();
                       int precision = decimalType.getPrecision();
                       int scale = decimalType.getScale();
                       int integerDigits = Math.max(1, precision - scale);
                       int fractionalDigits = Math.max(0, scale);
                       int decimalPoint = fractionalDigits > 0 ? 1 : 0;
                       int sign = 1; // allow space for a potential '-' sign
                       size += integerDigits + fractionalDigits + decimalPoint 
+ sign;
   ```
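
   If the goal is a byte-size estimate rather than a character count, an 
alternative could look like the sketch below. It assumes, without confirmation 
from this PR, a layout in which decimals with precision up to 18 are stored as 
one 8-byte long and larger ones as the byte array of the unscaled value. The 
class name, method name, and threshold constant are hypothetical.

```java
import java.math.BigDecimal;

/**
 * Sketch only: estimates the bytes needed to hold a decimal value.
 * Assumed layout (not confirmed by this PR): precision <= 18 is stored as a
 * single long; anything larger is stored as the unscaled value's byte array.
 */
final class DecimalSizeSketch {
    // Hypothetical threshold: every 18-digit integer fits in a signed 64-bit long.
    static final int MAX_COMPACT_PRECISION = 18;

    static long estimateSizeInBytes(BigDecimal value, int precision) {
        if (precision <= MAX_COMPACT_PRECISION) {
            return Long.BYTES; // fixed 8 bytes regardless of the actual digits
        }
        // Two's-complement bytes of the unscaled integer (scale is kept in the type).
        return value.unscaledValue().toByteArray().length;
    }
}
```

Under these assumptions a DECIMAL(5,2) value always counts as 8 bytes, while a 
DECIMAL(38,18) value counts as however many bytes its unscaled integer needs.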



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Objects;
+
+/**
+ * The type information for StatisticsOrRecord.
+ *
+ * @param <InputT>
+ */
+@Internal
+public class StatisticsOrRecordTypeInformation<InputT>

Review Comment:
   Class StatisticsOrRecordTypeInformation overrides [hashCode](1) but not 
equals.
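
   For reference, a minimal sketch of a consistent equals/hashCode pair, using 
a hypothetical wrapper class rather than the class from this PR. Both methods 
are derived from the same field, so equal objects always share a hash code.

```java
import java.util.Objects;

/** Hypothetical wrapper used only to illustrate the equals/hashCode contract. */
final class WrapperSketch {
    private final String inner;

    WrapperSketch(String inner) {
        this.inner = inner;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        // Compare exactly the fields that hashCode() hashes.
        return Objects.equals(inner, ((WrapperSketch) o).inner);
    }

    @Override
    public int hashCode() {
        return Objects.hash(inner);
    }
}
```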



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsEvent.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/** Event to send statistics to the coordinator. */
+@Internal
+class StatisticsEvent implements OperatorEvent {

Review Comment:
   Class StatisticsEvent overrides [hashCode](1) but not equals.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordSerializer.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * The serializer for {@link StatisticsOrRecord}.
+ *
+ * @param <InputT>
+ */
+@Internal
+public class StatisticsOrRecordSerializer<InputT>

Review Comment:
   Class StatisticsOrRecordSerializer overrides [hashCode](1) but not equals.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java:
##########
@@ -126,7 +145,9 @@ public FlussSink<InputT> build() {
         FlinkSink.SinkWriterBuilder<? extends FlinkSinkWriter<InputT>, InputT> 
writerBuilder;
 
         TablePath tablePath = new TablePath(database, tableName);
-        flussConfig.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), 
bootstrapServers);
+        if (bootstrapServers != null) {
+            flussConfig.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), 
bootstrapServers);
+        }

Review Comment:
   The null handling for bootstrapServers is inconsistent. Line 148 only sets 
the value when it is non-null, which implies null is allowed, but line 209 in 
validateConfiguration() requires it to be non-null. As a result, build() may 
proceed with a null bootstrapServers value, only to fail during validation. 
Either remove the conditional check at line 148 or update the validation to 
match this conditional behavior.
   ```suggestion
           flussConfig.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), 
bootstrapServers);
   ```
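
   One way to keep the two code paths consistent is to validate first and then 
set the value unconditionally. The sketch below uses a hypothetical builder 
and a placeholder config key; it is not the FlussSinkBuilder API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical builder illustrating "validate, then set unconditionally". */
final class SinkBuilderSketch {
    // Placeholder key, assumed for illustration only.
    static final String BOOTSTRAP_KEY = "bootstrap.servers";

    private final Map<String, String> config = new HashMap<>();
    private String bootstrapServers;

    SinkBuilderSketch setBootstrapServers(String servers) {
        this.bootstrapServers = servers;
        return this;
    }

    Map<String, String> build() {
        // Fail fast, so nothing after this line has to handle null.
        validateConfiguration();
        config.put(BOOTSTRAP_KEY, bootstrapServers);
        return config;
    }

    private void validateConfiguration() {
        Objects.requireNonNull(bootstrapServers, "bootstrapServers must not be null");
    }
}
```

If a null value is meant to be legal, for example because the servers come 
from a pre-populated config, the validation would need to be relaxed instead.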



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsSerializer.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Map;
+
+/** Serializer for {@link DataStatistics}. */
+@Internal
+public class DataStatisticsSerializer extends TypeSerializer<DataStatistics> {
+    private final MapSerializer<String, Long> mapSerializer;
+
+    public DataStatisticsSerializer() {
+        this.mapSerializer =
+                new MapSerializer<>(StringSerializer.INSTANCE, 
LongSerializer.INSTANCE);
+    }
+
+    @Override
+    public boolean isImmutableType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<DataStatistics> duplicate() {
+        return new DataStatisticsSerializer();
+    }
+
+    @Override
+    public DataStatistics createInstance() {
+        return new DataStatistics();
+    }
+
+    @Override
+    public DataStatistics copy(DataStatistics from) {
+        return new DataStatistics(from.result());
+    }
+
+    @Override
+    public DataStatistics copy(DataStatistics from, DataStatistics reuse) {
+        // no benefit of reuse
+        return copy(from);
+    }
+
+    @Override
+    public int getLength() {
+        return -1;
+    }
+
+    @Override
+    public void serialize(DataStatistics record, DataOutputView target) throws 
IOException {
+        mapSerializer.serialize(record.result(), target);
+    }
+
+    @Override
+    public DataStatistics deserialize(DataInputView source) throws IOException 
{
+        Map<String, Long> partitionFrequency = 
mapSerializer.deserialize(source);
+        return new DataStatistics(partitionFrequency);
+    }
+
+    @Override
+    public DataStatistics deserialize(DataStatistics reuse, DataInputView 
source)
+            throws IOException {
+        // not much benefit to reuse
+        return deserialize(source);
+    }
+
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+        serialize(deserialize(source), target);
+    }
+
+    @Override
+    public boolean equals(Object obj) {

Review Comment:
   Method DataStatisticsSerializer.equals(..) could be confused with overloaded 
method [TypeSerializer.equals](1), since dispatch depends on static types.
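
   The general pitfall behind this warning can be sketched with a hypothetical 
class that overloads equals instead of overriding it. Note that the diff above 
declares equals(Object obj) with @Override, so this sketch illustrates the 
category of problem rather than a confirmed bug in this file.

```java
/** Hypothetical example: an equals overload is chosen by static type. */
final class OverloadSketch {
    static final class Overloaded {
        final int id;

        Overloaded(int id) {
            this.id = id;
        }

        // Overload, not an override: the parameter is Overloaded, not Object.
        public boolean equals(Overloaded other) {
            return other != null && id == other.id;
        }

        @Override
        public int hashCode() {
            return id;
        }
    }

    static boolean viaObjectReferences() {
        Object a = new Overloaded(1);
        Object b = new Overloaded(1);
        // Static type is Object, so Object.equals(Object) runs: identity comparison.
        return a.equals(b);
    }

    static boolean viaTypedReferences() {
        Overloaded a = new Overloaded(1);
        Overloaded b = new Overloaded(1);
        // Static type is Overloaded, so the equals(Overloaded) overload runs.
        return a.equals(b);
    }
}
```

Here viaObjectReferences() returns false and viaTypedReferences() returns true 
for the same pair of values, which is why collections and frameworks that call 
equals(Object) would not see the overload.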



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Objects;
+
+/**
+ * The type information for StatisticsOrRecord.
+ *
+ * @param <InputT>
+ */
+@Internal
+public class StatisticsOrRecordTypeInformation<InputT>
+        extends TypeInformation<StatisticsOrRecord<InputT>> {
+
+    private final TypeInformation<InputT> rowTypeInformation;
+    private final DataStatisticsSerializer globalStatisticsSerializer;
+
+    public StatisticsOrRecordTypeInformation(TypeInformation<InputT> 
rowTypeInformation) {
+        this.rowTypeInformation = rowTypeInformation;
+        this.globalStatisticsSerializer = new DataStatisticsSerializer();
+    }
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    public int getArity() {
+        return 1;
+    }
+
+    @Override
+    public int getTotalFields() {
+        return 1;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Class<StatisticsOrRecord<InputT>> getTypeClass() {
+        return (Class<StatisticsOrRecord<InputT>>) (Class<?>) 
StatisticsOrRecord.class;
+    }
+
+    @Override
+    public boolean isKeyType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<StatisticsOrRecord<InputT>> 
createSerializer(SerializerConfig config) {
+        TypeSerializer<InputT> recordSerializer = 
rowTypeInformation.createSerializer(config);
+        return new StatisticsOrRecordSerializer<>(globalStatisticsSerializer, 
recordSerializer);
+    }
+
+    @Override
+    @Deprecated
+    public TypeSerializer<StatisticsOrRecord<InputT>> 
createSerializer(ExecutionConfig config) {
+        TypeSerializer<InputT> recordSerializer = 
rowTypeInformation.createSerializer(config);
+        return new StatisticsOrRecordSerializer<>(globalStatisticsSerializer, 
recordSerializer);
+    }
+
+    @Override
+    public String toString() {
+        return "StatisticsOrRecord";
+    }
+
+    @Override
+    public boolean equals(Object o) {

Review Comment:
   Method StatisticsOrRecordTypeInformation.equals(..) could be confused with 
overloaded method [TypeInformation.equals](1), since dispatch depends on static 
types.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputer.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.bucketing.BucketingFunction;
+import org.apache.fluss.client.table.getter.PartitionGetter;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.flink.row.RowWithOp;
+import org.apache.fluss.flink.sink.ChannelComputer;
+import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema;
+import org.apache.fluss.flink.sink.serializer.SerializerInitContextImpl;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.KeyEncoder;
+import org.apache.fluss.types.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * {@link ChannelComputer} for {@link StatisticsOrRecord} which will change 
shuffle based on
+ * partition statistic.
+ */
+@Internal
+public class StatisticsOrRecordChannelComputer<InputT>
+        implements ChannelComputer<StatisticsOrRecord<InputT>> {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(StatisticsOrRecordChannelComputer.class);
+
+    private final @Nullable DataLakeFormat lakeFormat;
+    private final RowType flussRowType;
+    private final List<String> bucketKeys;
+    private final List<String> partitionKeys;
+    private final FlussSerializationSchema<InputT> serializationSchema;
+    private final int bucketNum;
+
+    private transient int downstreamNumChannels;
+    private transient KeyEncoder bucketKeyEncoder;
+    private transient PartitionGetter partitionGetter;
+    private transient MapPartitioner delegatePartitioner;
+    private transient AtomicLong roundRobinCounter;
+    private transient BucketingFunction bucketingFunction;
+    private transient Random random;
+
+    public StatisticsOrRecordChannelComputer(
+            RowType flussRowType,
+            List<String> bucketKeys,
+            List<String> partitionKeys,
+            int bucketNum,
+            @Nullable DataLakeFormat lakeFormat,
+            FlussSerializationSchema<InputT> serializationSchema) {
+        checkArgument(
+                partitionKeys != null && !partitionKeys.isEmpty(),
+                "Partition keys cannot be empty.");
+        this.flussRowType = flussRowType;
+        this.bucketKeys = bucketKeys;
+        this.partitionKeys = partitionKeys;
+        this.bucketNum = bucketNum;
+        this.lakeFormat = lakeFormat;
+        this.serializationSchema = serializationSchema;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        LOG.info("Setting up with {} downstream channels", numChannels);
+        this.downstreamNumChannels = numChannels;
+        this.bucketingFunction = BucketingFunction.of(lakeFormat);
+        this.bucketKeyEncoder = KeyEncoder.of(flussRowType, bucketKeys, 
lakeFormat);
+        this.partitionGetter = new PartitionGetter(flussRowType, 
partitionKeys);
+        try {
+            this.serializationSchema.open(new 
SerializerInitContextImpl(flussRowType));
+        } catch (Exception e) {
+            throw new FlussRuntimeException(e);
+        }
+        this.random = ThreadLocalRandom.current();
+    }
+
+    @Override
+    public int channel(StatisticsOrRecord<InputT> wrapper) {

Review Comment:
   Variable [wrapper](1) may be null at this access as suggested by [this](2) 
null guard.
   ```suggestion
       public int channel(StatisticsOrRecord<InputT> wrapper) {
           if (wrapper == null) {
               throw new FlussRuntimeException("StatisticsOrRecord wrapper must 
not be null");
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
