This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 029bc13ec55f7c4967148097c78fd3bdf5f23438 Author: Jark Wu <[email protected]> AuthorDate: Sat Jan 17 17:43:33 2026 +0800 [flink] Optimize sink statistics calculation to run only when necessary Refactor RowDataSerializationSchema to extract size estimation logic into RowDataSizeEstimator class and only enable it when PARTITION_DYNAMIC mode is used. This avoids unnecessary computation overhead for other modes. Other improvements: - Improve operator names: "Collect Statistics", "Strip Statistics" - Remove slotSharingGroup for "Strip Statistics" operator - Remove redundant test setup code in Flink 1.18/1.19 IT cases - Add comprehensive Javadoc for DistributionMode enum - Add sink name with table path for better observability --- .../fluss/flink/sink/Flink118TableSinkITCase.java | 22 +-- .../fluss/flink/sink/Flink119TableSinkITCase.java | 23 +-- .../java/org/apache/fluss/flink/row/RowWithOp.java | 8 + .../flink/sink/FlinkRowDataChannelComputer.java | 4 +- .../org/apache/fluss/flink/sink/FlinkSink.java | 34 ++-- .../apache/fluss/flink/sink/FlinkTableSink.java | 4 +- .../org/apache/fluss/flink/sink/FlussSink.java | 7 +- .../apache/fluss/flink/sink/FlussSinkBuilder.java | 9 +- .../sink/serializer/FlussSerializationSchema.java | 3 + .../serializer/RowDataSerializationSchema.java | 188 ++++++++++++--------- .../sink/serializer/SerializerInitContextImpl.java | 15 +- .../sink/shuffle/DataStatisticsCoordinator.java | 20 ++- .../flink/sink/shuffle/DataStatisticsOperator.java | 3 +- .../fluss/flink/sink/shuffle/DistributionMode.java | 43 +++++ .../shuffle/StatisticsOrRecordChannelComputer.java | 2 +- .../fluss/flink/sink/writer/FlinkSinkWriter.java | 3 +- .../fluss/flink/metrics/FlinkMetricsITCase.java | 2 - .../sink/FlinkRowDataChannelComputerTest.java | 2 +- .../fluss/flink/sink/FlinkTableSinkITCase.java | 34 ++-- .../apache/fluss/flink/sink/FlussSinkITCase.java | 4 +- .../StatisticsOrRecordChannelComputerTest.java | 2 +- website/docs/engine-flink/options.md | 8 +- 22 
files changed, 257 insertions(+), 183 deletions(-) diff --git a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118TableSinkITCase.java b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118TableSinkITCase.java index c1bf118d1..35caf71f2 100644 --- a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118TableSinkITCase.java +++ b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118TableSinkITCase.java @@ -17,25 +17,5 @@ package org.apache.fluss.flink.sink; -import org.apache.flink.test.util.AbstractTestBase; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; - /** IT case for {@link FlinkTableSink} in Flink 1.18. */ -public class Flink118TableSinkITCase extends FlinkTableSinkITCase { - @BeforeEach - @Override - void before() throws Exception { - // invoke here because the AbstractTestBase in 1.18 is junit 4. - AbstractTestBase.MINI_CLUSTER_RESOURCE.before(); - super.before(); - } - - @AfterEach - @Override - void after() throws Exception { - super.after(); - // invoke here because the AbstractTestBase in 1.18 is junit 4. 
- AbstractTestBase.MINI_CLUSTER_RESOURCE.after(); - } -} +public class Flink118TableSinkITCase extends FlinkTableSinkITCase {} diff --git a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/sink/Flink119TableSinkITCase.java b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/sink/Flink119TableSinkITCase.java index cb5982a7b..d807bd9f1 100644 --- a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/sink/Flink119TableSinkITCase.java +++ b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/sink/Flink119TableSinkITCase.java @@ -17,26 +17,5 @@ package org.apache.fluss.flink.sink; -import org.apache.flink.test.util.AbstractTestBase; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; - /** IT case for {@link FlinkTableSink} in Flink 1.19. */ -public class Flink119TableSinkITCase extends FlinkTableSinkITCase { - - @BeforeEach - @Override - void before() throws Exception { - // invoke here because the AbstractTestBase in 1.19 is junit 4. - AbstractTestBase.MINI_CLUSTER_RESOURCE.before(); - super.before(); - } - - @AfterEach - @Override - void after() throws Exception { - super.after(); - // invoke here because the AbstractTestBase in 1.19 is junit 4. 
- AbstractTestBase.MINI_CLUSTER_RESOURCE.after(); - } -} +public class Flink119TableSinkITCase extends FlinkTableSinkITCase {} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/RowWithOp.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/RowWithOp.java index 5ad0a748e..88b567869 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/RowWithOp.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/RowWithOp.java @@ -17,6 +17,7 @@ package org.apache.fluss.flink.row; +import org.apache.fluss.flink.sink.shuffle.DistributionMode; import org.apache.fluss.row.InternalRow; import javax.annotation.Nullable; @@ -60,6 +61,9 @@ public class RowWithOp { * * @param row the internal row data (must not be null) * @param opType the operation type (must not be null) + * @param estimatedSizeInBytes the estimated size in bytes of the row, this is used to collect + * statistics for partitions and dynamically adjust the shuffle routes for better + * performance when {@link DistributionMode#PARTITION_DYNAMIC} is enabled. * @throws NullPointerException if {@code row} or {@code opType} is null */ public RowWithOp(InternalRow row, OperationType opType, @Nullable Long estimatedSizeInBytes) { @@ -86,6 +90,10 @@ public class RowWithOp { return opType; } + /** + * Returns the estimated size in bytes of the row. Can be null if it is not calculated when the + * {@link RowWithOp} was created. 
+ */ public @Nullable Long getEstimatedSizeInBytes() { return estimatedSizeInBytes; } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputer.java index 1b78292dd..2ab28db1a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputer.java @@ -95,7 +95,7 @@ public class FlinkRowDataChannelComputer<InputT> implements ChannelComputer<Inpu try { // no need to read real database, thus assume to deserialize the fluss row as same as // flink table type. - this.serializationSchema.open(new SerializerInitContextImpl(flussRowType)); + this.serializationSchema.open(new SerializerInitContextImpl(flussRowType, false)); } catch (Exception e) { throw new FlussRuntimeException(e); } @@ -126,7 +126,7 @@ public class FlinkRowDataChannelComputer<InputT> implements ChannelComputer<Inpu @Override public String toString() { - return "BUCKET_SHUFFLE"; + return "BUCKET"; } @VisibleForTesting diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java index fc6028d43..439a21678 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java @@ -57,9 +57,11 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> { private static final long serialVersionUID = 1L; private final SinkWriterBuilder<? extends FlinkSinkWriter, InputT> builder; + private final TablePath tablePath; - FlinkSink(SinkWriterBuilder<? extends FlinkSinkWriter, InputT> builder) { + FlinkSink(SinkWriterBuilder<? 
extends FlinkSinkWriter, InputT> builder, TablePath tablePath) { this.builder = builder; + this.tablePath = tablePath; } @Override @@ -70,8 +72,11 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> { return flinkSinkWriter; } - public DataStreamSink<InputT> addPreWriteTopology(DataStream<InputT> input) { - return builder.addPreWriteTopology(input).sinkTo(this); + public DataStreamSink<InputT> apply(DataStream<InputT> input) { + return builder.addPreWriteTopology(input) + .sinkTo(this) + .name("Sink(" + tablePath + ")") + .setParallelism(input.getParallelism()); } @Internal @@ -94,7 +99,7 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> { private final List<String> bucketKeys; private final List<String> partitionKeys; private final @Nullable DataLakeFormat lakeFormat; - private final DistributionMode shuffleMode; + private final DistributionMode distributionMode; private final FlussSerializationSchema<InputT> flussSerializationSchema; public AppendSinkWriterBuilder( @@ -105,7 +110,7 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> { List<String> bucketKeys, List<String> partitionKeys, @Nullable DataLakeFormat lakeFormat, - DistributionMode shuffleMode, + DistributionMode distributionMode, FlussSerializationSchema<InputT> flussSerializationSchema) { this.tablePath = tablePath; this.flussConfig = flussConfig; @@ -114,7 +119,7 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> { this.bucketKeys = bucketKeys; this.partitionKeys = partitionKeys; this.lakeFormat = lakeFormat; - this.shuffleMode = shuffleMode; + this.distributionMode = distributionMode; this.flussSerializationSchema = flussSerializationSchema; } @@ -130,7 +135,7 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> { @Override public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) { - switch (shuffleMode) { + switch (distributionMode) { case NONE: return input; case AUTO: @@ -145,14 +150,14 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> { case 
PARTITION_DYNAMIC: if (partitionKeys.isEmpty()) { throw new UnsupportedOperationException( - "PARTITION_DYNAMIC is only supported for partition tables"); + "PARTITION_DYNAMIC is only supported for partitioned tables"); } TypeInformation<StatisticsOrRecord<InputT>> statisticsOrRecordTypeInformation = new StatisticsOrRecordTypeInformation<>(input.getType()); SingleOutputStreamOperator<StatisticsOrRecord<InputT>> shuffleStream = input.transform( - "Dynamic shuffle data statistics", + "Collect Statistics", statisticsOrRecordTypeInformation, new DataStatisticsOperatorFactory<>( toFlussRowType(tableRowType), @@ -179,16 +184,15 @@ class FlinkSink<InputT> extends SinkAdapter<InputT> { out.collect(statisticsOrRecord.record()); } }) - // To promote operator chaining with the downstream writer operator, - // setting slot sharing group and the parallelism as default, {@link - // SinkTransformationTranslator} will set the parallelism same as sink - // transformation. - .slotSharingGroup("shuffle-partition-custom-group") + .name("Strip Statistics") + .setParallelism(input.getParallelism()) + // we remove the slot sharing group here so that all operators can be + // co-located in the same TaskManager slot .returns(input.getType()); default: throw new UnsupportedOperationException( - "Unsupported distribution mode: " + shuffleMode); + "Unsupported distribution mode: " + distributionMode); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java index f17fe1dea..747fe9341 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java @@ -198,7 +198,7 @@ public class FlinkTableSink @Override public DataStreamSink<?> consumeDataStream( ProviderContext providerContext, DataStream<RowData> dataStream) {
- return flinkSink.addPreWriteTopology(dataStream); + return flinkSink.apply(dataStream); } }; } @@ -228,7 +228,7 @@ public class FlinkTableSink distributionMode, new RowDataSerializationSchema(true, sinkIgnoreDelete)); - return new FlinkSink<>(flinkSinkWriterBuilder); + return new FlinkSink<>(flinkSinkWriterBuilder, tablePath); } private List<String> columns(int[] columnIndexes) { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java index ad4297381..aae36b630 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSink.java @@ -19,6 +19,7 @@ package org.apache.fluss.flink.sink; import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.flink.sink.writer.FlinkSinkWriter; +import org.apache.fluss.metadata.TablePath; /** * FlussSink is a specialized Flink sink for writing data to Fluss. @@ -39,8 +40,10 @@ public class FlussSink<InputT> extends FlinkSink<InputT> { * * @param builder the builder used to create the sink writer */ - FlussSink(SinkWriterBuilder<? extends FlinkSinkWriter<InputT>, InputT> builder) { - super(builder); + FlussSink( + SinkWriterBuilder<? 
extends FlinkSinkWriter<InputT>, InputT> builder, + TablePath tablePath) { + super(builder, tablePath); } /** diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java index 416c3b38a..ad6bb2109 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java @@ -107,6 +107,13 @@ public class FlussSinkBuilder<InputT> { return this; } + /** + * Set the distribution mode for the sink. The distribution mode controls how records are + * shuffled to the Fluss sink operator. + * + * @param distributionMode the distribution mode to use for shuffling records to the sink + * @return this builder instance + */ public FlussSinkBuilder<InputT> setDistributionMode(DistributionMode distributionMode) { this.distributionMode = distributionMode; return this; @@ -217,7 +224,7 @@ public class FlussSinkBuilder<InputT> { serializationSchema); } - return new FlussSink<>(writerBuilder); + return new FlussSink<>(writerBuilder, tablePath); } private void validateConfiguration() { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java index 2096b1095..533b8d4e3 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java @@ -75,5 +75,8 @@ public interface FlussSerializationSchema<T> extends Serializable { * @return The schema of the input Flink row. */ org.apache.flink.table.types.logical.RowType getInputRowSchema(); + + /** Indicates whether the statistics collection is enabled.
*/ + boolean isStatisticEnabled(); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java index af79d3ba3..ac80f938c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java @@ -106,9 +106,11 @@ public class RowDataSerializationSchema implements FlussSerializationSchema<RowD */ @Nullable private transient ProjectedRow outputProjection; - private transient RowType rowType; - private transient long fixedSizeInBytes; - private transient List<Integer> variableSizeFields; + /** + * Estimator for calculating the size of RowData instances. Maybe null if there is no need to + * calculate size. + */ + @Nullable private transient RowDataSizeEstimator sizeEstimator; /** * Constructs a new {@code RowSerializationSchema}. 
@@ -143,10 +145,9 @@ public class RowDataSerializationSchema implements FlussSerializationSchema<RowD } outputProjection = ProjectedRow.from(indexMapping); } - this.rowType = context.getRowSchema(); - Tuple2<Long, List<Integer>> calculateFixedSize = calculateFixedSize(rowType); - this.fixedSizeInBytes = calculateFixedSize.f0; - this.variableSizeFields = calculateFixedSize.f1; + if (context.isStatisticEnabled()) { + this.sizeEstimator = new RowDataSizeEstimator(context.getRowSchema()); + } } /** @@ -174,81 +175,11 @@ public class RowDataSerializationSchema implements FlussSerializationSchema<RowD row = outputProjection.replaceRow(row); } OperationType opType = toOperationType(value.getRowKind()); - long estimatedSizeInBytes = calculateSize(value); - return new RowWithOp(row, opType, estimatedSizeInBytes); - } - - private long calculateSize(RowData value) { - if (value instanceof BinaryFormat) { - return ((BinaryFormat) value).getSizeInBytes(); + Long estimatedSizeInBytes = null; + if (sizeEstimator != null) { + estimatedSizeInBytes = sizeEstimator.estimateSize(value); } - - long size = fixedSizeInBytes; - for (int i : variableSizeFields) { - DataField field = rowType.getFields().get(i); - DataTypeRoot typeRoot = field.getType().getTypeRoot(); - if (value.getArity() <= i || value.isNullAt(i)) { - continue; - } - switch (typeRoot) { - case STRING: - StringData stringData = value.getString(i); - if (stringData instanceof BinaryStringData) { - size += ((BinaryStringData) stringData).getSizeInBytes(); - } else { - size += stringData.toBytes().length; - } - break; - case BYTES: - size += value.getBinary(i).length; - break; - } - } - return size; - } - - private Tuple2<Long, List<Integer>> calculateFixedSize(RowType rowType) { - long size = 0; - List<Integer> variableSizeFields = new ArrayList<>(); - for (int i = 0; i < rowType.getFieldCount(); i++) { - DataField field = rowType.getFields().get(i); - DataTypeRoot typeRoot = field.getType().getTypeRoot(); - switch 
(typeRoot) { - case CHAR: - size += ((CharType) (field.getType())).getLength(); - break; - case BINARY: - size += ((BinaryType) (field.getType())).getLength(); - break; - case DECIMAL: - size += ((DecimalType) (field.getType())).getPrecision(); - break; - case BOOLEAN: - case TINYINT: - size += 1; - break; - case SMALLINT: - size += 2; - break; - case INTEGER: - case FLOAT: - case DATE: - case TIME_WITHOUT_TIME_ZONE: - size += 4; - break; - case BIGINT: - case DOUBLE: - case TIMESTAMP_WITHOUT_TIME_ZONE: - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - size += 8; - break; - case STRING: - case BYTES: - variableSizeFields.add(i); - break; - } - } - return Tuple2.of(size, variableSizeFields); + return new RowWithOp(row, opType, estimatedSizeInBytes); } /** @@ -281,4 +212,99 @@ public class RowDataSerializationSchema implements FlussSerializationSchema<RowD } } } + + /** Estimator for the size of RowData instances based on their schema and row. */ + private static class RowDataSizeEstimator { + private final RowType rowType; + private final long fixedSizeInBytes; + private final int[] variableSizeFields; + + RowDataSizeEstimator(RowType rowType) { + this.rowType = rowType; + Tuple2<Long, int[]> result = calculateFixedSizeAndVariableColumnIndex(rowType); + this.fixedSizeInBytes = result.f0; + this.variableSizeFields = result.f1; + } + + long estimateSize(RowData value) { + if (value instanceof BinaryFormat) { + return ((BinaryFormat) value).getSizeInBytes(); + } + + long size = fixedSizeInBytes; + for (int i : variableSizeFields) { + DataField field = rowType.getFields().get(i); + DataTypeRoot typeRoot = field.getType().getTypeRoot(); + // handle schema evolution where field may not exist, and null values + if (value.getArity() <= i || value.isNullAt(i)) { + continue; + } + switch (typeRoot) { + case STRING: + StringData stringData = value.getString(i); + if (stringData instanceof BinaryStringData) { + size += ((BinaryStringData) stringData).getSizeInBytes(); + } else { + size 
+= stringData.toBytes().length; + } + break; + case BYTES: + size += value.getBinary(i).length; + break; + } + } + return size; + } + + private static Tuple2<Long, int[]> calculateFixedSizeAndVariableColumnIndex( + RowType rowType) { + long fixedSizeInBytes = 0; + List<Integer> variableSizeFields = new ArrayList<>(); + for (int i = 0; i < rowType.getFieldCount(); i++) { + DataField field = rowType.getFields().get(i); + DataTypeRoot typeRoot = field.getType().getTypeRoot(); + switch (typeRoot) { + case CHAR: + fixedSizeInBytes += ((CharType) (field.getType())).getLength(); + break; + case BINARY: + fixedSizeInBytes += ((BinaryType) (field.getType())).getLength(); + break; + case DECIMAL: + fixedSizeInBytes += ((DecimalType) (field.getType())).getPrecision(); + break; + case BOOLEAN: + case TINYINT: + fixedSizeInBytes += 1; + break; + case SMALLINT: + fixedSizeInBytes += 2; + break; + case INTEGER: + case FLOAT: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + fixedSizeInBytes += 4; + break; + case BIGINT: + case DOUBLE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + fixedSizeInBytes += 8; + break; + case STRING: + case BYTES: + variableSizeFields.add(i); + break; + } + } + + int[] variableColumnIndexes = new int[variableSizeFields.size()]; + for (int i = 0; i < variableSizeFields.size(); i++) { + variableColumnIndexes[i] = variableSizeFields.get(i); + } + + return Tuple2.of(fixedSizeInBytes, variableColumnIndexes); + } + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java index cc1d168cd..1c00a9386 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/SerializerInitContextImpl.java @@ 
-30,15 +30,19 @@ public class SerializerInitContextImpl implements FlussSerializationSchema.Initi private final RowType flussRowSchema; private final org.apache.flink.table.types.logical.RowType flinkRowType; + private final boolean isStatisticEnabled; - public SerializerInitContextImpl(RowType rowSchema) { - this(rowSchema, toFlinkRowType(rowSchema)); + public SerializerInitContextImpl(RowType rowSchema, boolean isStatisticEnabled) { + this(rowSchema, toFlinkRowType(rowSchema), isStatisticEnabled); } public SerializerInitContextImpl( - RowType rowSchema, org.apache.flink.table.types.logical.RowType flinkRowType) { + RowType rowSchema, + org.apache.flink.table.types.logical.RowType flinkRowType, + boolean isStatisticEnabled) { this.flussRowSchema = checkNotNull(rowSchema, "flussRowSchema"); this.flinkRowType = checkNotNull(flinkRowType, "flinkRowType"); + this.isStatisticEnabled = isStatisticEnabled; } @Override @@ -50,4 +54,9 @@ public class SerializerInitContextImpl implements FlussSerializationSchema.Initi public org.apache.flink.table.types.logical.RowType getInputRowSchema() { return flinkRowType; } + + @Override + public boolean isStatisticEnabled() { + return isStatisticEnabled; + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinator.java index 218db829d..da122a7b2 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinator.java @@ -247,12 +247,20 @@ class DataStatisticsCoordinator implements OperatorCoordinator { StatisticsEvent.createStatisticsEvent( checkpointId, statistics, statisticsSerializer); for (int i = 0; i < context.currentParallelism(); ++i) { - try { - 
subtaskGateways.getSubtaskGateway(i).sendEvent(statisticsEvent).get(); - } catch (Exception exception) { - // Ignore future return value for potential error (e.g. subtask down). - LOG.warn("Failed to send global statistics to subtask {}", i, exception); - } + // Ignore future return value for potential error (e.g. subtask down). + // Upon restart, subtasks send request to coordinator to refresh statistics + // if there is any difference + final int subtaskIndex = i; + subtaskGateways + .getSubtaskGateway(subtaskIndex) + .sendEvent(statisticsEvent) + .whenComplete( + (ack, error) -> { + LOG.warn( + "Failed to send global statistics to subtask {}", + subtaskIndex, + error); + }); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsOperator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsOperator.java index cd051fc14..ccf938809 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsOperator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsOperator.java @@ -95,7 +95,8 @@ public class DataStatisticsOperator<InputT> this.partitionGetter = new PartitionGetter(rowType, partitionKeys); this.statisticsSerializer = new DataStatisticsSerializer(); try { - this.flussSerializationSchema.open(new SerializerInitContextImpl(rowType)); + // enable statistics collection for the serialization schema + this.flussSerializationSchema.open(new SerializerInitContextImpl(rowType, true)); } catch (Exception e) { throw new FlussRuntimeException(e); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DistributionMode.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DistributionMode.java index c63c11b84..63883533b 100644 --- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DistributionMode.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DistributionMode.java @@ -19,8 +19,51 @@ package org.apache.fluss.flink.sink.shuffle; /** Distribution mode for sink shuffling. */ public enum DistributionMode { + /** + * Uses Flink's default shuffle strategy: + * + * <p>Typically uses FORWARD when the sink parallelism matches the upstream parallelism; uses + * ROUND_ROBIN when parallelisms differ. + */ NONE, + + /** + * Automatically chooses the best mode based on the table type: + * + * <p>Uses BUCKET mode for Primary Key Tables and Log Tables with bucket keys to maximize + * throughput. + * + * <p>Uses NONE for Log Tables without bucket keys. + */ AUTO, + + /** + * Shuffle data by bucket ID before writing to sink. This groups data with the same bucket ID to + * be processed by the same task, which improves client processing efficiency and reduces + * resource consumption. + * + * <p>Characteristics: + * + * <p>Particularly recommended for Primary Key tables as it can significantly improve + * throughput. For Log Tables, bucket shuffle only takes effect when the 'bucket.key' is + * defined. + * + * <p>Note: When sink parallelism exceeds the number of buckets, some sink tasks may remain idle + * without receiving data. + */ BUCKET, + + /** + * Dynamically adjusts shuffle strategy based on partition key traffic patterns. This mode + * monitors data distribution and adjusts the shuffle behavior to balance the load. + * + * <p>Characteristics: + * + * <p>Only supported for partitioned Log Tables (not yet supported for Primary Key tables). Use + * this mode when data is highly skewed across partitions or when there are many partitions. + * + * <p>Note: This mode has overhead costs including data statistics collection and additional + * shuffle operations.
+ */ PARTITION_DYNAMIC } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputer.java index 32b7a4094..692d5b28d 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputer.java @@ -99,7 +99,7 @@ public class StatisticsOrRecordChannelComputer<InputT> this.bucketKeyEncoder = KeyEncoder.of(flussRowType, bucketKeys, lakeFormat); this.partitionGetter = new PartitionGetter(flussRowType, partitionKeys); try { - this.serializationSchema.open(new SerializerInitContextImpl(flussRowType)); + this.serializationSchema.open(new SerializerInitContextImpl(flussRowType, false)); } catch (Exception e) { throw new FlussRuntimeException(e); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java index fe0eb28ea..f942bc5ad 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/writer/FlinkSinkWriter.java @@ -117,7 +117,8 @@ public abstract class FlinkSinkWriter<InputT> implements SinkWriter<InputT> { try { this.serializationSchema.open( - new SerializerInitContextImpl(table.getTableInfo().getRowType(), tableRowType)); + new SerializerInitContextImpl( + table.getTableInfo().getRowType(), tableRowType, false)); } catch (Exception e) { throw new FlussRuntimeException(e); } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java index 41bda51a8..596ad7d1d 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java @@ -42,7 +42,6 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; -import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -166,7 +165,6 @@ abstract class FlinkMetricsITCase { createTable(tablePath, tableDescriptor); // test write - tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); TableResult tableResult = tEnv.executeSql("insert into test values (1, 'name1'), (2, 'name2'), (3, 'name3')"); JobClient client = tableResult.getJobClient().get(); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputerTest.java index 84a01c020..93d5254fd 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkRowDataChannelComputerTest.java @@ -40,7 +40,7 @@ class FlinkRowDataChannelComputerTest { @BeforeAll static void init() throws Exception { - serializationSchema.open(new SerializerInitContextImpl(DATA1_ROW_TYPE)); + serializationSchema.open(new SerializerInitContextImpl(DATA1_ROW_TYPE, false)); } @Test diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java index 6f1c65ecd..1af582e4d 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java @@ -102,9 +102,7 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase { // open a catalog so that we can get table from the catalog String bootstrapServers = FLUSS_CLUSTER_EXTENSION.getBootstrapServers(); // create table environment - org.apache.flink.configuration.Configuration config = - new org.apache.flink.configuration.Configuration(); - env = StreamExecutionEnvironment.getExecutionEnvironment(config); + env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); tEnv = StreamTableEnvironment.create(env); @@ -240,15 +238,17 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase { if (distributionMode == DistributionMode.PARTITION_DYNAMIC) { assertThatThrownBy(() -> tEnv.executeSql(insertSql)) .hasMessageContaining( - "PARTITION_DYNAMIC is only supported for partition tables"); + "PARTITION_DYNAMIC is only supported for partitioned tables"); return; } String insertPlan = tEnv.explainSql(insertSql, ExplainDetail.JSON_EXECUTION_PLAN); if (distributionMode == DistributionMode.BUCKET) { - assertThat(insertPlan).contains("\"ship_strategy\" : \"BUCKET_SHUFFLE\""); + assertThat(insertPlan).contains("\"ship_strategy\" : \"BUCKET\""); } else { assertThat(insertPlan).contains("\"ship_strategy\" : \"FORWARD\""); } + // there shouldn't be a REBALANCE shuffle strategy; this asserts operator parallelism + assertThat(insertPlan).doesNotContain("\"ship_strategy\" : \"REBALANCE\""); tEnv.executeSql(insertSql).await(); CloseableIterator<Row> rowIter = 
tEnv.executeSql("select * from sink_test").collect(); @@ -272,8 +272,8 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase { List<String> actual = collectRowsWithTimeout(rowIter, expectedRows.size()); assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRows); + // check data with the same bucket key should be read in sequence. if (distributionMode == DistributionMode.BUCKET) { - // check data with the same bucket key should be read in sequence. for (List<String> expected : expectedGroups) { if (expected.size() <= 1) { continue; @@ -290,7 +290,6 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase { @Test void testAppendLogWithRoundRobin() throws Exception { - tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); tEnv.executeSql( "create table sink_test (a int not null, b bigint, c string) with " + "('bucket.num' = '3', 'client.writer.bucket.no-key-assigner' = 'round_robin')"); @@ -387,18 +386,23 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase { + "(10, 3510, 'coco'), " + "(11, 3511, 'stave'), " + "(12, 3512, 'Tim')"; - if (distributionMode == DistributionMode.PARTITION_DYNAMIC) { - assertThat(tEnv.explainSql(insertSql, ExplainDetail.JSON_EXECUTION_PLAN)) - .contains(String.format("\"ship_strategy\" : \"%s\"", distributionMode.name())); - } else if (distributionMode == DistributionMode.BUCKET) { + + if (distributionMode == DistributionMode.BUCKET) { assertThatThrownBy(() -> tEnv.explainSql(insertSql, ExplainDetail.JSON_EXECUTION_PLAN)) .hasMessageContaining( "BUCKET mode is only supported for log tables with bucket keys"); return; + } + + String insertPlan = tEnv.explainSql(insertSql, ExplainDetail.JSON_EXECUTION_PLAN); + if (distributionMode == DistributionMode.PARTITION_DYNAMIC) { + assertThat(insertPlan) + .contains(String.format("\"ship_strategy\" : \"%s\"", distributionMode.name())); } else { - assertThat(tEnv.explainSql(insertSql, ExplainDetail.JSON_EXECUTION_PLAN)) - 
.contains("\"ship_strategy\" : \"FORWARD\""); + assertThat(insertPlan).contains("\"ship_strategy\" : \"FORWARD\""); } + assertThat(insertPlan).doesNotContain("\"ship_strategy\" : \"REBALANCE\""); + tEnv.executeSql(insertSql).await(); CloseableIterator<Row> rowIter = tEnv.executeSql("select * from sink_test").collect(); @@ -449,11 +453,12 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase { String insertPlan = tEnv.explainSql(insertSql, ExplainDetail.JSON_EXECUTION_PLAN); if (distributionMode == DistributionMode.BUCKET) { - assertThat(insertPlan).contains("\"ship_strategy\" : \"BUCKET_SHUFFLE\""); + assertThat(insertPlan).contains("\"ship_strategy\" : \"BUCKET\""); } else if (distributionMode == DistributionMode.AUTO || distributionMode == DistributionMode.NONE) { assertThat(insertPlan).contains("\"ship_strategy\" : \"FORWARD\""); } + assertThat(insertPlan).doesNotContain("\"ship_strategy\" : \"REBALANCE\""); tEnv.executeSql(insertSql).await(); @@ -625,7 +630,6 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase { @Test void testFirstRowMergeEngine() throws Exception { - tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); tEnv.executeSql( "create table first_row_source (a int not null primary key not enforced," + " b string) with('table.merge-engine' = 'first_row')"); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java index bebc233ce..b88aa4dae 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java @@ -144,7 +144,7 @@ public class FlussSinkITCase extends FlinkTestBase { .setSerializationSchema(serializationSchema) .build(); - flussSink.addPreWriteTopology(stream).name("Fluss Sink"); + 
flussSink.apply(stream).name("Fluss Sink"); env.executeAsync("Test RowData Fluss Sink"); @@ -272,7 +272,7 @@ public class FlussSinkITCase extends FlinkTestBase { .setSerializationSchema(new TestOrderSerializationSchema()) .build(); - flussSink.addPreWriteTopology(stream).name("Fluss Sink"); + flussSink.apply(stream).name("Fluss Sink"); env.executeAsync("Test Order Fluss Sink"); Table table = conn.getTable(new TablePath(DEFAULT_DB, pkTableName)); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputerTest.java index b16f90244..c63690951 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputerTest.java @@ -50,7 +50,7 @@ class StatisticsOrRecordChannelComputerTest { @BeforeAll static void init() throws Exception { - serializationSchema.open(new SerializerInitContextImpl(DATA1_ROW_TYPE)); + serializationSchema.open(new SerializerInitContextImpl(DATA1_ROW_TYPE, false)); } @Test diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md index f0f4995fb..d5efb0aaf 100644 --- a/website/docs/engine-flink/options.md +++ b/website/docs/engine-flink/options.md @@ -130,8 +130,8 @@ See more details about [ALTER TABLE ... SET](engine-flink/ddl.md#set-properties) | Option | Type | Default | Description [...] 
|-----------------------------------------------------|------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] | sink.ignore-delete | Boolean | false | If set to true, the sink will ignore DELETE and UPDATE_BEFORE changelog events. [...] -| sink.bucket-shuffle | Boolean | true | Whether to shuffle by bucket id before write to sink. Shuffling the data with the same bucket id to be processed by the same task can improve the efficiency of client processing and reduce resource consumption. For Log Table, bucket shuffle will only take effect when the 'bucket.key' is defined. For Primary Key table, it is enabled by default. This option is deprecated. Please use sink.distribution-m [...] -| sink.distribution-mode | Enum | AUTO | Defines the distribution mode for writing data to the sink. See [Distribution Modes](#distribution-modes) for details. | +| sink.bucket-shuffle | Boolean | true | Whether to shuffle by bucket id before write to sink. Shuffling the data with the same bucket id to be processed by the same task can improve the efficiency of client processing and reduce resource consumption. For Log Table, bucket shuffle will only take effect when the 'bucket.key' is defined. For Primary Key table, it is enabled by default. This option is deprecated. Please use `sink.distribution- [...] +| sink.distribution-mode | Enum | AUTO | Defines the distribution mode for shuffling data to the sink. Available options are `AUTO`, `NONE`, `BUCKET`, and `PARTITION_DYNAMIC`. See [Distribution Modes](#distribution-modes) for details about each option. 
| | client.writer.buffer.memory-size | MemorySize | 64mb | The total bytes of memory the writer can use to buffer internal rows. [...] | client.writer.buffer.page-size | MemorySize | 128kb | Size of every page in memory buffers (`client.writer.buffer.memory-size`). [...] | client.writer.buffer.per-request-memory-size | MemorySize | 16mb | The minimum number of bytes that will be allocated by the writer rounded down to the closest multiple of client.writer.buffer.page-size. It must be greater than or equal to client.writer.buffer.page-size. This option allows to allocate memory in batches to have better CPU-cached friendliness due to contiguous segments. [...] @@ -173,8 +173,8 @@ Shuffle data by bucket ID before writing to sink. This groups data with the same Dynamically adjusts shuffle strategy based on partition key traffic patterns. This mode monitors data distribution and adjusts the shuffle behavior to balance the load. **Characteristics:** -- Only supported for partitioned Log Tables (not for Primary Key tables now) -- Use this mode when data is highly skewed across partitions or when there are比较多 partitions +- Only supported for partitioned Log Tables (not supported for Primary Key tables now) +- Use this mode when data is highly skewed across partitions or when there are many partitions - **Note:** This mode has overhead costs including data statistics collection and additional shuffle operations ## Other Options
