This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 029bc13ec55f7c4967148097c78fd3bdf5f23438
Author: Jark Wu <[email protected]>
AuthorDate: Sat Jan 17 17:43:33 2026 +0800

    [flink] Optimize sink statistics calculation to run only when necessary
    
    Refactor RowDataSerializationSchema to extract size estimation logic into
    RowDataSizeEstimator class and only enable it when PARTITION_DYNAMIC mode
    is used. This avoids unnecessary computation overhead for other modes.
    
    Other improvements:
    - Improve operator names: "Collect Statistics", "Strip Statistics"
    - Remove slotSharingGroup for "Strip Statistics" operator
    - Remove redundant test setup code in Flink 1.18/1.19 IT cases
    - Add comprehensive Javadoc for DistributionMode enum
    - Add sink name with table path for better observability
---
 .../fluss/flink/sink/Flink118TableSinkITCase.java  |  22 +--
 .../fluss/flink/sink/Flink119TableSinkITCase.java  |  23 +--
 .../java/org/apache/fluss/flink/row/RowWithOp.java |   8 +
 .../flink/sink/FlinkRowDataChannelComputer.java    |   4 +-
 .../org/apache/fluss/flink/sink/FlinkSink.java     |  34 ++--
 .../apache/fluss/flink/sink/FlinkTableSink.java    |   4 +-
 .../org/apache/fluss/flink/sink/FlussSink.java     |   7 +-
 .../apache/fluss/flink/sink/FlussSinkBuilder.java  |   9 +-
 .../sink/serializer/FlussSerializationSchema.java  |   3 +
 .../serializer/RowDataSerializationSchema.java     | 188 ++++++++++++---------
 .../sink/serializer/SerializerInitContextImpl.java |  15 +-
 .../sink/shuffle/DataStatisticsCoordinator.java    |  20 ++-
 .../flink/sink/shuffle/DataStatisticsOperator.java |   3 +-
 .../fluss/flink/sink/shuffle/DistributionMode.java |  43 +++++
 .../shuffle/StatisticsOrRecordChannelComputer.java |   2 +-
 .../fluss/flink/sink/writer/FlinkSinkWriter.java   |   3 +-
 .../fluss/flink/metrics/FlinkMetricsITCase.java    |   2 -
 .../sink/FlinkRowDataChannelComputerTest.java      |   2 +-
 .../fluss/flink/sink/FlinkTableSinkITCase.java     |  34 ++--
 .../apache/fluss/flink/sink/FlussSinkITCase.java   |   4 +-
 .../StatisticsOrRecordChannelComputerTest.java     |   2 +-
 website/docs/engine-flink/options.md               |   8 +-
 22 files changed, 257 insertions(+), 183 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118TableSinkITCase.java
 
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118TableSinkITCase.java
index c1bf118d1..35caf71f2 100644
--- 
a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118TableSinkITCase.java
+++ 
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118TableSinkITCase.java
@@ -17,25 +17,5 @@
 
 package org.apache.fluss.flink.sink;
 
-import org.apache.flink.test.util.AbstractTestBase;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-
 /** IT case for {@link FlinkTableSink} in Flink 1.18. */
-public class Flink118TableSinkITCase extends FlinkTableSinkITCase {
-    @BeforeEach
-    @Override
-    void before() throws Exception {
-        // invoke here because the AbstractTestBase in 1.18 is junit 4.
-        AbstractTestBase.MINI_CLUSTER_RESOURCE.before();
-        super.before();
-    }
-
-    @AfterEach
-    @Override
-    void after() throws Exception {
-        super.after();
-        // invoke here because the AbstractTestBase in 1.18 is junit 4.
-        AbstractTestBase.MINI_CLUSTER_RESOURCE.after();
-    }
-}
+public class Flink118TableSinkITCase extends FlinkTableSinkITCase {}
diff --git 
a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/sink/Flink119TableSinkITCase.java
 
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/sink/Flink119TableSinkITCase.java
index cb5982a7b..d807bd9f1 100644
--- 
a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/sink/Flink119TableSinkITCase.java
+++ 
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/sink/Flink119TableSinkITCase.java
@@ -17,26 +17,5 @@
 
 package org.apache.fluss.flink.sink;
 
-import org.apache.flink.test.util.AbstractTestBase;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-
 /** IT case for {@link FlinkTableSink} in Flink 1.19. */
-public class Flink119TableSinkITCase extends FlinkTableSinkITCase {
-
-    @BeforeEach
-    @Override
-    void before() throws Exception {
-        // invoke here because the AbstractTestBase in 1.19 is junit 4.
-        AbstractTestBase.MINI_CLUSTER_RESOURCE.before();
-        super.before();
-    }
-
-    @AfterEach
-    @Override
-    void after() throws Exception {
-        super.after();
-        // invoke here because the AbstractTestBase in 1.19 is junit 4.
-        AbstractTestBase.MINI_CLUSTER_RESOURCE.after();
-    }
-}
+public class Flink119TableSinkITCase extends FlinkTableSinkITCase {}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/RowWithOp.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/RowWithOp.java
index 5ad0a748e..88b567869 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/RowWithOp.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/RowWithOp.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.flink.row;
 
