Copilot commented on code in PR #1784:
URL: https://github.com/apache/fluss/pull/1784#discussion_r2648274494
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java:
##########
@@ -159,6 +168,66 @@ public RowWithOp serialize(RowData value) throws Exception {
return new RowWithOp(row, opType);
}
+ @Override
+ public long size(RowData value, RowType rowType) {
+ if (value instanceof BinaryFormat) {
+ return ((BinaryFormat) value).getSizeInBytes();
+ }
+
+ long size = 0;
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ DataField field = rowType.getFields().get(i);
+ DataTypeRoot typeRoot = field.getType().getTypeRoot();
+ if (value.isNullAt(i)) {
+ continue;
+ }
+ switch (typeRoot) {
+ case CHAR:
+ size += ((CharType) (field.getType())).getLength();
+ break;
+ case STRING:
+ StringData stringData = value.getString(i);
+ if (stringData instanceof BinaryStringData) {
+ size += ((BinaryStringData) stringData).getSizeInBytes();
+ } else {
+ size += converter.getString(i).getSizeInBytes();
+ }
+ break;
+ case BINARY:
+ size += ((BinaryType) (field.getType())).getLength();
+ break;
+ case BYTES:
+ size += converter.getBytes(i).length;
+ break;
+ case DECIMAL:
+ size += ((DecimalType) (field.getType())).getPrecision();
+ break;
+ case BOOLEAN:
+ case TINYINT:
+ size += 1;
+ break;
+ case SMALLINT:
+ size += 2;
+ break;
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ size += 4;
+ break;
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ size += 8;
Review Comment:
The size calculation is likely incorrect for FLOAT. Lines 218-222 assign 8 bytes to FLOAT, but FLOAT is a 4-byte single-precision value; only DOUBLE needs 8 bytes. This skews the statistics and the weights computed by the dynamic shuffle algorithm.
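For reference, the fixed-width estimates could be tied to the JDK's `*.BYTES` constants so that FLOAT cannot drift back to 8. This is a minimal sketch: `SketchTypeRoot` and `FixedWidthSizes` are hypothetical stand-ins for the `DataTypeRoot` switch in the diff, and the date, time and timestamp sizes simply mirror the estimates the diff already uses.

```java
/** Hypothetical subset of type roots; mirrors names from the diff, not the Fluss enum. */
enum SketchTypeRoot {
    BOOLEAN, TINYINT, SMALLINT, INTEGER, DATE, TIME_WITHOUT_TIME_ZONE,
    BIGINT, FLOAT, DOUBLE, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE
}

/** Hypothetical lookup of estimated byte sizes for non-null fixed-width values. */
final class FixedWidthSizes {
    private FixedWidthSizes() {}

    static int estimatedBytes(SketchTypeRoot root) {
        switch (root) {
            case BOOLEAN:
            case TINYINT:
                return Byte.BYTES; // 1
            case SMALLINT:
                return Short.BYTES; // 2
            case INTEGER:
            case DATE:
            case TIME_WITHOUT_TIME_ZONE:
                return Integer.BYTES; // 4
            case FLOAT:
                return Float.BYTES; // 4: IEEE 754 single precision
            case BIGINT:
            case DOUBLE:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return Long.BYTES; // 8 (Double.BYTES is also 8)
            default:
                throw new IllegalArgumentException("Not a fixed-width root: " + root);
        }
    }
}
```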
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java:
##########
@@ -188,6 +194,23 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
}
FlinkSink<RowData> flinkSink = getFlinkSink(targetColumnIndexes);
+ if (distributionMode == DistributionMode.DYNAMIC_SHUFFLE) {
+ return new DataStreamSinkProvider() {
+ @Override
+ public DataStreamSink<?> consumeDataStream(
+ ProviderContext providerContext, DataStream<RowData> dataStream) {
+ // todo: add prefix to the sink to avoid conflict in same flink job.
+ String defaultSuffix =
+ "sink"
Review Comment:
The TODO should be resolved or tracked in an issue. It notes that the sink needs a proper prefix to avoid name conflicts within the same Flink job, which matters for production code.
```suggestion
// Use a connector-specific prefix to reduce the risk of sink name/UID conflicts.
String defaultSuffix =
"fluss-sink"
```
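A sketch of a connector-prefixed name, assuming the table path is appended so that two Fluss sinks in one job get distinct names. Flink requires operator UIDs to be unique within a job, so including the table path matters when a job writes several tables. `SinkNaming` and its `"fluss"` prefix are hypothetical.

```java
/** Hypothetical helper that builds connector-prefixed operator names and UIDs. */
final class SinkNaming {
    /** Assumed connector prefix; not a constant defined in the PR. */
    static final String PREFIX = "fluss";

    private SinkNaming() {}

    /** Joins prefix, component and table path, e.g. ("sink", "db.orders") -> "fluss-sink-db.orders". */
    static String uid(String component, String tablePath) {
        if (component == null || component.isEmpty()) {
            throw new IllegalArgumentException("component must not be empty");
        }
        if (tablePath == null || tablePath.isEmpty()) {
            throw new IllegalArgumentException("tablePath must not be empty");
        }
        return PREFIX + "-" + component + "-" + tablePath;
    }
}
```

In the sink provider, the same string could feed both `.name(...)` and `.uid(...)` on the resulting `DataStreamSink`.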
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatistics.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/** Data statistics for a partition name and its frequency. */
+@Internal
+public class DataStatistics {
Review Comment:
Class DataStatistics overrides `hashCode()` but not `equals(Object)`.
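If `DataStatistics` is meant to be compared by content (for example, in tests that check aggregated statistics), `equals` should use the same fields as `hashCode`. This is a minimal sketch, assuming the class wraps a partition-name-to-frequency map, which is consistent with its `HashMap`/`Map`/`Objects` imports. `PartitionStatistics` and `partitionFrequency` are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Simplified stand-in for DataStatistics: partition name -> frequency. */
final class PartitionStatistics {
    private final Map<String, Long> partitionFrequency;

    PartitionStatistics(Map<String, Long> partitionFrequency) {
        // Defensive copy so later mutation of the caller's map cannot change equality.
        this.partitionFrequency = new HashMap<>(partitionFrequency);
    }

    Map<String, Long> partitionFrequency() {
        return Collections.unmodifiableMap(partitionFrequency);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        PartitionStatistics that = (PartitionStatistics) o;
        return partitionFrequency.equals(that.partitionFrequency);
    }

    @Override
    public int hashCode() {
        // Same field as equals(), so equal objects always share a hash code.
        return Objects.hash(partitionFrequency);
    }
}
```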
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java:
##########
@@ -92,9 +95,19 @@ public FlussSinkBuilder<InputT> setTable(String table) {
return this;
}
- /** Set shuffle by bucket id. */
+ /**
+ * Set shuffle by bucket id. Deprecated use {@link
+ * FlussSinkBuilder#setShuffleMode(DistributionMode) } instead.
+ */
+ @Deprecated
public FlussSinkBuilder<InputT> setShuffleByBucketId(boolean shuffleByBucketId) {
- this.shuffleByBucketId = shuffleByBucketId;
+ this.shuffleMode =
+ shuffleByBucketId ? DistributionMode.BUCKET_SHUFFLE : DistributionMode.NONE;
+ return this;
+ }
+
+ public FlussSinkBuilder<InputT> setShuffleMode(DistributionMode shuffleByBucketId) {
+ this.shuffleMode = shuffleByBucketId;
Review Comment:
The parameter name 'shuffleByBucketId' is misleading. The parameter should
be named 'distributionMode' or 'shuffleMode' to accurately reflect that it
accepts a DistributionMode enum value, not a boolean.
```suggestion
public FlussSinkBuilder<InputT> setShuffleMode(DistributionMode shuffleMode) {
this.shuffleMode = shuffleMode;
```
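This trimmed-down sketch has the deprecated setter delegate to the renamed one, so both paths share a single assignment. `ShuffleBuilderSketch` is hypothetical. The enum only copies the three `DistributionMode` constants visible in this PR, and the `NONE` default is an assumption.

```java
/** Copies the DistributionMode constants that appear in this PR. */
enum DistributionModeSketch { NONE, BUCKET_SHUFFLE, DYNAMIC_SHUFFLE }

/** Hypothetical builder fragment; not the real FlussSinkBuilder. */
final class ShuffleBuilderSketch {
    private DistributionModeSketch shuffleMode = DistributionModeSketch.NONE; // assumed default

    /** @deprecated use {@link #setShuffleMode(DistributionModeSketch)} instead. */
    @Deprecated
    ShuffleBuilderSketch setShuffleByBucketId(boolean shuffleByBucketId) {
        return setShuffleMode(
                shuffleByBucketId
                        ? DistributionModeSketch.BUCKET_SHUFFLE
                        : DistributionModeSketch.NONE);
    }

    ShuffleBuilderSketch setShuffleMode(DistributionModeSketch shuffleMode) {
        if (shuffleMode == null) {
            throw new NullPointerException("shuffleMode must not be null");
        }
        this.shuffleMode = shuffleMode;
        return this;
    }

    DistributionModeSketch shuffleMode() {
        return shuffleMode;
    }
}
```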
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/shuffle/DataStatisticOperatorTest.java:
##########
@@ -0,0 +1,114 @@
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.flink.sink.serializer.RowDataSerializationSchema;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.flink.sink.shuffle.StatisticsEvent.createStatisticsEvent;
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DataStatisticsOperator}. */
+public class DataStatisticOperatorTest {
+
+ @Test
+ void testProcessElement() throws Exception {
+ DataStatisticsOperatorFactory<RowData> factory =
+ new DataStatisticsOperatorFactory<>(
+ DATA1_ROW_TYPE,
+ Collections.singletonList("b"),
+ new RowDataSerializationSchema(false, false));
+ List<StreamRecord<RowData>> inputRecords =
+ Arrays.asList(
+ new StreamRecord<>(GenericRowData.of(1, StringData.fromString("a"))),
+ new StreamRecord<>(GenericRowData.of(2, StringData.fromString("a"))),
+ new StreamRecord<>(GenericRowData.of(3, StringData.fromString("b"))));
+
+ List<StreamRecord<StatisticsOrRecord<RowData>>> expectedOutput = new ArrayList<>();
+
+ try (DataStatisticOperatorTestHarness testHarness =
+ new DataStatisticOperatorTestHarness(factory, 1, 1, 0)) {
+ testHarness.open();
+ assertThat(testHarness.getLocalStatistics()).isEmpty();
+
+ // process five records
Review Comment:
The test comment says "process five records" (line 64) but only three
records are being processed in the loop. The comment should be corrected to say
"process three records".
```suggestion
// process three records
```
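For orientation: if the local statistics were plain record counts keyed by the partition column `b`, the three input records would produce `{a=2, b=1}`. This count-based model is an assumption, since the operator in the PR may instead weight records by their serialized size via `RowDataSerializationSchema.size`. `LocalStatisticsSketch` is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical count-based local statistics, keyed by partition value. */
final class LocalStatisticsSketch {
    private LocalStatisticsSketch() {}

    /** Counts how many records map to each partition value, in first-seen order. */
    static Map<String, Long> countByPartition(List<String> partitionValues) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String value : partitionValues) {
            counts.merge(value, 1L, Long::sum);
        }
        return counts;
    }
}
```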
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/WeightedRandomAssignment.java:
##########
@@ -0,0 +1,151 @@
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.row.InternalRow;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Partition assignment strategy that randomly distributes records to subtasks based on configured
+ * weights.
+ *
+ * <p>This assignment strategy enables weighted random distribution of records to subtasks, allowing
+ * for more balanced load distribution across downstream subtasks. The assignment uses a weighted
+ * random algorithm where subtasks with higher weights have a proportionally higher probability of
+ * being selected.
+ *
+ * <p>NOTE: This class is inspired by the Iceberg project.
+ */
+@Internal
+public class WeightedRandomAssignment implements PartitionAssignment {
Review Comment:
Class WeightedRandomAssignment overrides `hashCode()` but not `equals(Object)`.
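The weighted random selection described in the Javadoc is commonly implemented with cumulative weights and a search for the first running total that exceeds a uniform draw. The sketch below assumes integer weights per subtask. `WeightedRandomSketch` is a hypothetical simplification, not the PR's `WeightedRandomAssignment`.

```java
import java.util.Random;

/** Hypothetical weighted random subtask selector using cumulative weights. */
final class WeightedRandomSketch {
    private final int[] subtasks;
    private final long[] cumulativeWeights; // running totals of the weights
    private final int totalWeight;
    private final Random random;

    WeightedRandomSketch(int[] subtasks, int[] weights, long seed) {
        if (subtasks.length == 0 || subtasks.length != weights.length) {
            throw new IllegalArgumentException("subtasks and weights must be non-empty and of equal length");
        }
        this.subtasks = subtasks.clone();
        this.cumulativeWeights = new long[weights.length];
        long running = 0;
        for (int i = 0; i < weights.length; i++) {
            if (weights[i] < 0) {
                throw new IllegalArgumentException("weights must be non-negative");
            }
            running += weights[i];
            cumulativeWeights[i] = running;
        }
        if (running <= 0 || running > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("total weight must be in (0, Integer.MAX_VALUE]");
        }
        this.totalWeight = (int) running;
        this.random = new Random(seed);
    }

    /** Picks a subtask with probability weight / totalWeight. */
    int select() {
        int draw = random.nextInt(totalWeight); // uniform in [0, totalWeight)
        // Binary search for the first index whose running total exceeds the draw.
        int lo = 0;
        int hi = cumulativeWeights.length - 1;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (cumulativeWeights[mid] > draw) {
                hi = mid;
            } else {
                lo = mid + 1;
            }
        }
        return subtasks[lo];
    }
}
```

A subtask with weight 0 is never chosen, because its running total equals the previous one, so it is never the first total to exceed the draw.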
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java:
##########
@@ -0,0 +1,118 @@
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Objects;
+
+/**
+ * The type information for StatisticsOrRecord.
+ *
+ * @param <InputT>
+ */
+@Internal
+public class StatisticsOrRecordTypeInformation<InputT>
+ extends TypeInformation<StatisticsOrRecord<InputT>> {
+
+ private final TypeInformation<InputT> rowTypeInformation;
+ private final DataStatisticsSerializer globalStatisticsSerializer;
+
+ public StatisticsOrRecordTypeInformation(TypeInformation<InputT> rowTypeInformation) {
+ this.rowTypeInformation = rowTypeInformation;
+ this.globalStatisticsSerializer = new DataStatisticsSerializer();
+ }
+
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 1;
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 1;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Class<StatisticsOrRecord<InputT>> getTypeClass() {
+ return (Class<StatisticsOrRecord<InputT>>) (Class<?>) StatisticsOrRecord.class;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<StatisticsOrRecord<InputT>> createSerializer(SerializerConfig config) {
+ TypeSerializer<InputT> recordSerializer = rowTypeInformation.createSerializer(config);
+ return new StatisticsOrRecordSerializer<>(globalStatisticsSerializer, recordSerializer);
+ }
+
+ @Override
+ @Deprecated
+ public TypeSerializer<StatisticsOrRecord<InputT>> createSerializer(ExecutionConfig config) {
+ TypeSerializer<InputT> recordSerializer = rowTypeInformation.createSerializer(config);
+ return new StatisticsOrRecordSerializer<>(globalStatisticsSerializer, recordSerializer);
+ }
+
+ @Override
+ public String toString() {
+ return "StatisticsOrRecord";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o != null && this.getClass() == o.getClass()) {
+ StatisticsOrRecordTypeInformation<InputT> that =
+ (StatisticsOrRecordTypeInformation<InputT>) o;
+ return that.rowTypeInformation.equals(rowTypeInformation)
+ && that.globalStatisticsSerializer.equals(globalStatisticsSerializer);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(rowTypeInformation, globalStatisticsSerializer);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
Review Comment:
Method `StatisticsOrRecordTypeInformation.canEqual(..)` could be confused with the overloaded method `TypeInformation.canEqual`, since dispatch depends on static types.
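For context, Flink's `TypeInformation` declares `canEqual(Object)`. As long as the subclass keeps exactly that parameter type, it overrides the method rather than overloading it. The sketch below shows the idiom the method supports, which keeps `equals` symmetric across a class hierarchy. `BaseInfo` and `ExtendedInfo` are hypothetical classes, not Flink types.

```java
/** Base type using the equals/canEqual idiom (simplified; not Flink's TypeInformation). */
class BaseInfo {
    final String name;

    BaseInfo(String name) {
        this.name = name;
    }

    /** Subclasses that add state override this to refuse equality with the base type. */
    boolean canEqual(Object other) {
        return other instanceof BaseInfo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof BaseInfo)) {
            return false;
        }
        BaseInfo that = (BaseInfo) o;
        // Ask the other side too, so equality stays symmetric across subclasses.
        return that.canEqual(this) && name.equals(that.name);
    }

    @Override
    public int hashCode() {
        return name.hashCode();
    }
}

class ExtendedInfo extends BaseInfo {
    final int arity;

    ExtendedInfo(String name, int arity) {
        super(name);
        this.arity = arity;
    }

    @Override
    boolean canEqual(Object other) {
        return other instanceof ExtendedInfo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ExtendedInfo)) {
            return false;
        }
        ExtendedInfo that = (ExtendedInfo) o;
        return that.canEqual(this) && name.equals(that.name) && arity == that.arity;
    }

    @Override
    public int hashCode() {
        return 31 * name.hashCode() + arity;
    }
}
```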
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordSerializer.java:
##########
@@ -0,0 +1,222 @@
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * The serializer for {@link StatisticsOrRecord}.
+ *
+ * @param <InputT>
+ */
+@Internal
+public class StatisticsOrRecordSerializer<InputT>
+ extends TypeSerializer<StatisticsOrRecord<InputT>> {
+ private final TypeSerializer<DataStatistics> statisticsSerializer;
+ private final TypeSerializer<InputT> recordSerializer;
+
+ StatisticsOrRecordSerializer(
+ TypeSerializer<DataStatistics> statisticsSerializer,
+ TypeSerializer<InputT> recordSerializer) {
+ this.statisticsSerializer = statisticsSerializer;
+ this.recordSerializer = recordSerializer;
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @SuppressWarnings("ReferenceEquality")
+ @Override
+ public TypeSerializer<StatisticsOrRecord<InputT>> duplicate() {
+ TypeSerializer<DataStatistics> duplicateStatisticsSerializer =
+ statisticsSerializer.duplicate();
+ TypeSerializer<InputT> duplicateRowDataSerializer = recordSerializer.duplicate();
+ if ((statisticsSerializer != duplicateStatisticsSerializer)
+ || (recordSerializer != duplicateRowDataSerializer)) {
+ return new StatisticsOrRecordSerializer<>(
+ duplicateStatisticsSerializer, duplicateRowDataSerializer);
+ } else {
+ return this;
+ }
+ }
+
+ @Override
+ public StatisticsOrRecord<InputT> createInstance() {
+ // arbitrarily always create RowData value instance
+ return StatisticsOrRecord.fromRecord(recordSerializer.createInstance());
+ }
+
+ @Override
+ public StatisticsOrRecord<InputT> copy(StatisticsOrRecord<InputT> from) {
+ if (from.hasRecord()) {
+ return StatisticsOrRecord.fromRecord(recordSerializer.copy(from.record()));
+ } else {
+ return StatisticsOrRecord.fromStatistics(statisticsSerializer.copy(from.statistics()));
+ }
+ }
+
+ @Override
+ public StatisticsOrRecord<InputT> copy(
+ StatisticsOrRecord<InputT> from, StatisticsOrRecord<InputT> reuse) {
+ StatisticsOrRecord<InputT> to;
+ if (from.hasRecord()) {
+ to = StatisticsOrRecord.reuseRecord(reuse, recordSerializer);
+ InputT record = recordSerializer.copy(from.record(), to.record());
+ to.record(record);
+ } else {
+ to = StatisticsOrRecord.reuseStatistics(reuse, statisticsSerializer);
+ DataStatistics statistics =
+ statisticsSerializer.copy(from.statistics(), to.statistics());
+ to.statistics(statistics);
+ }
+
+ return to;
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(StatisticsOrRecord<InputT> statisticsOrRecord, DataOutputView target)
+ throws IOException {
+ if (statisticsOrRecord.hasRecord()) {
+ target.writeBoolean(true);
+ recordSerializer.serialize(statisticsOrRecord.record(), target);
+ } else {
+ target.writeBoolean(false);
+ statisticsSerializer.serialize(statisticsOrRecord.statistics(), target);
+ }
+ }
+
+ @Override
+ public StatisticsOrRecord<InputT> deserialize(DataInputView source) throws IOException {
+ boolean isRecord = source.readBoolean();
+ if (isRecord) {
+ return StatisticsOrRecord.fromRecord(recordSerializer.deserialize(source));
+ } else {
+ return StatisticsOrRecord.fromStatistics(statisticsSerializer.deserialize(source));
+ }
+ }
+
+ @Override
+ public StatisticsOrRecord<InputT> deserialize(
+ StatisticsOrRecord<InputT> reuse, DataInputView source) throws IOException {
+ StatisticsOrRecord<InputT> to;
+ boolean isRecord = source.readBoolean();
+ if (isRecord) {
+ to = StatisticsOrRecord.reuseRecord(reuse, recordSerializer);
+ InputT record = recordSerializer.deserialize(to.record(), source);
+ to.record(record);
+ } else {
+ to = StatisticsOrRecord.reuseStatistics(reuse, statisticsSerializer);
+ DataStatistics statistics = statisticsSerializer.deserialize(to.statistics(), source);
+ to.statistics(statistics);
+ }
+
+ return to;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ boolean hasRecord = source.readBoolean();
+ target.writeBoolean(hasRecord);
+ if (hasRecord) {
+ recordSerializer.copy(source, target);
+ } else {
+ statisticsSerializer.copy(source, target);
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
Review Comment:
Method `StatisticsOrRecordSerializer.equals(..)` could be confused with the overloaded method `TypeSerializer.equals`, since dispatch depends on static types.
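The finding concerns overloading versus overriding. An `equals` whose parameter type is not `Object` is selected only when the argument's static type matches. `TypeSerializer` declares `equals(Object)`, so keeping that exact signature makes the method a true override. The hypothetical classes below show the difference.

```java
/** Overloads equals: Object.equals(Object) is NOT overridden. */
final class OverloadedEquals {
    final int value;

    OverloadedEquals(int value) {
        this.value = value;
    }

    // Chosen only when the argument's static type is OverloadedEquals.
    boolean equals(OverloadedEquals other) {
        return other != null && value == other.value;
    }
}

/** Overrides equals(Object): dispatch no longer depends on the static type. */
final class OverriddenEquals {
    final int value;

    OverriddenEquals(int value) {
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof OverriddenEquals && value == ((OverriddenEquals) o).value;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(value);
    }
}
```

With `Object other = new OverloadedEquals(1)`, the call `new OverloadedEquals(1).equals(other)` falls back to identity comparison and returns false. The overriding version returns true.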
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsSerializer.java:
##########
@@ -0,0 +1,144 @@
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Map;
+
+/** Serializer for {@link DataStatistics}. */
+@Internal
+public class DataStatisticsSerializer extends TypeSerializer<DataStatistics> {
Review Comment:
Class DataStatisticsSerializer overrides `hashCode()` but not `equals(Object)`.
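The imports suggest the serializer builds on Flink's `MapSerializer` with `StringSerializer` keys and `LongSerializer` values. To illustrate the shape of a partition-to-frequency encoding, here is a sketch that uses only `java.io`. It writes the entry count followed by each key and value, and it is not byte-compatible with `MapSerializer`. Flink's `DataOutputView` and `DataInputView` extend `java.io.DataOutput` and `DataInput`, so `write` and `read` would accept them directly. `FrequencyMapCodec` is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical codec for a partition-name -> frequency map. */
final class FrequencyMapCodec {
    private FrequencyMapCodec() {}

    static void write(Map<String, Long> frequencies, DataOutput out) throws IOException {
        out.writeInt(frequencies.size()); // entry count first
        for (Map.Entry<String, Long> e : frequencies.entrySet()) {
            out.writeUTF(e.getKey()); // partition name (at most 65,535 encoded bytes)
            out.writeLong(e.getValue()); // frequency; null values are not supported
        }
    }

    static Map<String, Long> read(DataInput in) throws IOException {
        int size = in.readInt();
        if (size < 0) {
            throw new IOException("Corrupt input: negative entry count " + size);
        }
        Map<String, Long> result = new LinkedHashMap<>();
        for (int i = 0; i < size; i++) {
            String key = in.readUTF();
            long value = in.readLong();
            result.put(key, value);
        }
        return result;
    }

    static byte[] toBytes(Map<String, Long> frequencies) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            write(frequencies, out);
        }
        return bytes.toByteArray();
    }

    static Map<String, Long> fromBytes(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return read(in);
        }
    }
}
```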
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java:
##########
@@ -133,20 +144,76 @@ public AppendSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) {
@Override
public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
- // For append only sink, we will do bucket shuffle only if bucket keys are not empty.
- if (!bucketKeys.isEmpty() && shuffleByBucketId) {
- return partition(
- input,
- new FlinkRowDataChannelComputer<>(
- toFlussRowType(tableRowType),
- bucketKeys,
- partitionKeys,
- lakeFormat,
- numBucket,
- flussSerializationSchema),
- input.getParallelism());
- } else {
- return input;
+ switch (shuffleMode) {
+ case BUCKET_SHUFFLE:
+ if (!bucketKeys.isEmpty()) {
+ return partition(
+ input,
+ new FlinkRowDataChannelComputer<>(
+ toFlussRowType(tableRowType),
+ bucketKeys,
+ partitionKeys,
+ lakeFormat,
+ numBucket,
+ flussSerializationSchema),
+ input.getParallelism());
+ }
+ return input;
+ case NONE:
+ return input;
+ case DYNAMIC_SHUFFLE:
+ if (partitionKeys.isEmpty()) {
+ throw new UnsupportedOperationException(
+ "DYNAMIC_SHUFFLE is only supported for partition tables");
+ }
+
+ if (rowTypeInformation == null) {
+ throw new UnsupportedOperationException(
+ "RowTypeInformation is required for DYNAMIC_SHUFFLE mode.");
+ }
+ TypeInformation<StatisticsOrRecord<InputT>> statisticsOrRecordTypeInformation =
+ new StatisticsOrRecordTypeInformation<>(rowTypeInformation);
+ SingleOutputStreamOperator<StatisticsOrRecord<InputT>> shuffleStream =
+ input.transform(
+ "Range shuffle Collector",
+ statisticsOrRecordTypeInformation,
+ new DataStatisticsOperatorFactory<>(
+ toFlussRowType(tableRowType),
+ partitionKeys,
+ flussSerializationSchema))
+ .uid("Range shuffle Collector" + tablePath)
Review Comment:
The naming "Range shuffle Collector" is inconsistent with "DYNAMIC_SHUFFLE"
terminology used throughout the codebase. The operator name should use "Dynamic
shuffle" or "Data statistics" to match the feature name and maintain
consistency.
```suggestion
"Dynamic shuffle data statistics",
statisticsOrRecordTypeInformation,
new DataStatisticsOperatorFactory<>(
toFlussRowType(tableRowType),
partitionKeys,
flussSerializationSchema))
.uid("Dynamic shuffle data statistics" + tablePath)
```
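The mode dispatch in `addPreWriteTopology` can be summarized as a pure function, which makes its preconditions easy to unit test. This sketch uses hypothetical names (`ShuffleModeSketch`, `PreWriteTopologySketch`) and returns labels instead of building Flink transformations. The two exception messages are copied from the diff.

```java
import java.util.List;

/** Copies the three shuffle modes handled by addPreWriteTopology. */
enum ShuffleModeSketch { NONE, BUCKET_SHUFFLE, DYNAMIC_SHUFFLE }

/** Hypothetical planner mirroring the switch in the diff. */
final class PreWriteTopologySketch {
    private PreWriteTopologySketch() {}

    static String plan(
            ShuffleModeSketch mode,
            List<String> bucketKeys,
            List<String> partitionKeys,
            boolean hasRowTypeInformation) {
        switch (mode) {
            case BUCKET_SHUFFLE:
                // Without bucket keys there is nothing to hash on, so records pass through.
                return bucketKeys.isEmpty() ? "passthrough" : "bucket-shuffle";
            case NONE:
                return "passthrough";
            case DYNAMIC_SHUFFLE:
                if (partitionKeys.isEmpty()) {
                    throw new UnsupportedOperationException(
                            "DYNAMIC_SHUFFLE is only supported for partition tables");
                }
                if (!hasRowTypeInformation) {
                    throw new UnsupportedOperationException(
                            "RowTypeInformation is required for DYNAMIC_SHUFFLE mode.");
                }
                return "dynamic-shuffle";
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```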
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecord.java:
##########
@@ -0,0 +1,117 @@
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Objects;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/**
+ * Either a record or a statistics.
+ *
+ * @param <InputT>
+ */
+@Internal
+public class StatisticsOrRecord<InputT> {
Review Comment:
Class StatisticsOrRecord overrides `hashCode()` but not `equals(Object)`.
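If `StatisticsOrRecord` needs content equality, `equals` should compare both the record side and the statistics side. This sketch shows an immutable variant and assumes exactly one side is set. `EitherStatsOrRecord` is hypothetical. The PR's class is mutable (the serializer reuses instances via `record(...)` and `statistics(...)`), so hashing such objects while they can still change would be fragile.

```java
import java.util.Objects;

/** Hypothetical immutable "either a record or statistics" holder. */
final class EitherStatsOrRecord<R, S> {
    private final R record;
    private final S statistics;

    private EitherStatsOrRecord(R record, S statistics) {
        // Exactly one side must be non-null.
        if ((record == null) == (statistics == null)) {
            throw new IllegalArgumentException("exactly one of record/statistics must be non-null");
        }
        this.record = record;
        this.statistics = statistics;
    }

    static <R, S> EitherStatsOrRecord<R, S> fromRecord(R record) {
        return new EitherStatsOrRecord<>(record, null);
    }

    static <R, S> EitherStatsOrRecord<R, S> fromStatistics(S statistics) {
        return new EitherStatsOrRecord<>(null, statistics);
    }

    boolean hasRecord() {
        return record != null;
    }

    R record() {
        return record;
    }

    S statistics() {
        return statistics;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof EitherStatsOrRecord)) {
            return false;
        }
        EitherStatsOrRecord<?, ?> that = (EitherStatsOrRecord<?, ?>) o;
        return Objects.equals(record, that.record) && Objects.equals(statistics, that.statistics);
    }

    @Override
    public int hashCode() {
        return Objects.hash(record, statistics);
    }
}
```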
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java:
##########
@@ -159,6 +168,66 @@ public RowWithOp serialize(RowData value) throws Exception {
return new RowWithOp(row, opType);
}
+ @Override
+ public long size(RowData value, RowType rowType) {
+ if (value instanceof BinaryFormat) {
+ return ((BinaryFormat) value).getSizeInBytes();
+ }
+
+ long size = 0;
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ DataField field = rowType.getFields().get(i);
+ DataTypeRoot typeRoot = field.getType().getTypeRoot();
+ if (value.isNullAt(i)) {
+ continue;
+ }
+ switch (typeRoot) {
+ case CHAR:
+ size += ((CharType) (field.getType())).getLength();
+ break;
+ case STRING:
+ StringData stringData = value.getString(i);
+ if (stringData instanceof BinaryStringData) {
+ size += ((BinaryStringData) stringData).getSizeInBytes();
+ } else {
+ size += converter.getString(i).getSizeInBytes();
+ }
+ break;
+ case BINARY:
+ size += ((BinaryType) (field.getType())).getLength();
+ break;
+ case BYTES:
+ size += converter.getBytes(i).length;
+ break;
+ case DECIMAL:
+ size += ((DecimalType) (field.getType())).getPrecision();
Review Comment:
The DECIMAL size calculation is potentially incorrect. Line 203 uses the precision, which is a digit count, directly as a byte size. The estimate should instead be derived from how the value is represented, which can depend on both precision and scale. Precision alone does not accurately reflect the actual size of, for example, DECIMAL(5,2) or DECIMAL(38,18).
```suggestion
DecimalType decimalType = (DecimalType) field.getType();
int precision = decimalType.getPrecision();
int scale = decimalType.getScale();
int integerDigits = Math.max(1, precision - scale);
int fractionalDigits = Math.max(0, scale);
int decimalPoint = fractionalDigits > 0 ? 1 : 0;
int sign = 1; // allow space for a potential '-' sign
size += integerDigits + fractionalDigits + decimalPoint + sign;
```
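The suggestion estimates the length of the decimal's textual form (digits, point and sign), not its binary footprint. If the goal is to approximate binary storage, a sketch could follow the compact/non-compact split used by Flink's `DecimalData`, where a precision up to 18 fits in a `long`. For larger precisions, it sizes the two's-complement bytes of the largest unscaled value. The threshold constant and `DecimalSizeSketch` are assumptions, not Fluss code.

```java
import java.math.BigInteger;

/** Hypothetical binary-size estimate for DECIMAL(precision, scale); scale does not matter here. */
final class DecimalSizeSketch {
    /** Assumed compact threshold, matching Flink's DecimalData (precision <= 18 fits in a long). */
    static final int MAX_COMPACT_PRECISION = 18;

    private DecimalSizeSketch() {}

    static int estimatedBytes(int precision) {
        if (precision < 1) {
            throw new IllegalArgumentException("precision must be >= 1");
        }
        if (precision <= MAX_COMPACT_PRECISION) {
            return Long.BYTES; // unscaled value stored as a long
        }
        // Largest unscaled magnitude is 10^precision - 1. Its two's-complement
        // form (BigInteger.toByteArray) needs bitLength / 8 + 1 bytes.
        BigInteger maxUnscaled = BigInteger.TEN.pow(precision).subtract(BigInteger.ONE);
        return maxUnscaled.bitLength() / 8 + 1;
    }
}
```

Under this model, DECIMAL(5,2) and DECIMAL(18,4) both estimate 8 bytes, while DECIMAL(38,18) estimates 16 bytes.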
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java:
##########
@@ -0,0 +1,118 @@
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Objects;
+
+/**
+ * The type information for StatisticsOrRecord.
+ *
+ * @param <InputT>
+ */
+@Internal
+public class StatisticsOrRecordTypeInformation<InputT>
Review Comment:
Class StatisticsOrRecordTypeInformation overrides `hashCode()` but not `equals(Object)`.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsEvent.java:
##########
@@ -0,0 +1,71 @@
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/** Event to send statistics to the coordinator. */
+@Internal
+class StatisticsEvent implements OperatorEvent {
Review Comment:
Class StatisticsEvent overrides `hashCode()` but not `equals(Object)`.
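Suppose `StatisticsEvent` carries the serialized statistics as a `byte[]`. The `java.util.Arrays` import hints at this, but it is an assumption. In that case `equals` must compare the array contents with `Arrays.equals`, matching an `Arrays.hashCode`-based `hashCode`, because `byte[].equals` compares references. `StatisticsEventSketch` and its `checkpointId` field are hypothetical.

```java
import java.util.Arrays;

/** Hypothetical event holding serialized statistics. */
final class StatisticsEventSketch {
    private final long checkpointId;
    private final byte[] statisticsBytes;

    StatisticsEventSketch(long checkpointId, byte[] statisticsBytes) {
        this.checkpointId = checkpointId;
        this.statisticsBytes = statisticsBytes.clone(); // defensive copy
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        StatisticsEventSketch that = (StatisticsEventSketch) o;
        // Arrays.equals compares contents; byte[].equals would compare references.
        return checkpointId == that.checkpointId
                && Arrays.equals(statisticsBytes, that.statisticsBytes);
    }

    @Override
    public int hashCode() {
        // Combine the scalar field with the array's content hash.
        return 31 * Long.hashCode(checkpointId) + Arrays.hashCode(statisticsBytes);
    }
}
```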
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordSerializer.java:
##########
@@ -0,0 +1,222 @@
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * The serializer for {@link StatisticsOrRecord}.
+ *
+ * @param <InputT>
+ */
+@Internal
+public class StatisticsOrRecordSerializer<InputT>
Review Comment:
Class `StatisticsOrRecordSerializer` overrides `hashCode` but not `equals`.
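Because the serializer is built from two nested serializers (statistics and record, per `createSerializer` further down), a natural `equals` compares both delegates. The following is a sketch under that assumption. `Codec`, `LongCodec` and `UnionSerializerSketch` are hypothetical plain-Java stand-ins for Flink's `TypeSerializer` types, used so the example runs without Flink on the classpath.

```java
import java.util.Objects;

/** Hypothetical stand-in for a Flink TypeSerializer. */
interface Codec {}

/** Hypothetical stateless codec: all instances are interchangeable. */
final class LongCodec implements Codec {
    @Override
    public boolean equals(Object o) {
        return o instanceof LongCodec;
    }

    @Override
    public int hashCode() {
        return LongCodec.class.hashCode();
    }
}

/** Sketch of a union serializer that delegates to two nested serializers. */
final class UnionSerializerSketch {
    private final Codec statisticsCodec;
    private final Codec recordCodec;

    UnionSerializerSketch(Codec statisticsCodec, Codec recordCodec) {
        this.statisticsCodec = statisticsCodec;
        this.recordCodec = recordCodec;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        UnionSerializerSketch that = (UnionSerializerSketch) o;
        // Two union serializers are equal when both delegates are equal.
        return Objects.equals(statisticsCodec, that.statisticsCodec)
                && Objects.equals(recordCodec, that.recordCodec);
    }

    @Override
    public int hashCode() {
        // Built from the same fields as equals, keeping the two consistent.
        return Objects.hash(statisticsCodec, recordCodec);
    }
}
```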
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java:
##########
@@ -126,7 +145,9 @@ public FlussSink<InputT> build() {
FlinkSink.SinkWriterBuilder<? extends FlinkSinkWriter<InputT>, InputT>
writerBuilder;
TablePath tablePath = new TablePath(database, tableName);
- flussConfig.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
+ if (bootstrapServers != null) {
+ flussConfig.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
+ }
Review Comment:
The `build()` method tolerates a null `bootstrapServers` (line 148 only sets it
when it is non-null), but `validateConfiguration()` at line 209 requires it to
be non-null. As a result, `build()` may proceed with a null `bootstrapServers`
only to fail later during validation. Either remove the conditional check at
line 148, or relax the validation to match this conditional behavior.
```suggestion
flussConfig.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
```
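Either resolution works as long as `build()` and `validateConfiguration()` agree. The following is a minimal sketch of the "fail early" option, assuming validation runs before the config is populated. `SinkBuilderSketch` and the literal key `"bootstrap.servers"` are hypothetical stand-ins, not the Fluss `FlussSinkBuilder` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical builder; mirrors the shape of FlussSinkBuilder but is not its API. */
final class SinkBuilderSketch {
    private String bootstrapServers;
    private String database;
    private String tableName;

    SinkBuilderSketch setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
        return this;
    }

    SinkBuilderSketch setTable(String database, String tableName) {
        this.database = database;
        this.tableName = tableName;
        return this;
    }

    /** Validates first, so the unconditional put below never sees null. */
    Map<String, String> build() {
        Objects.requireNonNull(bootstrapServers, "bootstrapServers must be set");
        Objects.requireNonNull(database, "database must be set");
        Objects.requireNonNull(tableName, "tableName must be set");
        Map<String, String> config = new HashMap<>();
        // No null check needed here: validation above already guarantees it.
        config.put("bootstrap.servers", bootstrapServers);
        return config;
    }
}
```

Validating at the top of `build()` removes the window in which a half-populated config exists, which is the inconsistency the comment points out.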
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsSerializer.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Map;
+
+/** Serializer for {@link DataStatistics}. */
+@Internal
+public class DataStatisticsSerializer extends TypeSerializer<DataStatistics> {
+ private final MapSerializer<String, Long> mapSerializer;
+
+ public DataStatisticsSerializer() {
+ this.mapSerializer =
+ new MapSerializer<>(StringSerializer.INSTANCE, LongSerializer.INSTANCE);
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<DataStatistics> duplicate() {
+ return new DataStatisticsSerializer();
+ }
+
+ @Override
+ public DataStatistics createInstance() {
+ return new DataStatistics();
+ }
+
+ @Override
+ public DataStatistics copy(DataStatistics from) {
+ return new DataStatistics(from.result());
+ }
+
+ @Override
+ public DataStatistics copy(DataStatistics from, DataStatistics reuse) {
+ // no benefit of reuse
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(DataStatistics record, DataOutputView target) throws IOException {
+ mapSerializer.serialize(record.result(), target);
+ }
+
+ @Override
+ public DataStatistics deserialize(DataInputView source) throws IOException {
+ Map<String, Long> partitionFrequency = mapSerializer.deserialize(source);
+ return new DataStatistics(partitionFrequency);
+ }
+
+ @Override
+ public DataStatistics deserialize(DataStatistics reuse, DataInputView source)
+ throws IOException {
+ // not much benefit to reuse
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ serialize(deserialize(source), target);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
Review Comment:
Method `DataStatisticsSerializer.equals(..)` could be confused with the overloaded
method `TypeSerializer.equals`, since dispatch depends on static types.
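The hazard this rule describes arises when a class declares `equals` with a parameter type narrower than `Object`: the call is then resolved at compile time from the argument's static type. Since the hunk shows `equals(Object obj)` annotated with `@Override`, the method appears to be a genuine override, so this alert may be a false positive. The hypothetical `OverloadedPoint` and `OverriddenPoint` classes below illustrate the pattern the rule is meant to catch.

```java
/** Overloads equals with a narrower parameter type: the pattern the rule flags. */
final class OverloadedPoint {
    final int x;

    OverloadedPoint(int x) {
        this.x = x;
    }

    // Overload, NOT an override: chosen only when the argument's static type is OverloadedPoint.
    public boolean equals(OverloadedPoint other) {
        return other != null && x == other.x;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(x);
    }
}

/** Overrides equals(Object), so the result no longer depends on the argument's static type. */
final class OverriddenPoint {
    final int x;

    OverriddenPoint(int x) {
        this.x = x;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof OverriddenPoint && x == ((OverriddenPoint) o).x;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(x);
    }
}
```

With `OverloadedPoint`, `a.equals(b)` is true, but the same call with `b` held in an `Object` variable falls back to `Object.equals` and compares references. Collections always call `equals(Object)`, so they would see the second behavior.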
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Objects;
+
+/**
+ * The type information for {@link StatisticsOrRecord}.
+ *
+ * @param <InputT> the type of the records wrapped by {@link StatisticsOrRecord}
+ */
+@Internal
+public class StatisticsOrRecordTypeInformation<InputT>
+ extends TypeInformation<StatisticsOrRecord<InputT>> {
+
+ private final TypeInformation<InputT> rowTypeInformation;
+ private final DataStatisticsSerializer globalStatisticsSerializer;
+
+ public StatisticsOrRecordTypeInformation(TypeInformation<InputT> rowTypeInformation) {
+ this.rowTypeInformation = rowTypeInformation;
+ this.globalStatisticsSerializer = new DataStatisticsSerializer();
+ }
+
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 1;
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 1;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Class<StatisticsOrRecord<InputT>> getTypeClass() {
+ return (Class<StatisticsOrRecord<InputT>>) (Class<?>) StatisticsOrRecord.class;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<StatisticsOrRecord<InputT>> createSerializer(SerializerConfig config) {
+ TypeSerializer<InputT> recordSerializer = rowTypeInformation.createSerializer(config);
+ return new StatisticsOrRecordSerializer<>(globalStatisticsSerializer, recordSerializer);
+ }
+
+ @Override
+ @Deprecated
+ public TypeSerializer<StatisticsOrRecord<InputT>> createSerializer(ExecutionConfig config) {
+ TypeSerializer<InputT> recordSerializer = rowTypeInformation.createSerializer(config);
+ return new StatisticsOrRecordSerializer<>(globalStatisticsSerializer, recordSerializer);
+ }
+
+ @Override
+ public String toString() {
+ return "StatisticsOrRecord";
+ }
+
+ @Override
+ public boolean equals(Object o) {
Review Comment:
Method `StatisticsOrRecordTypeInformation.equals(..)` could be confused with the
overloaded method `TypeInformation.equals`, since dispatch depends on static
types.
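Flink's `TypeInformation` pairs `equals(Object)` with a `canEqual(Object)` hook, so a symmetric implementation checks both sides. In this class the `globalStatisticsSerializer` is created in the constructor and appears to carry no state, so equality can presumably depend on `rowTypeInformation` alone. The following is a plain-Java sketch of that convention. `TypeInfoSketch` and `WrapperTypeInfoSketch` are hypothetical, and a `String` stands in for the nested `TypeInformation<InputT>`.

```java
import java.util.Objects;

/** Hypothetical base class mimicking the canEqual convention. */
abstract class TypeInfoSketch {
    /** Returns true if obj is a type this instance may be equal to. */
    abstract boolean canEqual(Object obj);
}

/** Sketch of a wrapper type info whose identity is its nested element type. */
final class WrapperTypeInfoSketch extends TypeInfoSketch {
    private final String elementType; // hypothetical stand-in for TypeInformation<InputT>

    WrapperTypeInfoSketch(String elementType) {
        this.elementType = elementType;
    }

    @Override
    boolean canEqual(Object obj) {
        return obj instanceof WrapperTypeInfoSketch;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof WrapperTypeInfoSketch)) {
            return false;
        }
        WrapperTypeInfoSketch that = (WrapperTypeInfoSketch) o;
        // Symmetric: the other side must also agree that the two can be compared.
        return that.canEqual(this) && Objects.equals(elementType, that.elementType);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(elementType);
    }
}
```

`canEqual` adds little for a `final` class like this sketch. It matters when subclasses exist, because it stops a subclass instance from comparing equal to its parent in one direction only.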
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputer.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.bucketing.BucketingFunction;
+import org.apache.fluss.client.table.getter.PartitionGetter;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.flink.row.RowWithOp;
+import org.apache.fluss.flink.sink.ChannelComputer;
+import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema;
+import org.apache.fluss.flink.sink.serializer.SerializerInitContextImpl;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.encode.KeyEncoder;
+import org.apache.fluss.types.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * {@link ChannelComputer} for {@link StatisticsOrRecord} that adjusts the shuffle based on
+ * partition statistics.
+ */
+@Internal
+public class StatisticsOrRecordChannelComputer<InputT>
+ implements ChannelComputer<StatisticsOrRecord<InputT>> {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StatisticsOrRecordChannelComputer.class);
+
+ private final @Nullable DataLakeFormat lakeFormat;
+ private final RowType flussRowType;
+ private final List<String> bucketKeys;
+ private final List<String> partitionKeys;
+ private final FlussSerializationSchema<InputT> serializationSchema;
+ private final int bucketNum;
+
+ private transient int downstreamNumChannels;
+ private transient KeyEncoder bucketKeyEncoder;
+ private transient PartitionGetter partitionGetter;
+ private transient MapPartitioner delegatePartitioner;
+ private transient AtomicLong roundRobinCounter;
+ private transient BucketingFunction bucketingFunction;
+ private transient Random random;
+
+ public StatisticsOrRecordChannelComputer(
+ RowType flussRowType,
+ List<String> bucketKeys,
+ List<String> partitionKeys,
+ int bucketNum,
+ @Nullable DataLakeFormat lakeFormat,
+ FlussSerializationSchema<InputT> serializationSchema) {
+ checkArgument(
+ partitionKeys != null && !partitionKeys.isEmpty(),
+ "Partition keys cannot be empty.");
+ this.flussRowType = flussRowType;
+ this.bucketKeys = bucketKeys;
+ this.partitionKeys = partitionKeys;
+ this.bucketNum = bucketNum;
+ this.lakeFormat = lakeFormat;
+ this.serializationSchema = serializationSchema;
+ }
+
+ @Override
+ public void setup(int numChannels) {
+ LOG.info("Setting up with {} downstream channels", numChannels);
+ this.downstreamNumChannels = numChannels;
+ this.bucketingFunction = BucketingFunction.of(lakeFormat);
+ this.bucketKeyEncoder = KeyEncoder.of(flussRowType, bucketKeys, lakeFormat);
+ this.partitionGetter = new PartitionGetter(flussRowType, partitionKeys);
+ try {
+ this.serializationSchema.open(new SerializerInitContextImpl(flussRowType));
+ } catch (Exception e) {
+ throw new FlussRuntimeException(e);
+ }
+ this.random = ThreadLocalRandom.current();
+ }
+
+ @Override
+ public int channel(StatisticsOrRecord<InputT> wrapper) {
Review Comment:
Variable `wrapper` may be null at this access, as suggested by an existing null
guard on it.
```suggestion
public int channel(StatisticsOrRecord<InputT> wrapper) {
if (wrapper == null) {
throw new FlussRuntimeException("StatisticsOrRecord wrapper must not be null");
}
```
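Rejecting `null` at the top of `channel` surfaces the bug at the call site rather than as a `NullPointerException` deeper in the routing logic. The suggestion throws `FlussRuntimeException`. `Objects.requireNonNull` is an equivalent JDK-only alternative that throws `NullPointerException` instead. The following is a minimal sketch of that alternative. `ChannelComputerSketch` is hypothetical, not the Fluss `ChannelComputer` API, and its hash-based routing is only illustrative.

```java
import java.util.Objects;

/** Hypothetical simplified channel computer; not the Fluss ChannelComputer API. */
final class ChannelComputerSketch {
    private final int numChannels;

    ChannelComputerSketch(int numChannels) {
        if (numChannels <= 0) {
            throw new IllegalArgumentException("numChannels must be positive");
        }
        this.numChannels = numChannels;
    }

    /** Rejects null up front, then maps the record's hash into [0, numChannels). */
    int channel(Object record) {
        Objects.requireNonNull(record, "record must not be null");
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(record.hashCode(), numChannels);
    }
}
```

Using `Math.floorMod` instead of `%` matters here. `-7 % 4` is `-3`, which is not a valid channel index, while `Math.floorMod(-7, 4)` is `1`.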
--
This is an automated message from the Apache Git Service.