+import org.apache.fluss.flink.sink.shuffle.DistributionMode;
 import org.apache.fluss.row.InternalRow;
 
 import javax.annotation.Nullable;
@@ -60,6 +61,9 @@ public class RowWithOp {
      *
      * @param row the internal row data (must not be null)
      * @param opType the operation type (must not be null)
+     * @param estimatedSizeInBytes the estimated size in bytes of the row, 
this is used to collect
+     *     statistics for partitions and dynamically adjust the shuffle routes 
for better
+     *     performance when {@link DistributionMode#PARTITION_DYNAMIC} is 
enabled.
      * @throws NullPointerException if {@code row} or {@code opType} is null
      */
     public RowWithOp(InternalRow row, OperationType opType, @Nullable Long 
estimatedSizeInBytes) {
@@ -86,6 +90,10 @@ public class RowWithOp {
         return opType;
     }
 
+    /**
+     * Returns the estimated size in bytes of the row. Can be null if it is 
not calculated when the
+     * {@link RowWithOp} was created.
+     */
     public @Nullable Long getEstimatedSizeInBytes() {
         return estimatedSizeInBytes;
     }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputer.java
index 1b78292dd..2ab28db1a 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputer.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputer.java
@@ -95,7 +95,7 @@ public class FlinkRowDataChannelComputer<InputT> implements 
ChannelComputer<Inpu
         try {
             // no need to read real database, thus assume to deserialize the 
fluss row as same as
             // flink table type.
-            this.serializationSchema.open(new 
SerializerInitContextImpl(flussRowType));
+            this.serializationSchema.open(new 
SerializerInitContextImpl(flussRowType, false));
         } catch (Exception e) {
             throw new FlussRuntimeException(e);
         }
@@ -126,7 +126,7 @@ public class FlinkRowDataChannelComputer<InputT> implements 
ChannelComputer<Inpu
 
     @Override
     public String toString() {
-        return "BUCKET_SHUFFLE";
+        return "BUCKET";
     }
 
     @VisibleForTesting
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
index fc6028d43..439a21678 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java
@@ -57,9 +57,11 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> {
     private static final long serialVersionUID = 1L;
 
     private final SinkWriterBuilder<? extends FlinkSinkWriter, InputT> builder;
+    private final TablePath tablePath;
 
-    FlinkSink(SinkWriterBuilder<? extends FlinkSinkWriter, InputT> builder) {
+    FlinkSink(SinkWriterBuilder<? extends FlinkSinkWriter, InputT> builder, 
TablePath tablePath) {
         this.builder = builder;
+        this.tablePath = tablePath;
     }
 
     @Override
@@ -70,8 +72,11 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> {
         return flinkSinkWriter;
     }
 
-    public DataStreamSink<InputT> addPreWriteTopology(DataStream<InputT> 
input) {
-        return builder.addPreWriteTopology(input).sinkTo(this);
+    public DataStreamSink<InputT> apply(DataStream<InputT> input) {
+        return builder.addPreWriteTopology(input)
+                .sinkTo(this)
+                .name("Sink(" + tablePath + ")")
+                .setParallelism(input.getParallelism());
     }
 
     @Internal
@@ -94,7 +99,7 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> {
         private final List<String> bucketKeys;
         private final List<String> partitionKeys;
         private final @Nullable DataLakeFormat lakeFormat;
-        private final DistributionMode shuffleMode;
+        private final DistributionMode distributionMode;
         private final FlussSerializationSchema<InputT> 
flussSerializationSchema;
 
         public AppendSinkWriterBuilder(
@@ -105,7 +110,7 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> {
                 List<String> bucketKeys,
                 List<String> partitionKeys,
                 @Nullable DataLakeFormat lakeFormat,
-                DistributionMode shuffleMode,
+                DistributionMode distributionMode,
                 FlussSerializationSchema<InputT> flussSerializationSchema) {
             this.tablePath = tablePath;
             this.flussConfig = flussConfig;
@@ -114,7 +119,7 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> {
             this.bucketKeys = bucketKeys;
             this.partitionKeys = partitionKeys;
             this.lakeFormat = lakeFormat;
-            this.shuffleMode = shuffleMode;
+            this.distributionMode = distributionMode;
             this.flussSerializationSchema = flussSerializationSchema;
         }
 
@@ -130,7 +135,7 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> {
 
         @Override
         public DataStream<InputT> addPreWriteTopology(DataStream<InputT> 
input) {
-            switch (shuffleMode) {
+            switch (distributionMode) {
                 case NONE:
                     return input;
                 case AUTO:
@@ -145,14 +150,14 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> {
                 case PARTITION_DYNAMIC:
                     if (partitionKeys.isEmpty()) {
                         throw new UnsupportedOperationException(
-                                "PARTITION_DYNAMIC is only supported for 
partition tables");
+                                "PARTITION_DYNAMIC is only supported for 
partitioned tables");
                     }
 
                     TypeInformation<StatisticsOrRecord<InputT>> 
statisticsOrRecordTypeInformation =
                             new 
StatisticsOrRecordTypeInformation<>(input.getType());
                     SingleOutputStreamOperator<StatisticsOrRecord<InputT>> 
shuffleStream =
                             input.transform(
-                                            "Dynamic shuffle data statistics",
+                                            "Collect Statistics",
                                             statisticsOrRecordTypeInformation,
                                             new 
DataStatisticsOperatorFactory<>(
                                                     
toFlussRowType(tableRowType),
@@ -179,16 +184,15 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> {
                                                     
out.collect(statisticsOrRecord.record());
                                                 }
                                             })
-                            // To promote operator chaining with the 
downstream writer operator,
-                            // setting slot sharing group and the parallelism 
as default, {@link
-                            // SinkTransformationTranslator} will set the 
parallelism same as sink
-                            // transformation.
-                            .slotSharingGroup("shuffle-partition-custom-group")
+                            .name("Strip Statistics")
+                            .setParallelism(input.getParallelism())
+                            // we remove the slot sharing group here so that 
all operators can be
+                            // co-located in the same TaskManager slot
                             .returns(input.getType());
 
                 default:
                     throw new UnsupportedOperationException(
-                            "Unsupported distribution mode: " + shuffleMode);
+                            "Unsupported distribution mode: " + 
distributionMode);
             }
         }
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
index f17fe1dea..747fe9341 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java
@@ -198,7 +198,7 @@ public class FlinkTableSink
             @Override
             public DataStreamSink<?> consumeDataStream(
                     ProviderContext providerContext, DataStream<RowData> 
dataStream) {
-                return flinkSink.addPreWriteTopology(dataStream);
+                return flinkSink.apply(dataStream);
             }
         };
     }
@@ -228,7 +228,7 @@ public class FlinkTableSink
                                 distributionMode,
                                 new RowDataSerializationSchema(true, 
sinkIgnoreDelete));
 
-        return new FlinkSink<>(flinkSinkWriterBuilder);
+        return new FlinkSink<>(flinkSinkWriterBuilder, tablePath);
     }
 
     private List<String> columns(int[] columnIndexes) {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java
index ad4297381..aae36b630 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java
@@ -19,6 +19,7 @@ package org.apache.fluss.flink.sink;
 
 import org.apache.fluss.annotation.PublicEvolving;
 import org.apache.fluss.flink.sink.writer.FlinkSinkWriter;
+import org.apache.fluss.metadata.TablePath;
 
 /**
  * FlussSink is a specialized Flink sink for writing data to Fluss.
@@ -39,8 +40,10 @@ public class FlussSink<InputT> extends FlinkSink<InputT> {
      *
      * @param builder the builder used to create the sink writer
      */
-    FlussSink(SinkWriterBuilder<? extends FlinkSinkWriter<InputT>, InputT> 
builder) {
-        super(builder);
+    FlussSink(
+            SinkWriterBuilder<? extends FlinkSinkWriter<InputT>, InputT> 
builder,
+            TablePath tablePath) {
+        super(builder, tablePath);
     }
 
     /**
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java
index 416c3b38a..ad6bb2109 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java
@@ -107,6 +107,13 @@ public class FlussSinkBuilder<InputT> {
         return this;
     }
 
+    /**
+     * Set the distribution mode for the sink. The distribution mode controls 
how records are
+     * shuffled to the Fluss sink operator.
+     *
+     * @param distributionMode the distribution mode to use for shuffling 
records to the sink
+     * @return this builder instance for fluent chaining
+     */
     public FlussSinkBuilder<InputT> setDistributionMode(DistributionMode 
distributionMode) {
         this.distributionMode = distributionMode;
         return this;
@@ -217,7 +224,7 @@ public class FlussSinkBuilder<InputT> {
                             serializationSchema);
         }
 
-        return new FlussSink<>(writerBuilder);
+        return new FlussSink<>(writerBuilder, tablePath);
     }
 
     private void validateConfiguration() {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java
index 2096b1095..533b8d4e3 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java
@@ -75,5 +75,8 @@ public interface FlussSerializationSchema<T> extends 
Serializable {
          * @return The schema of the input Flink row.
          */
         org.apache.flink.table.types.logical.RowType getInputRowSchema();
+
+        /** Indicates whether the statistics collection is enabled. */
+        boolean isStatisticEnabled();
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
index af79d3ba3..ac80f938c 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java
@@ -106,9 +106,11 @@ public class RowDataSerializationSchema implements 
FlussSerializationSchema<RowD
      */
     @Nullable private transient ProjectedRow outputProjection;
 
-    private transient RowType rowType;
-    private transient long fixedSizeInBytes;
-    private transient List<Integer> variableSizeFields;
+    /**
+     * Estimator for calculating the size of RowData instances. May be null 
if there is no need to
+     * calculate size.
+     */
+    @Nullable private transient RowDataSizeEstimator sizeEstimator;
 
     /**
      * Constructs a new {@code RowSerializationSchema}.
@@ -143,10 +145,9 @@ public class RowDataSerializationSchema implements 
FlussSerializationSchema<RowD
             }
             outputProjection = ProjectedRow.from(indexMapping);
         }
-        this.rowType = context.getRowSchema();
-        Tuple2<Long, List<Integer>> calculateFixedSize = 
calculateFixedSize(rowType);
-        this.fixedSizeInBytes = calculateFixedSize.f0;
-        this.variableSizeFields = calculateFixedSize.f1;
+        if (context.isStatisticEnabled()) {
+            this.sizeEstimator = new 
RowDataSizeEstimator(context.getRowSchema());
+        }
     }
 
     /**
@@ -174,81 +175,11 @@ public class RowDataSerializationSchema implements 
FlussSerializationSchema<RowD
             row = outputProjection.replaceRow(row);
         }
         OperationType opType = toOperationType(value.getRowKind());
-        long estimatedSizeInBytes = calculateSize(value);
-        return new RowWithOp(row, opType, estimatedSizeInBytes);
-    }
-
-    private long calculateSize(RowData value) {
-        if (value instanceof BinaryFormat) {
-            return ((BinaryFormat) value).getSizeInBytes();
+        Long estimatedSizeInBytes = null;
+        if (sizeEstimator != null) {
+            estimatedSizeInBytes = sizeEstimator.estimateSize(value);
         }
-
-        long size = fixedSizeInBytes;
-        for (int i : variableSizeFields) {
-            DataField field = rowType.getFields().get(i);
-            DataTypeRoot typeRoot = field.getType().getTypeRoot();
-            if (value.getArity() <= i || value.isNullAt(i)) {
-                continue;
-            }
-            switch (typeRoot) {
-                case STRING:
-                    StringData stringData = value.getString(i);
-                    if (stringData instanceof BinaryStringData) {
-                        size += ((BinaryStringData) 
stringData).getSizeInBytes();
-                    } else {
-                        size += stringData.toBytes().length;
-                    }
-                    break;
-                case BYTES:
-                    size += value.getBinary(i).length;
-                    break;
-            }
-        }
-        return size;
-    }
-
-    private Tuple2<Long, List<Integer>> calculateFixedSize(RowType rowType) {
-        long size = 0;
-        List<Integer> variableSizeFields = new ArrayList<>();
-        for (int i = 0; i < rowType.getFieldCount(); i++) {
-            DataField field = rowType.getFields().get(i);
-            DataTypeRoot typeRoot = field.getType().getTypeRoot();
-            switch (typeRoot) {
-                case CHAR:
-                    size += ((CharType) (field.getType())).getLength();
-                    break;
-                case BINARY:
-                    size += ((BinaryType) (field.getType())).getLength();
-                    break;
-                case DECIMAL:
-                    size += ((DecimalType) (field.getType())).getPrecision();
-                    break;
-                case BOOLEAN:
-                case TINYINT:
-                    size += 1;
-                    break;
-                case SMALLINT:
-                    size += 2;
-                    break;
-                case INTEGER:
-                case FLOAT:
-                case DATE:
-                case TIME_WITHOUT_TIME_ZONE:
-                    size += 4;
-                    break;
-                case BIGINT:
-                case DOUBLE:
-                case TIMESTAMP_WITHOUT_TIME_ZONE:
-                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                    size += 8;
-                    break;
-                case STRING:
-                case BYTES:
-                    variableSizeFields.add(i);
-                    break;
-            }
-        }
-        return Tuple2.of(size, variableSizeFields);
+        return new RowWithOp(row, opType, estimatedSizeInBytes);
     }
 
     /**
@@ -281,4 +212,99 @@ public class RowDataSerializationSchema implements 
FlussSerializationSchema<RowD
             }
         }
     }
+
+    /** Estimator for the size of RowData instances based on their schema and 
row. */
+    private static class RowDataSizeEstimator {
+        private final RowType rowType;
+        private final long fixedSizeInBytes;
+        private final int[] variableSizeFields;
+
+        RowDataSizeEstimator(RowType rowType) {
+            this.rowType = rowType;
+            Tuple2<Long, int[]> result = 
calculateFixedSizeAndVariableColumnIndex(rowType);
+            this.fixedSizeInBytes = result.f0;
+            this.variableSizeFields = result.f1;
+        }
+
+        long estimateSize(RowData value) {
+            if (value instanceof BinaryFormat) {
+                return ((BinaryFormat) value).getSizeInBytes();
+            }
+
+            long size = fixedSizeInBytes;
+            for (int i : variableSizeFields) {
+                DataField field = rowType.getFields().get(i);
+                DataTypeRoot typeRoot = field.getType().getTypeRoot();
+                // handle schema evolution where field may not exist, and null 
values
+                if (value.getArity() <= i || value.isNullAt(i)) {
+                    continue;
+                }
+                switch (typeRoot) {
+                    case STRING:
+                        StringData stringData = value.getString(i);
+                        if (stringData instanceof BinaryStringData) {
+                            size += ((BinaryStringData) 
stringData).getSizeInBytes();
+                        } else {
+                            size += stringData.toBytes().length;
+                        }
+                        break;
+                    case BYTES:
+                        size += value.getBinary(i).length;
+                        break;
+                }
+            }
+            return size;
+        }
+
+        private static Tuple2<Long, int[]> 
calculateFixedSizeAndVariableColumnIndex(
+                RowType rowType) {
+            long fixedSizeInBytes = 0;
+            List<Integer> variableSizeFields = new ArrayList<>();
+            for (int i = 0; i < rowType.getFieldCount(); i++) {
+                DataField field = rowType.getFields().get(i);
+                DataTypeRoot typeRoot = field.getType().getTypeRoot();
+                switch (typeRoot) {
+                    case CHAR:
+                        fixedSizeInBytes += ((CharType) 
(field.getType())).getLength();
+                        break;
+                    case BINARY:
+                        fixedSizeInBytes += ((BinaryType) 
(field.getType())).getLength();
+                        break;
+                    case DECIMAL:
+                        fixedSizeInBytes += ((DecimalType) 
(field.getType())).getPrecision();
+                        break;
+                    case BOOLEAN:
+                    case TINYINT:
+                        fixedSizeInBytes += 1;
+                        break;
+                    case SMALLINT:
+                        fixedSizeInBytes += 2;
+                        break;
+                    case INTEGER:
+                    case FLOAT:
+                    case DATE:
+                    case TIME_WITHOUT_TIME_ZONE:
+                        fixedSizeInBytes += 4;
+                        break;
+                    case BIGINT:
+                    case DOUBLE:
+                    case TIMESTAMP_WITHOUT_TIME_ZONE:
+                    case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                        fixedSizeInBytes += 8;
+                        break;
+                    case STRING:
+                    case BYTES:
+                        variableSizeFields.add(i);
+                        break;
+                }
+            }
+
+            int[] variableColumnIndexes = new int[variableSizeFields.size()];
+            for (int i = 0; i < variableSizeFields.size(); i++) {
+                variableColumnIndexes[i] = variableSizeFields.get(i);
+            }
+
+            return Tuple2.of(fixedSizeInBytes, variableColumnIndexes);
+        }
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java
index cc1d168cd..1c00a9386 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java
@@ -30,15 +30,19 @@ public class SerializerInitContextImpl implements 
FlussSerializationSchema.Initi
 
     private final RowType flussRowSchema;
     private final org.apache.flink.table.types.logical.RowType flinkRowType;
+    private final boolean isStatisticEnabled;
 
-    public SerializerInitContextImpl(RowType rowSchema) {
-        this(rowSchema, toFlinkRowType(rowSchema));
+    public SerializerInitContextImpl(RowType rowSchema, boolean 
isStatisticEnabled) {
+        this(rowSchema, toFlinkRowType(rowSchema), isStatisticEnabled);
     }
 
     public SerializerInitContextImpl(
-            RowType rowSchema, org.apache.flink.table.types.logical.RowType 
flinkRowType) {
+            RowType rowSchema,
+            org.apache.flink.table.types.logical.RowType flinkRowType,
+            boolean isStatisticEnabled) {
         this.flussRowSchema = checkNotNull(rowSchema, "flussRowSchema");
         this.flinkRowType = checkNotNull(flinkRowType, "flinkRowType");
+        this.isStatisticEnabled = isStatisticEnabled;
     }
 
     @Override
@@ -50,4 +54,9 @@ public class SerializerInitContextImpl implements 
FlussSerializationSchema.Initi
     public org.apache.flink.table.types.logical.RowType getInputRowSchema() {
         return flinkRowType;
     }
+
+    @Override
+    public boolean isStatisticEnabled() {
+        return isStatisticEnabled;
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinator.java
index 218db829d..da122a7b2 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinator.java
@@ -247,12 +247,20 @@ class DataStatisticsCoordinator implements 
OperatorCoordinator {
                 StatisticsEvent.createStatisticsEvent(
                         checkpointId, statistics, statisticsSerializer);
         for (int i = 0; i < context.currentParallelism(); ++i) {
-            try {
-                
subtaskGateways.getSubtaskGateway(i).sendEvent(statisticsEvent).get();
-            } catch (Exception exception) {
-                // Ignore future return value for potential error (e.g. 
subtask down).
-                LOG.warn("Failed to send global statistics to subtask {}", i, 
exception);
-            }
+            // Send asynchronously and only log failures (e.g. subtask 
down); do not block.
+            // NOTE(review): whenComplete also fires on success with a null 
error — guard the
+            // warn with `if (error != null)` to avoid logging spurious 
warnings on every send.
+            final int subtaskIndex = i;
+            subtaskGateways
+                    .getSubtaskGateway(subtaskIndex)
+                    .sendEvent(statisticsEvent)
+                    .whenComplete(
+                            (ack, error) -> {
+                                LOG.warn(
+                                        "Failed to send global statistics to 
subtask {}",
+                                        subtaskIndex,
+                                        error);
+                            });
         }
     }
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsOperator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsOperator.java
index cd051fc14..ccf938809 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsOperator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsOperator.java
@@ -95,7 +95,8 @@ public class DataStatisticsOperator<InputT>
         this.partitionGetter = new PartitionGetter(rowType, partitionKeys);
         this.statisticsSerializer = new DataStatisticsSerializer();
         try {
-            this.flussSerializationSchema.open(new 
SerializerInitContextImpl(rowType));
+            // enable statistics collection for the serialization schema
+            this.flussSerializationSchema.open(new 
SerializerInitContextImpl(rowType, true));
         } catch (Exception e) {
             throw new FlussRuntimeException(e);
         }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DistributionMode.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DistributionMode.java
index c63c11b84..63883533b 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DistributionMode.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DistributionMode.java
@@ -19,8 +19,51 @@ package org.apache.fluss.flink.sink.shuffle;
 
 /** Distribution mode for sink shuffling. */
 public enum DistributionMode {
+    /**
+     * Uses Flink's default shuffle strategy:
+     *
+     * <p>Typically uses FORWARD when the sink parallelism matches the 
upstream parallelism. Uses
+     * ROUND_ROBIN when parallelisms differ.
+     */
     NONE,
+
+    /**
+     * Automatically chooses the best mode based on the table type:
+     *
+     * <p>Uses BUCKET mode for Primary Key Tables and Log Tables with bucket 
keys to maximize
+     * throughput.
+     *
+     * <p>Uses NONE for Log Tables without bucket keys.
+     */
     AUTO,
+
+    /**
+     * Shuffle data by bucket ID before writing to sink. This groups data with 
the same bucket ID to
+     * be processed by the same task, which improves client processing 
efficiency and reduces
+     * resource consumption.
+     *
+     * <p>Characteristics:
+     *
+     * <p>Particularly recommended for Primary Key tables as it can 
significantly improve
+     * throughput. For Log Tables, bucket shuffle only takes effect when the 
'bucket.key' is
+     * defined.
+     *
+     * <p>Note: When sink parallelism exceeds the number of buckets, some sink 
tasks may remain idle
+     * without receiving data.
+     */
     BUCKET,
+
+    /**
+     * Dynamically adjusts shuffle strategy based on partition key traffic 
patterns. This mode
+     * monitors data distribution and adjusts the shuffle behavior to balance 
the load.
+     *
+     * <p>Characteristics:
+     *
+     * <p>Only supported for partitioned Log Tables (not supported for Primary 
Key tables now). Use
+     * this mode when data is highly skewed across partitions or when there 
are many partitions.
+     *
+     * <p>Note: This mode has overhead costs including data statistics 
collection and additional
+     * shuffle operations.
+     */
     PARTITION_DYNAMIC
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputer.java
index 32b7a4094..692d5b28d 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputer.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputer.java
@@ -99,7 +99,7 @@ public class StatisticsOrRecordChannelComputer<InputT>
         this.bucketKeyEncoder = KeyEncoder.of(flussRowType, bucketKeys, 
lakeFormat);
         this.partitionGetter = new PartitionGetter(flussRowType, 
partitionKeys);
         try {
-            this.serializationSchema.open(new 
SerializerInitContextImpl(flussRowType));
+            this.serializationSchema.open(new 
SerializerInitContextImpl(flussRowType, false));
         } catch (Exception e) {
             throw new FlussRuntimeException(e);
         }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java
index fe0eb28ea..f942bc5ad 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java
@@ -117,7 +117,8 @@ public abstract class FlinkSinkWriter<InputT> implements 
SinkWriter<InputT> {
 
         try {
             this.serializationSchema.open(
-                    new 
SerializerInitContextImpl(table.getTableInfo().getRowType(), tableRowType));
+                    new SerializerInitContextImpl(
+                            table.getTableInfo().getRowType(), tableRowType, 
false));
         } catch (Exception e) {
             throw new FlussRuntimeException(e);
         }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java
index 41bda51a8..596ad7d1d 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java
@@ -42,7 +42,6 @@ import 
org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -166,7 +165,6 @@ abstract class FlinkMetricsITCase {
         createTable(tablePath, tableDescriptor);
 
         // test write
-        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 1);
         TableResult tableResult =
                 tEnv.executeSql("insert into test values (1, 'name1'), (2, 
'name2'), (3, 'name3')");
         JobClient client = tableResult.getJobClient().get();
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputerTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputerTest.java
index 84a01c020..93d5254fd 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputerTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputerTest.java
@@ -40,7 +40,7 @@ class FlinkRowDataChannelComputerTest {
 
     @BeforeAll
     static void init() throws Exception {
-        serializationSchema.open(new 
SerializerInitContextImpl(DATA1_ROW_TYPE));
+        serializationSchema.open(new SerializerInitContextImpl(DATA1_ROW_TYPE, 
false));
     }
 
     @Test
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index 6f1c65ecd..1af582e4d 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -102,9 +102,7 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
         // open a catalog so that we can get table from the catalog
         String bootstrapServers = 
FLUSS_CLUSTER_EXTENSION.getBootstrapServers();
         // create table environment
-        org.apache.flink.configuration.Configuration config =
-                new org.apache.flink.configuration.Configuration();
-        env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
 
         tEnv = StreamTableEnvironment.create(env);
@@ -240,15 +238,17 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
         if (distributionMode == DistributionMode.PARTITION_DYNAMIC) {
             assertThatThrownBy(() -> tEnv.executeSql(insertSql))
                     .hasMessageContaining(
-                            "PARTITION_DYNAMIC is only supported for partition 
tables");
+                            "PARTITION_DYNAMIC is only supported for 
partitioned tables");
             return;
         }
         String insertPlan = tEnv.explainSql(insertSql, 
ExplainDetail.JSON_EXECUTION_PLAN);
         if (distributionMode == DistributionMode.BUCKET) {
-            assertThat(insertPlan).contains("\"ship_strategy\" : 
\"BUCKET_SHUFFLE\"");
+            assertThat(insertPlan).contains("\"ship_strategy\" : \"BUCKET\"");
         } else {
             assertThat(insertPlan).contains("\"ship_strategy\" : \"FORWARD\"");
         }
+        // there shouldn't be a REBALANCE shuffle strategy; this asserts 
operator parallelism
+        assertThat(insertPlan).doesNotContain("\"ship_strategy\" : 
\"REBALANCE\"");
         tEnv.executeSql(insertSql).await();
 
         CloseableIterator<Row> rowIter = tEnv.executeSql("select * from 
sink_test").collect();
@@ -272,8 +272,8 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
         List<String> actual = collectRowsWithTimeout(rowIter, 
expectedRows.size());
         assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRows);
 
+        // check data with the same bucket key should be read in sequence.
         if (distributionMode == DistributionMode.BUCKET) {
-            // check data with the same bucket key should be read in sequence.
             for (List<String> expected : expectedGroups) {
                 if (expected.size() <= 1) {
                     continue;
@@ -290,7 +290,6 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
 
     @Test
     void testAppendLogWithRoundRobin() throws Exception {
-        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 1);
         tEnv.executeSql(
                 "create table sink_test (a int not null, b bigint, c string) 
with "
                         + "('bucket.num' = '3', 
'client.writer.bucket.no-key-assigner' = 'round_robin')");
@@ -387,18 +386,23 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
                         + "(10, 3510, 'coco'), "
                         + "(11, 3511, 'stave'), "
                         + "(12, 3512, 'Tim')";
-        if (distributionMode == DistributionMode.PARTITION_DYNAMIC) {
-            assertThat(tEnv.explainSql(insertSql, 
ExplainDetail.JSON_EXECUTION_PLAN))
-                    .contains(String.format("\"ship_strategy\" : \"%s\"", 
distributionMode.name()));
-        } else if (distributionMode == DistributionMode.BUCKET) {
+
+        if (distributionMode == DistributionMode.BUCKET) {
             assertThatThrownBy(() -> tEnv.explainSql(insertSql, 
ExplainDetail.JSON_EXECUTION_PLAN))
                     .hasMessageContaining(
                             "BUCKET mode is only supported for log tables with 
bucket keys");
             return;
+        }
+
+        String insertPlan = tEnv.explainSql(insertSql, 
ExplainDetail.JSON_EXECUTION_PLAN);
+        if (distributionMode == DistributionMode.PARTITION_DYNAMIC) {
+            assertThat(insertPlan)
+                    .contains(String.format("\"ship_strategy\" : \"%s\"", 
distributionMode.name()));
         } else {
-            assertThat(tEnv.explainSql(insertSql, 
ExplainDetail.JSON_EXECUTION_PLAN))
-                    .contains("\"ship_strategy\" : \"FORWARD\"");
+            assertThat(insertPlan).contains("\"ship_strategy\" : \"FORWARD\"");
         }
+        assertThat(insertPlan).doesNotContain("\"ship_strategy\" : 
\"REBALANCE\"");
+
         tEnv.executeSql(insertSql).await();
 
         CloseableIterator<Row> rowIter = tEnv.executeSql("select * from 
sink_test").collect();
@@ -449,11 +453,12 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
 
         String insertPlan = tEnv.explainSql(insertSql, 
ExplainDetail.JSON_EXECUTION_PLAN);
         if (distributionMode == DistributionMode.BUCKET) {
-            assertThat(insertPlan).contains("\"ship_strategy\" : 
\"BUCKET_SHUFFLE\"");
+            assertThat(insertPlan).contains("\"ship_strategy\" : \"BUCKET\"");
         } else if (distributionMode == DistributionMode.AUTO
                 || distributionMode == DistributionMode.NONE) {
             assertThat(insertPlan).contains("\"ship_strategy\" : \"FORWARD\"");
         }
+        assertThat(insertPlan).doesNotContain("\"ship_strategy\" : 
\"REBALANCE\"");
 
         tEnv.executeSql(insertSql).await();
 
@@ -625,7 +630,6 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
 
     @Test
     void testFirstRowMergeEngine() throws Exception {
-        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 1);
         tEnv.executeSql(
                 "create table first_row_source (a int not null primary key not 
enforced,"
                         + " b string) with('table.merge-engine' = 
'first_row')");
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java
index bebc233ce..b88aa4dae 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java
@@ -144,7 +144,7 @@ public class FlussSinkITCase extends FlinkTestBase {
                         .setSerializationSchema(serializationSchema)
                         .build();
 
-        flussSink.addPreWriteTopology(stream).name("Fluss Sink");
+        flussSink.apply(stream).name("Fluss Sink");
 
         env.executeAsync("Test RowData Fluss Sink");
 
@@ -272,7 +272,7 @@ public class FlussSinkITCase extends FlinkTestBase {
                         .setSerializationSchema(new 
TestOrderSerializationSchema())
                         .build();
 
-        flussSink.addPreWriteTopology(stream).name("Fluss Sink");
+        flussSink.apply(stream).name("Fluss Sink");
         env.executeAsync("Test Order Fluss Sink");
 
         Table table = conn.getTable(new TablePath(DEFAULT_DB, pkTableName));
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputerTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputerTest.java
index b16f90244..c63690951 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputerTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputerTest.java
@@ -50,7 +50,7 @@ class StatisticsOrRecordChannelComputerTest {
 
     @BeforeAll
     static void init() throws Exception {
-        serializationSchema.open(new 
SerializerInitContextImpl(DATA1_ROW_TYPE));
+        serializationSchema.open(new SerializerInitContextImpl(DATA1_ROW_TYPE, 
false));
     }
 
     @Test
diff --git a/website/docs/engine-flink/options.md 
b/website/docs/engine-flink/options.md
index f0f4995fb..d5efb0aaf 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -130,8 +130,8 @@ See more details about [ALTER TABLE ... 
SET](engine-flink/ddl.md#set-properties)
 | Option                                              | Type       | Default   
        | Description                                                           
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 
|-----------------------------------------------------|------------|-------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
 | sink.ignore-delete                                  | Boolean    | false     
        | If set to true, the sink will ignore DELETE and UPDATE_BEFORE 
changelog events.                                                               
                                                                                
                                                                                
                                                                                
                      [...]
-| sink.bucket-shuffle                                 | Boolean    | true      
        | Whether to shuffle by bucket id before write to sink. Shuffling the 
data with the same bucket id to be processed by the same task can improve the 
efficiency of client processing and reduce resource consumption. For Log Table, 
bucket shuffle will only take effect when the 'bucket.key' is defined. For 
Primary Key table, it is enabled by default. This option is deprecated. Please 
use sink.distribution-m [...]
-| sink.distribution-mode                              | Enum       | AUTO      
        | Defines the distribution mode for writing data to the sink. See 
[Distribution Modes](#distribution-modes) for details. |
+| sink.bucket-shuffle                                 | Boolean    | true      
        | Whether to shuffle by bucket id before write to sink. Shuffling the 
data with the same bucket id to be processed by the same task can improve the 
efficiency of client processing and reduce resource consumption. For Log Table, 
bucket shuffle will only take effect when the 'bucket.key' is defined. For 
Primary Key table, it is enabled by default. This option is deprecated. Please 
use `sink.distribution- [...]
+| sink.distribution-mode                              | Enum       | AUTO      
        | Defines the distribution mode for shuffling data to the sink. 
Available options are `AUTO`, `NONE`, `BUCKET`, and `PARTITION_DYNAMIC`. See 
[Distribution Modes](#distribution-modes) for details about each option. |
 | client.writer.buffer.memory-size                    | MemorySize | 64mb      
        | The total bytes of memory the writer can use to buffer internal rows. 
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | client.writer.buffer.page-size                      | MemorySize | 128kb     
        | Size of every page in memory buffers 
(`client.writer.buffer.memory-size`).                                           
                                                                                
                                                                                
                                                                                
                                               [...]
 | client.writer.buffer.per-request-memory-size        | MemorySize | 16mb      
        | The minimum number of bytes that will be allocated by the writer 
rounded down to the closest multiple of client.writer.buffer.page-size. It must 
be greater than or equal to client.writer.buffer.page-size. This option allows 
to allocate memory in batches to have better CPU-cached friendliness due to 
contiguous segments.                                                            
                        [...]
@@ -173,8 +173,8 @@ Shuffle data by bucket ID before writing to sink. This 
groups data with the same
 Dynamically adjusts shuffle strategy based on partition key traffic patterns. 
This mode monitors data distribution and adjusts the shuffle behavior to 
balance the load.
 
 **Characteristics:**
-- Only supported for partitioned Log Tables (not for Primary Key tables now)
-- Use this mode when data is highly skewed across partitions or when there 
are比较多 partitions
+- Only supported for partitioned Log Tables (not supported for Primary Key 
tables now)
+- Use this mode when data is highly skewed across partitions or when there are 
many partitions
 - **Note:** This mode has overhead costs including data statistics collection 
and additional shuffle operations
 
 ## Other Options

Reply via email to