wuchong commented on code in PR #1784: URL: https://github.com/apache/fluss/pull/1784#discussion_r2685419785
########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputer.java: ########## @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.shuffle; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.bucketing.BucketingFunction; +import org.apache.fluss.client.table.getter.PartitionGetter; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.flink.row.RowWithOp; +import org.apache.fluss.flink.sink.ChannelComputer; +import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema; +import org.apache.fluss.flink.sink.serializer.SerializerInitContextImpl; +import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.encode.KeyEncoder; +import org.apache.fluss.types.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkState; + +/** + * {@link ChannelComputer} for {@link StatisticsOrRecord} which will change shuffle based on + * partition statistic. 
+ */ +@Internal +public class StatisticsOrRecordChannelComputer<InputT> + implements ChannelComputer<StatisticsOrRecord<InputT>> { + private static final Logger LOG = + LoggerFactory.getLogger(StatisticsOrRecordChannelComputer.class); + + private final @Nullable DataLakeFormat lakeFormat; + private final RowType flussRowType; + private final List<String> bucketKeys; + private final List<String> partitionKeys; + private final FlussSerializationSchema<InputT> serializationSchema; + private final int bucketNum; + + private transient int downstreamNumChannels; + private transient KeyEncoder bucketKeyEncoder; + private transient PartitionGetter partitionGetter; + private transient MapPartitioner delegatePartitioner; + private transient AtomicLong roundRobinCounter; + private transient BucketingFunction bucketingFunction; + private transient Random random; + + public StatisticsOrRecordChannelComputer( + RowType flussRowType, + List<String> bucketKeys, + List<String> partitionKeys, + int bucketNum, + @Nullable DataLakeFormat lakeFormat, + FlussSerializationSchema<InputT> serializationSchema) { + checkArgument( + partitionKeys != null && !partitionKeys.isEmpty(), + "Partition keys cannot be empty."); + this.flussRowType = flussRowType; + this.bucketKeys = bucketKeys; + this.partitionKeys = partitionKeys; + this.bucketNum = bucketNum; + this.lakeFormat = lakeFormat; + this.serializationSchema = serializationSchema; + } + + @Override + public void setup(int numChannels) { + LOG.info("Setting up with {} downstream channels", numChannels); + this.downstreamNumChannels = numChannels; + this.bucketingFunction = BucketingFunction.of(lakeFormat); + this.bucketKeyEncoder = KeyEncoder.of(flussRowType, bucketKeys, lakeFormat); + this.partitionGetter = new PartitionGetter(flussRowType, partitionKeys); + try { + this.serializationSchema.open(new SerializerInitContextImpl(flussRowType)); + } catch (Exception e) { + throw new FlussRuntimeException(e); + } + this.random = ThreadLocalRandom.current(); + } + + @Override + public int channel(StatisticsOrRecord<InputT> wrapper) { + if (wrapper == null) { + throw new FlussRuntimeException("StatisticsOrRecord wrapper must not be null"); + } + + try { + if (wrapper.hasStatistics()) { + this.delegatePartitioner = delegatePartitioner(wrapper.statistics()); + return (int) + (roundRobinCounter(downstreamNumChannels).getAndIncrement() + % downstreamNumChannels); + } else { + if (delegatePartitioner == null) { + delegatePartitioner = delegatePartitioner(null); + } + + RowWithOp rowWithOp = serializationSchema.serialize(wrapper.record()); + InternalRow row = rowWithOp.getRow(); + String partitionName = partitionGetter.getPartition(row); + return delegatePartitioner.select(partitionName, row, downstreamNumChannels); + } + } catch (Exception e) { + throw new FlussRuntimeException( + String.format( + "Failed to serialize static or record of type '%s' in FlinkRowDataChannelComputer: %s", + wrapper != null ? wrapper.getClass().getName() : "null", + e.getMessage()), + e); + } + } + + private boolean hasBucketKey() { + return bucketKeys != null && !bucketKeys.isEmpty(); + } + + private AtomicLong roundRobinCounter(int numPartitions) { + if (roundRobinCounter == null) { + // randomize the starting point to avoid synchronization across subtasks + this.roundRobinCounter = new AtomicLong(new Random().nextInt(numPartitions)); + } + + return roundRobinCounter; + } + + private MapPartitioner delegatePartitioner(@Nullable DataStatistics statistics) { + return statistics == null + ? 
new MapPartitioner() + : new MapPartitioner(assignment(downstreamNumChannels, statistics.result())); + } + + Map<String, PartitionAssignment> assignment( + int downstreamParallelism, Map<String, Long> statistics) { + statistics.forEach( + (key, value) -> + checkArgument( + value > 0, "Invalid statistics: weight is 0 for key %s", key)); + + long totalWeight = statistics.values().stream().mapToLong(l -> l).sum(); + long targetWeightPerSubtask = + (long) Math.ceil(((double) totalWeight) / downstreamParallelism); + + return buildAssignment( + downstreamParallelism, + statistics, + targetWeightPerSubtask, + hasBucketKey(), + bucketNum); + } + + Map<String, PartitionAssignment> buildAssignment( + int downstreamParallelism, + Map<String, Long> dataStatistics, + long targetWeightPerSubtask, + boolean hasBucketKey, + int bucketNum) { + Map<String, PartitionAssignment> assignmentMap = new HashMap<>(dataStatistics.size()); + Iterator<String> mapKeyIterator = dataStatistics.keySet().iterator(); + int subtaskId = 0; + String currentKey = null; + long keyRemainingWeight = 0L; + long subtaskRemainingWeight = targetWeightPerSubtask; + // todo: compute the list of assigned subtasks and the weight assigned to each subtask + List<Integer> assignedSubtasks = new ArrayList<>(); + List<Long> subtaskWeights = new ArrayList<>(); + while (mapKeyIterator.hasNext() || currentKey != null) { + // This should never happen because target weight is calculated using ceil function. + // todo: numPartitions covers all downstream subtask ids + if (subtaskId >= downstreamParallelism) { + LOG.error( + "Internal algorithm error: exhausted subtasks with unassigned keys left. number of partitions: {}, " + + "target weight per subtask: {}, data statistics: {}", + downstreamParallelism, + targetWeightPerSubtask, + dataStatistics); + throw new IllegalStateException( + "Internal algorithm error: exhausted subtasks with unassigned keys left"); + } + + if (currentKey == null) { + currentKey = mapKeyIterator.next(); + keyRemainingWeight = dataStatistics.get(currentKey); + } + + assignedSubtasks.add(subtaskId); + if (keyRemainingWeight < subtaskRemainingWeight) { + // assign the remaining weight of the key to the current subtask + subtaskWeights.add(keyRemainingWeight); + subtaskRemainingWeight -= keyRemainingWeight; + keyRemainingWeight = 0L; + } else { + // filled up the current subtask + long assignedWeight = subtaskRemainingWeight; + keyRemainingWeight -= subtaskRemainingWeight; + + subtaskWeights.add(assignedWeight); + // move on to the next subtask + subtaskId += 1; + subtaskRemainingWeight = targetWeightPerSubtask; + } + + checkState( + assignedSubtasks.size() == subtaskWeights.size(), + "List size mismatch: assigned subtasks = %s, subtask weights = %s", + assignedSubtasks, + subtaskWeights); + + if (keyRemainingWeight == 0) { + // finishing up the assignment for the current key + PartitionAssignment keyAssignment = + hasBucketKey + ? new WeightedBucketIdAssignment( + assignedSubtasks, + subtaskWeights, + bucketNum, + bucketKeyEncoder, + bucketingFunction, + random) + : new WeightedRandomAssignment( + assignedSubtasks, subtaskWeights, random); + assignmentMap.put(currentKey, keyAssignment); + assignedSubtasks = new ArrayList<>(); + subtaskWeights = new ArrayList<>(); + currentKey = null; + } + } + + LOG.info("Assignment map: {}", assignmentMap); Review Comment: Use `debug` log level here, otherwise this produces too many logs.
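To make the greedy splitting in `buildAssignment` above easier to follow, the self-contained sketch below runs the same idea on a toy statistics map (the class name `WeightSplitSketch` and all variable names are illustrative only, not part of the Fluss codebase):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy illustration of the greedy weight splitting performed by buildAssignment(). */
public class WeightSplitSketch {
    public static void main(String[] args) {
        // partition name -> observed weight (e.g. record count from the collected statistics)
        Map<String, Long> stats = new LinkedHashMap<>();
        stats.put("p1", 27L);
        stats.put("p2", 3L);

        int parallelism = 3;
        long total = stats.values().stream().mapToLong(Long::longValue).sum(); // 30
        long target = (long) Math.ceil((double) total / parallelism);          // 10

        int subtask = 0;
        long subtaskRemaining = target;
        for (Map.Entry<String, Long> entry : stats.entrySet()) {
            long keyRemaining = entry.getValue();
            List<Integer> assignedSubtasks = new ArrayList<>();
            List<Long> subtaskWeights = new ArrayList<>();
            while (keyRemaining > 0) {
                assignedSubtasks.add(subtask);
                long assigned = Math.min(keyRemaining, subtaskRemaining);
                subtaskWeights.add(assigned);
                keyRemaining -= assigned;
                subtaskRemaining -= assigned;
                if (subtaskRemaining == 0) { // current subtask is full, move to the next one
                    subtask++;
                    subtaskRemaining = target;
                }
            }
            // prints: p1 -> [0, 1, 2] with weights [10, 10, 7]
            //         p2 -> [2] with weights [3]
            System.out.println(
                    entry.getKey() + " -> " + assignedSubtasks + " with weights " + subtaskWeights);
        }
    }
}
```

In the operator itself, each resulting `assignedSubtasks`/`subtaskWeights` pair is wrapped into a `WeightedRandomAssignment` (or a `WeightedBucketIdAssignment` when bucket keys exist), so a hot partition fans out across several channels while small partitions stay on a single subtask.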
########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsOperator.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.shuffle; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.client.table.getter.PartitionGetter; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.flink.row.RowWithOp; +import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema; +import org.apache.fluss.flink.sink.serializer.SerializerInitContextImpl; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.RowType; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.List; + +import static org.apache.fluss.flink.adapter.RuntimeContextAdapter.getIndexOfThisSubtask; +import static org.apache.fluss.utils.Preconditions.checkArgument; + +/** + * Data statistics operator which collects local data statistics and sends them to coordinator for + * global data statics, then sends global data statics to partitioner. 
+ * + * @param <InputT> + */ +@Internal +public class DataStatisticsOperator<InputT> + extends AbstractStreamOperator<StatisticsOrRecord<InputT>> + implements OneInputStreamOperator<InputT, StatisticsOrRecord<InputT>>, + OperatorEventHandler { + + private static final long serialVersionUID = 1L; + + private final String operatorName; + private final RowType rowType; + private final List<String> partitionKeys; + private final FlussSerializationSchema<InputT> flussSerializationSchema; + private final OperatorEventGateway operatorEventGateway; + + private transient int subtaskIndex; + private transient volatile DataStatistics localStatistics; + private transient PartitionGetter partitionGetter; + private transient TypeSerializer<DataStatistics> statisticsSerializer; + + DataStatisticsOperator( + StreamOperatorParameters<StatisticsOrRecord<InputT>> parameters, + String operatorName, + RowType rowType, + List<String> partitionKeys, + OperatorEventGateway operatorEventGateway, + FlussSerializationSchema<InputT> flussSerializationSchema) { + super(); + checkArgument( + partitionKeys != null && !partitionKeys.isEmpty(), + "Partition keys cannot be empty."); + this.operatorName = operatorName; + this.operatorEventGateway = operatorEventGateway; + this.flussSerializationSchema = flussSerializationSchema; + this.rowType = rowType; + this.partitionKeys = partitionKeys; + this.setup( + parameters.getContainingTask(), + parameters.getStreamConfig(), + parameters.getOutput()); + } + + @Override + public void open() throws Exception { + this.partitionGetter = new PartitionGetter(rowType, partitionKeys); + this.statisticsSerializer = new DataStatisticsSerializer(); + try { + this.flussSerializationSchema.open(new SerializerInitContextImpl(rowType)); + } catch (Exception e) { + throw new FlussRuntimeException(e); + } + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + this.subtaskIndex = getIndexOfThisSubtask(getRuntimeContext()); + this.localStatistics = StatisticsUtil.createDataStatistics(); + } + + @Override + public void handleOperatorEvent(OperatorEvent event) { + checkArgument( + event instanceof StatisticsEvent, + String.format( + "Operator %s subtask %s received unexpected operator event %s", + operatorName, subtaskIndex, event.getClass())); + StatisticsEvent statisticsEvent = (StatisticsEvent) event; + LOG.info( + "Operator {} subtask {} received global data event from coordinator checkpoint {}", + operatorName, + subtaskIndex, + statisticsEvent.checkpointId()); + DataStatistics globalStatistics = + StatisticsUtil.deserializeDataStatistics( + statisticsEvent.statisticsBytes(), statisticsSerializer); + LOG.info("Global data statistics: {}", globalStatistics); Review Comment: `globalStatistics` is very huge, should use `debug` log level here. ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinator.java: ########## @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.shuffle; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.shaded.guava32.com.google.common.collect.Iterables; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.ThrowableCatchingRunnable; +import org.apache.flink.util.function.ThrowingRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +import static org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkState; + +/** + * Coordinator for collecting global data statistics. + * + * <p>NOTE: This class is inspired from Iceberg project. 
+ */ +@Internal +class DataStatisticsCoordinator implements OperatorCoordinator { + private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class); + + private final String operatorName; + private final OperatorCoordinator.Context context; + private final ExecutorService coordinatorExecutor; + private final SubtaskGateways subtaskGateways; + private final CoordinatorExecutorThreadFactory coordinatorThreadFactory; + private final TypeSerializer<DataStatistics> statisticsSerializer; + + private transient boolean started; + private transient AggregatedStatisticsTracker aggregatedStatisticsTracker; + + DataStatisticsCoordinator(String operatorName, OperatorCoordinator.Context context) { + this.operatorName = operatorName; + this.context = context; + this.coordinatorThreadFactory = + new CoordinatorExecutorThreadFactory( + "DataStatisticsCoordinator-" + operatorName, + context.getUserCodeClassloader()); + this.coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory); + this.subtaskGateways = new SubtaskGateways(operatorName, context.currentParallelism()); + this.statisticsSerializer = new DataStatisticsSerializer(); + } + + @Override + public void start() throws Exception { + LOG.debug("Starting data statistics coordinator: {}.", operatorName); + this.started = true; + + // statistics are restored already in resetToCheckpoint() before start() called + this.aggregatedStatisticsTracker = + new AggregatedStatisticsTracker(operatorName, context.currentParallelism()); + } + + @Override + public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) { + runInCoordinatorThread( + () -> { + LOG.debug( + "Handling event from subtask {} (#{}) of {}: {}", + subtask, + attemptNumber, + operatorName, + event); + if (event instanceof StatisticsEvent) { + handleDataStatisticRequest(subtask, ((StatisticsEvent) event)); + } else { + throw new IllegalArgumentException( + "Invalid operator event type: " + + event.getClass().getCanonicalName()); + } + }, + String.format( + Locale.ROOT, + "handling operator event %s from subtask %d (#%d)", + event.getClass(), + subtask, + attemptNumber)); + } + + @Override + public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) + throws Exception { + resultFuture.complete(new byte[0]); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) {} + + @Override + public void resetToCheckpoint(long checkpointId, byte[] checkpointData) { + checkState( + !started, + "The coordinator %s can only be reset if it was not yet started", + operatorName); + } + + @Override + public void subtaskReset(int subtask, long checkpointId) { + runInCoordinatorThread( + () -> { + LOG.info( + "Operator {} subtask {} is reset to checkpoint {}", + operatorName, + subtask, + checkpointId); + checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()); + subtaskGateways.reset(subtask); + }, + String.format( + Locale.ROOT, + "handling subtask %d recovery to checkpoint %d", + subtask, + checkpointId)); + } + + @Override + public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) { + runInCoordinatorThread( + () -> { + LOG.info( + "Unregistering gateway after failure for subtask {} (#{}) of data statistics {}", + subtask, + attemptNumber, + operatorName); + checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()); + subtaskGateways.unregisterSubtaskGateway(subtask, attemptNumber); + }, + String.format( + 
Locale.ROOT, "handling subtask %d (#%d) failure", subtask, attemptNumber)); + } + + @Override + public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) { + checkArgument(subtask == gateway.getSubtask()); + checkArgument(attemptNumber == gateway.getExecution().getAttemptNumber()); + runInCoordinatorThread( + () -> { + checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()); + subtaskGateways.registerSubtaskGateway(gateway); + }, + String.format( + Locale.ROOT, + "making event gateway to subtask %d (#%d) available", + subtask, + attemptNumber)); + } + + @Override + public void close() throws Exception { + coordinatorExecutor.shutdown(); + this.aggregatedStatisticsTracker = null; + this.started = false; + LOG.info("Closed data statistics coordinator: {}.", operatorName); + } + + private void runInCoordinatorThread(Runnable runnable) { + this.coordinatorExecutor.execute( + new ThrowableCatchingRunnable( + throwable -> + this.coordinatorThreadFactory.uncaughtException( + Thread.currentThread(), throwable), + runnable)); + } + + private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, String actionString) { + ensureStarted(); + runInCoordinatorThread( + () -> { + try { + action.run(); + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + LOG.error( + "Uncaught exception in the data statistics coordinator: {} while {}. Triggering job failover", + operatorName, + actionString, + t); + context.failJob(t); + } + }); + } + + private void ensureStarted() { + checkState(started, "The coordinator of %s has not started yet.", operatorName); + } + + private void handleDataStatisticRequest(int subtask, StatisticsEvent event) { + DataStatistics maybeCompletedStatistics = + aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, event); + if (maybeCompletedStatistics != null) { + if (maybeCompletedStatistics.isEmpty()) { + LOG.debug( + "Skip aggregated statistics for checkpoint {} as it is empty.", + event.checkpointId()); + } else { + LOG.debug( + "Completed statistics aggregation for checkpoint {}", event.checkpointId()); + sendGlobalStatisticsToSubtasks(maybeCompletedStatistics, event.checkpointId()); + } + } + } + + private void sendGlobalStatisticsToSubtasks(DataStatistics statistics, long checkpointId) { + runInCoordinatorThread( + () -> { + LOG.info( + "Broadcast latest global statistics from checkpoint {} to all subtasks", + checkpointId); + // applyImmediately is set to false so that operator subtasks can + // apply the change at checkpoint boundary + StatisticsEvent statisticsEvent = + StatisticsEvent.createStatisticsEvent( + checkpointId, statistics, statisticsSerializer); + for (int i = 0; i < context.currentParallelism(); ++i) { + // Ignore future return value for potential error (e.g. subtask down). + // Upon restart, subtasks send request to coordinator to refresh statistics + // if there is any difference + subtaskGateways.getSubtaskGateway(i).sendEvent(statisticsEvent); + } + }, + String.format( + Locale.ROOT, + "Failed to send operator %s coordinator global data statistics for checkpoint %d", + operatorName, + checkpointId)); + } + + @VisibleForTesting + void callInCoordinatorThread(Callable<Void> callable, String errorMessage) { + ensureStarted(); + // Ensure the task is done by the coordinator executor. 
+ if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) { + try { + Callable<Void> guardedCallable = + () -> { + try { + return callable.call(); + } catch (Throwable t) { + LOG.error( + "Uncaught Exception in data statistics coordinator: {} executor", + operatorName, + t); + ExceptionUtils.rethrowException(t); + return null; + } + }; + + coordinatorExecutor.submit(guardedCallable).get(); + } catch (InterruptedException | ExecutionException e) { + throw new FlinkRuntimeException(errorMessage, e); + } + } else { + try { + callable.call(); + } catch (Throwable t) { + LOG.error( + "Uncaught Exception in data statistics coordinator: {} executor", + operatorName, + t); + throw new FlinkRuntimeException(errorMessage, t); + } + } + } + + @VisibleForTesting + public SubtaskGateways getSubtaskGateways() { + return subtaskGateways; + } + + static class SubtaskGateways { + private final String operatorName; + private final Map<Integer, SubtaskGateway>[] gateways; + + @SuppressWarnings("unchecked") + private SubtaskGateways(String operatorName, int parallelism) { + this.operatorName = operatorName; + gateways = new Map[parallelism]; + + for (int i = 0; i < parallelism; ++i) { + gateways[i] = new HashMap<>(); + } + } + + private void registerSubtaskGateway(OperatorCoordinator.SubtaskGateway gateway) { + int subtaskIndex = gateway.getSubtask(); + int attemptNumber = gateway.getExecution().getAttemptNumber(); + checkState( + !gateways[subtaskIndex].containsKey(attemptNumber), + "Coordinator of %s already has a subtask gateway for %d (#%d)", + operatorName, + subtaskIndex, + attemptNumber); + LOG.debug( + "Coordinator of {} registers gateway for subtask {} attempt {}", + operatorName, + subtaskIndex, + attemptNumber); + gateways[subtaskIndex].put(attemptNumber, gateway); + } + + private void unregisterSubtaskGateway(int subtaskIndex, int attemptNumber) { + LOG.debug( + "Coordinator of {} unregisters gateway for subtask {} attempt {}", + operatorName, + subtaskIndex, + attemptNumber); + gateways[subtaskIndex].remove(attemptNumber); + } + + protected OperatorCoordinator.SubtaskGateway getSubtaskGateway(int subtaskIndex) { + checkState( + !gateways[subtaskIndex].isEmpty(), + "Coordinator of %s subtask %d is not ready yet to receive events", + operatorName, + subtaskIndex); + return Iterables.getOnlyElement(gateways[subtaskIndex].values()); + } + + private void reset(int subtaskIndex) { + gateways[subtaskIndex].clear(); + } + } + + private static class CoordinatorExecutorThreadFactory + implements ThreadFactory, Thread.UncaughtExceptionHandler { + + private final String coordinatorThreadName; + private final ClassLoader classLoader; + private final Thread.UncaughtExceptionHandler errorHandler; + + @javax.annotation.Nullable private Thread thread; Review Comment: remove unnecessary qualification ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java: ########## @@ -123,20 +134,76 @@ public AppendSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) { @Override public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) { - // For append only sink, we will do bucket shuffle only if bucket keys are not empty. 
- if (!bucketKeys.isEmpty() && shuffleByBucketId) { - return partition( - input, - new FlinkRowDataChannelComputer<>( - toFlussRowType(tableRowType), - bucketKeys, - partitionKeys, - lakeFormat, - numBucket, - flussSerializationSchema), - input.getParallelism()); - } else { - return input; + switch (shuffleMode) { + case BUCKET_SHUFFLE: + if (!bucketKeys.isEmpty()) { + return partition( + input, + new FlinkRowDataChannelComputer<>( + toFlussRowType(tableRowType), + bucketKeys, + partitionKeys, + lakeFormat, + numBucket, + flussSerializationSchema), + input.getParallelism()); + } + return input; + case NONE: + return input; + case DYNAMIC_SHUFFLE: + if (partitionKeys.isEmpty()) { + throw new UnsupportedOperationException( + "DYNAMIC_SHUFFLE is only supported for partition tables"); + } + + if (rowTypeInformation == null) { + throw new UnsupportedOperationException( + "RowTypeInformation is required for DYNAMIC_SHUFFLE mode."); + } + TypeInformation<StatisticsOrRecord<InputT>> statisticsOrRecordTypeInformation = + new StatisticsOrRecordTypeInformation<>(rowTypeInformation); + SingleOutputStreamOperator<StatisticsOrRecord<InputT>> shuffleStream = + input.transform( + "Dynamic shuffle data statistics", + statisticsOrRecordTypeInformation, + new DataStatisticsOperatorFactory<>( + toFlussRowType(tableRowType), + partitionKeys, + flussSerializationSchema)) + .uid("Dynamic shuffle data statistics" + tablePath) Review Comment: ```suggestion .uid("Dynamic shuffle data statistics " + tablePath) ``` ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecord.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.shuffle; + +import org.apache.fluss.annotation.Internal; + +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import java.util.Objects; + +import static org.apache.fluss.utils.Preconditions.checkArgument; + +/** + * Either a record or a statistics. 
+ * + * @param <InputT> + */ +@Internal +public class StatisticsOrRecord<InputT> { + + private DataStatistics statistics; + private InputT record; + + private StatisticsOrRecord(DataStatistics statistics, InputT record) { + checkArgument( + record != null ^ statistics != null, + "DataStatistics or record, not neither or both"); + this.statistics = statistics; + this.record = record; + } + + public static <InputT> StatisticsOrRecord<InputT> fromRecord(InputT record) { + return new StatisticsOrRecord<>(null, record); + } + + public static <InputT> StatisticsOrRecord<InputT> fromStatistics(DataStatistics statistics) { + return new StatisticsOrRecord<>(statistics, null); + } + + public static <InputT> StatisticsOrRecord<InputT> reuseRecord( + StatisticsOrRecord<InputT> reuse, TypeSerializer<InputT> recordSerializer) { + if (reuse.hasRecord()) { + return reuse; + } else { + // not reusable + return StatisticsOrRecord.fromRecord(recordSerializer.createInstance()); + } + } + + public static <InputT> StatisticsOrRecord<InputT> reuseStatistics( + StatisticsOrRecord<InputT> reuse, TypeSerializer<DataStatistics> statisticsSerializer) { + if (reuse.hasStatistics()) { + return reuse; + } else { + // not reusable + return StatisticsOrRecord.fromStatistics(statisticsSerializer.createInstance()); + } + } + + boolean hasStatistics() { + return statistics != null; + } + + public boolean hasRecord() { Review Comment: nit: rename to `isRecord`. ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java: ########## @@ -188,6 +194,22 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { } FlinkSink<RowData> flinkSink = getFlinkSink(targetColumnIndexes); + if (distributionMode == DistributionMode.DYNAMIC_SHUFFLE) { + return new DataStreamSinkProvider() { + @Override + public DataStreamSink<?> consumeDataStream( + ProviderContext providerContext, DataStream<RowData> dataStream) { + String sinkName = + "sink" + + "-" + + tablePath.getTableName() + + "-" + + tablePath.getDatabaseName(); + return dataStream.sinkTo(flinkSink).uid(sinkName).name(sinkName); Review Comment: Do we really need to assign uid for the sink operator? We don't have state in sink for now. ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java: ########## @@ -123,20 +134,76 @@ public AppendSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) { @Override public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) { - // For append only sink, we will do bucket shuffle only if bucket keys are not empty. 
- if (!bucketKeys.isEmpty() && shuffleByBucketId) { - return partition( - input, - new FlinkRowDataChannelComputer<>( - toFlussRowType(tableRowType), - bucketKeys, - partitionKeys, - lakeFormat, - numBucket, - flussSerializationSchema), - input.getParallelism()); - } else { - return input; + switch (shuffleMode) { + case BUCKET_SHUFFLE: + if (!bucketKeys.isEmpty()) { + return partition( + input, + new FlinkRowDataChannelComputer<>( + toFlussRowType(tableRowType), + bucketKeys, + partitionKeys, + lakeFormat, + numBucket, + flussSerializationSchema), + input.getParallelism()); + } + return input; + case NONE: + return input; + case DYNAMIC_SHUFFLE: + if (partitionKeys.isEmpty()) { + throw new UnsupportedOperationException( + "DYNAMIC_SHUFFLE is only supported for partition tables"); + } + + if (rowTypeInformation == null) { + throw new UnsupportedOperationException( + "RowTypeInformation is required for DYNAMIC_SHUFFLE mode."); + } + TypeInformation<StatisticsOrRecord<InputT>> statisticsOrRecordTypeInformation = + new StatisticsOrRecordTypeInformation<>(rowTypeInformation); Review Comment: Use `new StatisticsOrRecordTypeInformation<>(input.getType());` so we don't need to introduce `rowTypeInformation`? And this will be safer. ########## fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java: ########## @@ -100,7 +102,10 @@ void before() { // open a catalog so that we can get table from the catalog String bootstrapServers = FLUSS_CLUSTER_EXTENSION.getBootstrapServers(); // create table environment - env = StreamExecutionEnvironment.getExecutionEnvironment(); + org.apache.flink.configuration.Configuration config = + new org.apache.flink.configuration.Configuration(); + config.setString("taskmanager.numberOfTaskSlots", "3"); Review Comment: Is this change necessary? Because the default slots is 4 defined in `AbstractTestBase`. ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatistics.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.shuffle; + +import org.apache.flink.annotation.Internal; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** Data statistics for a partition name and its frequency. 
*/ +@Internal +public class DataStatistics { + + private final Map<String, Long> partitionFrequency; + + public DataStatistics() { + this.partitionFrequency = new HashMap<>(); + } + + DataStatistics(Map<String, Long> partitionFrequency) { + this.partitionFrequency = partitionFrequency; + } + + public boolean isEmpty() { + return partitionFrequency.isEmpty(); + } + + public void add(String partition, long value) { + if (partitionFrequency.containsKey(partition)) { + partitionFrequency.merge(partition, value, Long::sum); + } else { + // clone the sort key before adding to map because input sortKey object can be reused + partitionFrequency.put(partition, value); Review Comment: Can just `partitionFrequency.merge(partition, value, Long::sum);` for simplification and better performance. ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java: ########## @@ -159,6 +168,66 @@ public RowWithOp serialize(RowData value) throws Exception { return new RowWithOp(row, opType); } + @Override + public long size(RowData value, RowType rowType) { + if (value instanceof BinaryFormat) { + return ((BinaryFormat) value).getSizeInBytes(); + } + + long size = 0; + for (int i = 0; i < rowType.getFieldCount(); i++) { + DataField field = rowType.getFields().get(i); + DataTypeRoot typeRoot = field.getType().getTypeRoot(); + if (value.isNullAt(i)) { + continue; + } + switch (typeRoot) { + case CHAR: + size += ((CharType) (field.getType())).getLength(); + break; + case STRING: + StringData stringData = value.getString(i); + if (stringData instanceof BinaryStringData) { + size += ((BinaryStringData) stringData).getSizeInBytes(); + } else { + size += converter.getString(i).getSizeInBytes(); + } + break; + case BINARY: + size += ((BinaryType) (field.getType())).getLength(); + break; + case BYTES: + size += converter.getBytes(i).length; Review Comment: ditto ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java: ########## @@ -159,6 +168,66 @@ public RowWithOp serialize(RowData value) throws Exception { return new RowWithOp(row, opType); } + @Override + public long size(RowData value, RowType rowType) { + if (value instanceof BinaryFormat) { + return ((BinaryFormat) value).getSizeInBytes(); + } + + long size = 0; + for (int i = 0; i < rowType.getFieldCount(); i++) { + DataField field = rowType.getFields().get(i); + DataTypeRoot typeRoot = field.getType().getTypeRoot(); + if (value.isNullAt(i)) { + continue; + } + switch (typeRoot) { + case CHAR: + size += ((CharType) (field.getType())).getLength(); + break; + case STRING: + StringData stringData = value.getString(i); + if (stringData instanceof BinaryStringData) { + size += ((BinaryStringData) stringData).getSizeInBytes(); + } else { + size += converter.getString(i).getSizeInBytes(); Review Comment: Better to use `size += stringData.toBytes().length`. It's confusing what's the `converter` here, and what's the underlying row of `converter`. ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/WeightedRandomAssignment.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.shuffle; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.row.InternalRow; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Random; + +import static org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkState; + +/** + * Partition assignment strategy that randomly distributes records to subtasks based on configured + * weights. + * + * <p>This assignment strategy enables weighted random distribution of records to subtasks, allowing + * for more balanced load distribution across downstream subtasks. The assignment uses a weighted + * random algorithm where subtasks with higher weights have a proportionally higher probability of + * being selected. + * + * <p>NOTE: This class is inspired from Iceberg project. + */ +@Internal +public class WeightedRandomAssignment implements PartitionAssignment { + protected final List<Integer> assignedSubtasks; + protected final List<Long> subtaskWeights; + protected final long keyWeight; + protected final double[] cumulativeWeights; + private final Random random; + + /** + * @param assignedSubtasks assigned subtasks for this key. It could be a single subtask. It + * could also be multiple subtasks if the key has heavy weight that should be handled by + * multiple subtasks. + * @param subtaskWeights assigned weight for each subtask. E.g., if the keyWeight is 27 and the + * key is assigned to 3 subtasks, subtaskWeights could contain values as [10, 10, 7] for + * target weight of 10 per subtask. + */ + WeightedRandomAssignment( + List<Integer> assignedSubtasks, List<Long> subtaskWeights, Random random) { + checkArgument( + assignedSubtasks != null && !assignedSubtasks.isEmpty(), + "Invalid assigned subtasks: null or empty"); + checkArgument( + subtaskWeights != null && !subtaskWeights.isEmpty(), + "Invalid assigned subtasks weights: null or empty"); + checkArgument( + assignedSubtasks.size() == subtaskWeights.size(), + "Invalid assignment: size mismatch (tasks length = %s, weights length = %s)", + assignedSubtasks.size(), + subtaskWeights.size()); + + this.assignedSubtasks = assignedSubtasks; + this.subtaskWeights = subtaskWeights; + this.keyWeight = subtaskWeights.stream().mapToLong(Long::longValue).sum(); + this.cumulativeWeights = new double[subtaskWeights.size()]; + long cumulativeWeight = 0; + for (int i = 0; i < subtaskWeights.size(); ++i) { + cumulativeWeight += subtaskWeights.get(i); + cumulativeWeights[i] = cumulativeWeight; + } + this.random = random; + } + + /** + * Select a subtask for the key. If bucket key is existed , same key will be assigned to the + * same subtask. + * + * @return subtask id + */ + @Override + public int select(InternalRow row) { + if (assignedSubtasks.size() == 1) { + // only choice. no need to run random number generator. 
+ return assignedSubtasks.get(0); + } else { + double randomNumber = nextDouble(0, keyWeight); + int index = Arrays.binarySearch(cumulativeWeights, randomNumber); + // choose the subtask where randomNumber < cumulativeWeights[pos]. + // this works regardless whether index is negative or not. + int position = Math.abs(index + 1); + checkState( + position < assignedSubtasks.size(), + "Invalid selected position: out of range. key weight = %s, random number = %s, cumulative weights array = %s", + keyWeight, + randomNumber, + Arrays.toString(cumulativeWeights)); + return assignedSubtasks.get(position); + } + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + WeightedRandomAssignment that = (WeightedRandomAssignment) o; + return keyWeight == that.keyWeight + && Objects.equals(assignedSubtasks, that.assignedSubtasks) + && Objects.equals(subtaskWeights, that.subtaskWeights) + && Objects.deepEquals(cumulativeWeights, that.cumulativeWeights); + } + + @Override + public int hashCode() { + return Objects.hash( + assignedSubtasks, subtaskWeights, keyWeight, Arrays.hashCode(cumulativeWeights)); + } + + @Override + public String toString() { + return "KeyAssignment{" + + "assignedSubtasks=" + + assignedSubtasks + + ", subtaskWeights=" + + subtaskWeights + + ", keyWeight=" + + keyWeight + + ", cumulativeWeights=" + + Arrays.toString(cumulativeWeights) + + '}'; + } + + protected double nextDouble(double origin, double bound) { Review Comment: Add a comment to explain why we don't directly use `ThreadLocalRandom.nextDouble(...)` here. ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinator.java: ########## @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.sink.shuffle; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.shaded.guava32.com.google.common.collect.Iterables; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.ThrowableCatchingRunnable; +import org.apache.flink.util.function.ThrowingRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +import static org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkState; + +/** + * Coordinator for collecting global data statistics. + * + * <p>NOTE: This class is inspired from Iceberg project. + */ +@Internal +class DataStatisticsCoordinator implements OperatorCoordinator { + private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class); + + private final String operatorName; + private final OperatorCoordinator.Context context; + private final ExecutorService coordinatorExecutor; + private final SubtaskGateways subtaskGateways; + private final CoordinatorExecutorThreadFactory coordinatorThreadFactory; + private final TypeSerializer<DataStatistics> statisticsSerializer; + + private transient boolean started; + private transient AggregatedStatisticsTracker aggregatedStatisticsTracker; + + DataStatisticsCoordinator(String operatorName, OperatorCoordinator.Context context) { + this.operatorName = operatorName; + this.context = context; + this.coordinatorThreadFactory = + new CoordinatorExecutorThreadFactory( + "DataStatisticsCoordinator-" + operatorName, + context.getUserCodeClassloader()); + this.coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory); + this.subtaskGateways = new SubtaskGateways(operatorName, context.currentParallelism()); + this.statisticsSerializer = new DataStatisticsSerializer(); + } + + @Override + public void start() throws Exception { + LOG.debug("Starting data statistics coordinator: {}.", operatorName); + this.started = true; + + // statistics are restored already in resetToCheckpoint() before start() called + this.aggregatedStatisticsTracker = + new AggregatedStatisticsTracker(operatorName, context.currentParallelism()); + } + + @Override + public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) { + runInCoordinatorThread( + () -> { + LOG.debug( + "Handling event from subtask {} (#{}) of {}: {}", + subtask, + attemptNumber, + operatorName, + event); + if (event instanceof StatisticsEvent) { + handleDataStatisticRequest(subtask, ((StatisticsEvent) event)); + } else { + throw new IllegalArgumentException( + "Invalid operator event type: " + + event.getClass().getCanonicalName()); + } + }, + String.format( + Locale.ROOT, + "handling operator event %s 
from subtask %d (#%d)", + event.getClass(), + subtask, + attemptNumber)); + } + + @Override + public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) + throws Exception { + resultFuture.complete(new byte[0]); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) {} + + @Override + public void resetToCheckpoint(long checkpointId, byte[] checkpointData) { + checkState( + !started, + "The coordinator %s can only be reset if it was not yet started", + operatorName); + } + + @Override + public void subtaskReset(int subtask, long checkpointId) { + runInCoordinatorThread( + () -> { + LOG.info( + "Operator {} subtask {} is reset to checkpoint {}", + operatorName, + subtask, + checkpointId); + checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()); + subtaskGateways.reset(subtask); + }, + String.format( + Locale.ROOT, + "handling subtask %d recovery to checkpoint %d", + subtask, + checkpointId)); + } + + @Override + public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) { + runInCoordinatorThread( + () -> { + LOG.info( + "Unregistering gateway after failure for subtask {} (#{}) of data statistics {}", + subtask, + attemptNumber, + operatorName); + checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()); + subtaskGateways.unregisterSubtaskGateway(subtask, attemptNumber); + }, + String.format( + Locale.ROOT, "handling subtask %d (#%d) failure", subtask, attemptNumber)); + } + + @Override + public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) { + checkArgument(subtask == gateway.getSubtask()); + checkArgument(attemptNumber == gateway.getExecution().getAttemptNumber()); + runInCoordinatorThread( + () -> { + checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()); + subtaskGateways.registerSubtaskGateway(gateway); + }, + String.format( + Locale.ROOT, + "making event gateway to subtask %d (#%d) available", + subtask, + attemptNumber)); + } + + @Override + public void close() throws Exception { + coordinatorExecutor.shutdown(); + this.aggregatedStatisticsTracker = null; + this.started = false; + LOG.info("Closed data statistics coordinator: {}.", operatorName); + } + + private void runInCoordinatorThread(Runnable runnable) { + this.coordinatorExecutor.execute( + new ThrowableCatchingRunnable( + throwable -> + this.coordinatorThreadFactory.uncaughtException( + Thread.currentThread(), throwable), + runnable)); + } + + private void runInCoordinatorThread(ThrowingRunnable<Throwable> action, String actionString) { + ensureStarted(); + runInCoordinatorThread( + () -> { + try { + action.run(); + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + LOG.error( + "Uncaught exception in the data statistics coordinator: {} while {}. 
Triggering job failover", + operatorName, + actionString, + t); + context.failJob(t); + } + }); + } + + private void ensureStarted() { + checkState(started, "The coordinator of %s has not started yet.", operatorName); + } + + private void handleDataStatisticRequest(int subtask, StatisticsEvent event) { + DataStatistics maybeCompletedStatistics = + aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, event); + if (maybeCompletedStatistics != null) { + if (maybeCompletedStatistics.isEmpty()) { + LOG.debug( + "Skip aggregated statistics for checkpoint {} as it is empty.", + event.checkpointId()); + } else { + LOG.debug( + "Completed statistics aggregation for checkpoint {}", event.checkpointId()); + sendGlobalStatisticsToSubtasks(maybeCompletedStatistics, event.checkpointId()); + } + } + } + + private void sendGlobalStatisticsToSubtasks(DataStatistics statistics, long checkpointId) { + runInCoordinatorThread( + () -> { + LOG.info( + "Broadcast latest global statistics from checkpoint {} to all subtasks", + checkpointId); + // applyImmediately is set to false so that operator subtasks can + // apply the change at checkpoint boundary + StatisticsEvent statisticsEvent = + StatisticsEvent.createStatisticsEvent( + checkpointId, statistics, statisticsSerializer); + for (int i = 0; i < context.currentParallelism(); ++i) { + // Ignore future return value for potential error (e.g. subtask down). + // Upon restart, subtasks send request to coordinator to refresh statistics + // if there is any difference + subtaskGateways.getSubtaskGateway(i).sendEvent(statisticsEvent); + } + }, + String.format( + Locale.ROOT, + "Failed to send operator %s coordinator global data statistics for checkpoint %d", + operatorName, + checkpointId)); + } + + @VisibleForTesting + void callInCoordinatorThread(Callable<Void> callable, String errorMessage) { + ensureStarted(); + // Ensure the task is done by the coordinator executor. 
+ if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) { + try { + Callable<Void> guardedCallable = + () -> { + try { + return callable.call(); + } catch (Throwable t) { + LOG.error( + "Uncaught Exception in data statistics coordinator: {} executor", + operatorName, + t); + ExceptionUtils.rethrowException(t); + return null; + } + }; + + coordinatorExecutor.submit(guardedCallable).get(); + } catch (InterruptedException | ExecutionException e) { + throw new FlinkRuntimeException(errorMessage, e); + } + } else { + try { + callable.call(); + } catch (Throwable t) { + LOG.error( + "Uncaught Exception in data statistics coordinator: {} executor", + operatorName, + t); + throw new FlinkRuntimeException(errorMessage, t); + } + } + } + + @VisibleForTesting + public SubtaskGateways getSubtaskGateways() { + return subtaskGateways; + } + + static class SubtaskGateways { + private final String operatorName; + private final Map<Integer, SubtaskGateway>[] gateways; + + @SuppressWarnings("unchecked") + private SubtaskGateways(String operatorName, int parallelism) { + this.operatorName = operatorName; + gateways = new Map[parallelism]; + + for (int i = 0; i < parallelism; ++i) { + gateways[i] = new HashMap<>(); + } + } + + private void registerSubtaskGateway(OperatorCoordinator.SubtaskGateway gateway) { + int subtaskIndex = gateway.getSubtask(); + int attemptNumber = gateway.getExecution().getAttemptNumber(); + checkState( + !gateways[subtaskIndex].containsKey(attemptNumber), + "Coordinator of %s already has a subtask gateway for %d (#%d)", + operatorName, + subtaskIndex, + attemptNumber); + LOG.debug( + "Coordinator of {} registers gateway for subtask {} attempt {}", + operatorName, + subtaskIndex, + attemptNumber); + gateways[subtaskIndex].put(attemptNumber, gateway); + } + + private void unregisterSubtaskGateway(int subtaskIndex, int attemptNumber) { + LOG.debug( + "Coordinator of {} unregisters gateway for subtask {} attempt {}", + operatorName, + subtaskIndex, + attemptNumber); + gateways[subtaskIndex].remove(attemptNumber); + } + + protected OperatorCoordinator.SubtaskGateway getSubtaskGateway(int subtaskIndex) { + checkState( + !gateways[subtaskIndex].isEmpty(), + "Coordinator of %s subtask %d is not ready yet to receive events", + operatorName, + subtaskIndex); + return Iterables.getOnlyElement(gateways[subtaskIndex].values()); + } + + private void reset(int subtaskIndex) { + gateways[subtaskIndex].clear(); + } + } + + private static class CoordinatorExecutorThreadFactory + implements ThreadFactory, Thread.UncaughtExceptionHandler { + + private final String coordinatorThreadName; + private final ClassLoader classLoader; + private final Thread.UncaughtExceptionHandler errorHandler; + + @javax.annotation.Nullable private Thread thread; + + CoordinatorExecutorThreadFactory( + final String coordinatorThreadName, final ClassLoader contextClassLoader) { + this(coordinatorThreadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE); + } + + @org.apache.flink.annotation.VisibleForTesting Review Comment: Use Fluss `@VisibleForTesting`. ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordChannelComputer.java: ########## @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.shuffle; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.bucketing.BucketingFunction; +import org.apache.fluss.client.table.getter.PartitionGetter; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.flink.row.RowWithOp; +import org.apache.fluss.flink.sink.ChannelComputer; +import org.apache.fluss.flink.sink.serializer.FlussSerializationSchema; +import org.apache.fluss.flink.sink.serializer.SerializerInitContextImpl; +import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.encode.KeyEncoder; +import org.apache.fluss.types.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkState; + +/** + * {@link ChannelComputer} for {@link StatisticsOrRecord} which will change shuffle based on + * partition statistic. 
+ */ +@Internal +public class StatisticsOrRecordChannelComputer<InputT> + implements ChannelComputer<StatisticsOrRecord<InputT>> { + private static final Logger LOG = + LoggerFactory.getLogger(StatisticsOrRecordChannelComputer.class); + + private final @Nullable DataLakeFormat lakeFormat; + private final RowType flussRowType; + private final List<String> bucketKeys; + private final List<String> partitionKeys; + private final FlussSerializationSchema<InputT> serializationSchema; + private final int bucketNum; + + private transient int downstreamNumChannels; + private transient KeyEncoder bucketKeyEncoder; + private transient PartitionGetter partitionGetter; + private transient MapPartitioner delegatePartitioner; + private transient AtomicLong roundRobinCounter; + private transient BucketingFunction bucketingFunction; + private transient Random random; + + public StatisticsOrRecordChannelComputer( + RowType flussRowType, + List<String> bucketKeys, + List<String> partitionKeys, + int bucketNum, + @Nullable DataLakeFormat lakeFormat, + FlussSerializationSchema<InputT> serializationSchema) { + checkArgument( + partitionKeys != null && !partitionKeys.isEmpty(), + "Partition keys cannot be empty."); + this.flussRowType = flussRowType; + this.bucketKeys = bucketKeys; + this.partitionKeys = partitionKeys; + this.bucketNum = bucketNum; + this.lakeFormat = lakeFormat; + this.serializationSchema = serializationSchema; + } + + @Override + public void setup(int numChannels) { + LOG.info("Setting up with {} downstream channels", numChannels); + this.downstreamNumChannels = numChannels; + this.bucketingFunction = BucketingFunction.of(lakeFormat); + this.bucketKeyEncoder = KeyEncoder.of(flussRowType, bucketKeys, lakeFormat); + this.partitionGetter = new PartitionGetter(flussRowType, partitionKeys); + try { + this.serializationSchema.open(new SerializerInitContextImpl(flussRowType)); + } catch (Exception e) { + throw new FlussRuntimeException(e); + } + this.random = ThreadLocalRandom.current(); + } + + @Override + public int channel(StatisticsOrRecord<InputT> wrapper) { + if (wrapper == null) { + throw new FlussRuntimeException("StatisticsOrRecord wrapper must not be null"); + } + + try { + if (wrapper.hasStatistics()) { + this.delegatePartitioner = delegatePartitioner(wrapper.statistics()); + return (int) + (roundRobinCounter(downstreamNumChannels).getAndIncrement() + % downstreamNumChannels); + } else { + if (delegatePartitioner == null) { + delegatePartitioner = delegatePartitioner(null); + } + + RowWithOp rowWithOp = serializationSchema.serialize(wrapper.record()); + InternalRow row = rowWithOp.getRow(); + String partitionName = partitionGetter.getPartition(row); + return delegatePartitioner.select(partitionName, row, downstreamNumChannels); + } + } catch (Exception e) { + throw new FlussRuntimeException( + String.format( + "Failed to serialize static or record of type '%s' in FlinkRowDataChannelComputer: %s", + wrapper != null ? wrapper.getClass().getName() : "null", Review Comment: wrapper is always not null here ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecord.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.shuffle; + +import org.apache.fluss.annotation.Internal; + +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import java.util.Objects; + +import static org.apache.fluss.utils.Preconditions.checkArgument; + +/** + * Either a record or a statistics. + * + * @param <InputT> + */ +@Internal +public class StatisticsOrRecord<InputT> { + + private DataStatistics statistics; + private InputT record; + + private StatisticsOrRecord(DataStatistics statistics, InputT record) { + checkArgument( + record != null ^ statistics != null, + "DataStatistics or record, not neither or both"); + this.statistics = statistics; + this.record = record; + } + + public static <InputT> StatisticsOrRecord<InputT> fromRecord(InputT record) { + return new StatisticsOrRecord<>(null, record); + } + + public static <InputT> StatisticsOrRecord<InputT> fromStatistics(DataStatistics statistics) { + return new StatisticsOrRecord<>(statistics, null); + } + + public static <InputT> StatisticsOrRecord<InputT> reuseRecord( + StatisticsOrRecord<InputT> reuse, TypeSerializer<InputT> recordSerializer) { + if (reuse.hasRecord()) { + return reuse; + } else { + // not reusable + return StatisticsOrRecord.fromRecord(recordSerializer.createInstance()); + } + } + + public static <InputT> StatisticsOrRecord<InputT> reuseStatistics( + StatisticsOrRecord<InputT> reuse, TypeSerializer<DataStatistics> statisticsSerializer) { + if (reuse.hasStatistics()) { + return reuse; + } else { + // not reusable + return StatisticsOrRecord.fromStatistics(statisticsSerializer.createInstance()); + } + } + + boolean hasStatistics() { Review Comment: nit: rename to `isStatistics()` ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecord.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.shuffle; + +import org.apache.fluss.annotation.Internal; + +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import java.util.Objects; + +import static org.apache.fluss.utils.Preconditions.checkArgument; + +/** + * Either a record or a statistics. 
+ * + * @param <InputT> + */ +@Internal +public class StatisticsOrRecord<InputT> { + + private DataStatistics statistics; + private InputT record; + + private StatisticsOrRecord(DataStatistics statistics, InputT record) { + checkArgument( + record != null ^ statistics != null, + "DataStatistics or record, not neither or both"); + this.statistics = statistics; + this.record = record; + } + + public static <InputT> StatisticsOrRecord<InputT> fromRecord(InputT record) { + return new StatisticsOrRecord<>(null, record); + } + + public static <InputT> StatisticsOrRecord<InputT> fromStatistics(DataStatistics statistics) { + return new StatisticsOrRecord<>(statistics, null); + } + + public static <InputT> StatisticsOrRecord<InputT> reuseRecord( + StatisticsOrRecord<InputT> reuse, TypeSerializer<InputT> recordSerializer) { + if (reuse.hasRecord()) { + return reuse; + } else { + // not reusable + return StatisticsOrRecord.fromRecord(recordSerializer.createInstance()); + } + } + + public static <InputT> StatisticsOrRecord<InputT> reuseStatistics( + StatisticsOrRecord<InputT> reuse, TypeSerializer<DataStatistics> statisticsSerializer) { + if (reuse.hasStatistics()) { + return reuse; + } else { + // not reusable + return StatisticsOrRecord.fromStatistics(statisticsSerializer.createInstance()); + } + } + + boolean hasStatistics() { + return statistics != null; + } + + public boolean hasRecord() { + return record != null; + } + + public DataStatistics statistics() { + return statistics; + } + + public void statistics(DataStatistics newStatistics) { Review Comment: rename to `setStatistics` ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java: ########## @@ -123,20 +134,76 @@ public AppendSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) { @Override public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) { - // For append only sink, we will do bucket shuffle only if bucket keys are not empty. - if (!bucketKeys.isEmpty() && shuffleByBucketId) { - return partition( - input, - new FlinkRowDataChannelComputer<>( - toFlussRowType(tableRowType), - bucketKeys, - partitionKeys, - lakeFormat, - numBucket, - flussSerializationSchema), - input.getParallelism()); - } else { - return input; + switch (shuffleMode) { + case BUCKET_SHUFFLE: + if (!bucketKeys.isEmpty()) { + return partition( + input, + new FlinkRowDataChannelComputer<>( + toFlussRowType(tableRowType), + bucketKeys, + partitionKeys, + lakeFormat, + numBucket, + flussSerializationSchema), + input.getParallelism()); + } + return input; + case NONE: Review Comment: Move `NONE` to first ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java: ########## @@ -123,20 +134,76 @@ public AppendSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) { @Override public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) { - // For append only sink, we will do bucket shuffle only if bucket keys are not empty. 
- if (!bucketKeys.isEmpty() && shuffleByBucketId) { - return partition( - input, - new FlinkRowDataChannelComputer<>( - toFlussRowType(tableRowType), - bucketKeys, - partitionKeys, - lakeFormat, - numBucket, - flussSerializationSchema), - input.getParallelism()); - } else { - return input; + switch (shuffleMode) { + case BUCKET_SHUFFLE: + if (!bucketKeys.isEmpty()) { + return partition( + input, + new FlinkRowDataChannelComputer<>( + toFlussRowType(tableRowType), + bucketKeys, + partitionKeys, + lakeFormat, + numBucket, + flussSerializationSchema), + input.getParallelism()); + } + return input; + case NONE: + return input; + case DYNAMIC_SHUFFLE: + if (partitionKeys.isEmpty()) { + throw new UnsupportedOperationException( + "DYNAMIC_SHUFFLE is only supported for partition tables"); + } + + if (rowTypeInformation == null) { + throw new UnsupportedOperationException( + "RowTypeInformation is required for DYNAMIC_SHUFFLE mode."); + } + TypeInformation<StatisticsOrRecord<InputT>> statisticsOrRecordTypeInformation = + new StatisticsOrRecordTypeInformation<>(rowTypeInformation); + SingleOutputStreamOperator<StatisticsOrRecord<InputT>> shuffleStream = + input.transform( + "Dynamic shuffle data statistics", + statisticsOrRecordTypeInformation, + new DataStatisticsOperatorFactory<>( + toFlussRowType(tableRowType), + partitionKeys, + flussSerializationSchema)) + .uid("Dynamic shuffle data statistics" + tablePath) + // Set the parallelism same as input operator to encourage + // chaining + .setParallelism(input.getParallelism()); + + return partition( + shuffleStream, + new StatisticsOrRecordChannelComputer<>( + toFlussRowType(tableRowType), + bucketKeys, + partitionKeys, + numBucket, + lakeFormat, + flussSerializationSchema), + input.getParallelism()) + .flatMap( + (FlatMapFunction<StatisticsOrRecord<InputT>, InputT>) + (statisticsOrRecord, out) -> { + if (statisticsOrRecord.hasRecord()) { + out.collect(statisticsOrRecord.record()); + } + }) + .uid("flat map" + tablePath) Review Comment: ```suggestion .uid("flat map " + tablePath) ``` ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/WeightedRandomAssignment.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.shuffle; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.row.InternalRow; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Random; + +import static org.apache.fluss.utils.Preconditions.checkArgument; +import static org.apache.fluss.utils.Preconditions.checkState; + +/** + * Partition assignment strategy that randomly distributes records to subtasks based on configured + * weights. 
+ * + * <p>This assignment strategy enables weighted random distribution of records to subtasks, allowing + * for more balanced load distribution across downstream subtasks. The assignment uses a weighted + * random algorithm where subtasks with higher weights have a proportionally higher probability of + * being selected. + * + * <p>NOTE: This class is inspired from Iceberg project. + */ +@Internal +public class WeightedRandomAssignment implements PartitionAssignment { + protected final List<Integer> assignedSubtasks; + protected final List<Long> subtaskWeights; + protected final long keyWeight; + protected final double[] cumulativeWeights; + private final Random random; + + /** + * @param assignedSubtasks assigned subtasks for this key. It could be a single subtask. It + * could also be multiple subtasks if the key has heavy weight that should be handled by + * multiple subtasks. + * @param subtaskWeights assigned weight for each subtask. E.g., if the keyWeight is 27 and the + * key is assigned to 3 subtasks, subtaskWeights could contain values as [10, 10, 7] for + * target weight of 10 per subtask. + */ + WeightedRandomAssignment( + List<Integer> assignedSubtasks, List<Long> subtaskWeights, Random random) { + checkArgument( + assignedSubtasks != null && !assignedSubtasks.isEmpty(), + "Invalid assigned subtasks: null or empty"); + checkArgument( + subtaskWeights != null && !subtaskWeights.isEmpty(), + "Invalid assigned subtasks weights: null or empty"); + checkArgument( + assignedSubtasks.size() == subtaskWeights.size(), + "Invalid assignment: size mismatch (tasks length = %s, weights length = %s)", + assignedSubtasks.size(), + subtaskWeights.size()); + + this.assignedSubtasks = assignedSubtasks; + this.subtaskWeights = subtaskWeights; + this.keyWeight = subtaskWeights.stream().mapToLong(Long::longValue).sum(); + this.cumulativeWeights = new double[subtaskWeights.size()]; + long cumulativeWeight = 0; + for (int i = 0; i < subtaskWeights.size(); ++i) { + cumulativeWeight += subtaskWeights.get(i); + cumulativeWeights[i] = cumulativeWeight; + } + this.random = random; + } + + /** + * Select a subtask for the key. If bucket key is existed , same key will be assigned to the + * same subtask. + * + * @return subtask id + */ + @Override + public int select(InternalRow row) { + if (assignedSubtasks.size() == 1) { + // only choice. no need to run random number generator. + return assignedSubtasks.get(0); + } else { + double randomNumber = nextDouble(0, keyWeight); + int index = Arrays.binarySearch(cumulativeWeights, randomNumber); + // choose the subtask where randomNumber < cumulativeWeights[pos]. + // this works regardless whether index is negative or not. + int position = Math.abs(index + 1); + checkState( + position < assignedSubtasks.size(), + "Invalid selected position: out of range. key weight = %s, random number = %s, cumulative weights array = %s", + keyWeight, + randomNumber, + Arrays.toString(cumulativeWeights)); Review Comment: Use `if` condition, otherwise, the `Arrays.toString(cumulativeWeights)` is always executed. ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecord.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.shuffle; + +import org.apache.fluss.annotation.Internal; + +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import java.util.Objects; + +import static org.apache.fluss.utils.Preconditions.checkArgument; + +/** + * Either a record or a statistics. + * + * @param <InputT> + */ +@Internal +public class StatisticsOrRecord<InputT> { + + private DataStatistics statistics; + private InputT record; + + private StatisticsOrRecord(DataStatistics statistics, InputT record) { + checkArgument( + record != null ^ statistics != null, + "DataStatistics or record, not neither or both"); + this.statistics = statistics; + this.record = record; + } + + public static <InputT> StatisticsOrRecord<InputT> fromRecord(InputT record) { + return new StatisticsOrRecord<>(null, record); + } + + public static <InputT> StatisticsOrRecord<InputT> fromStatistics(DataStatistics statistics) { + return new StatisticsOrRecord<>(statistics, null); + } + + public static <InputT> StatisticsOrRecord<InputT> reuseRecord( + StatisticsOrRecord<InputT> reuse, TypeSerializer<InputT> recordSerializer) { + if (reuse.hasRecord()) { + return reuse; + } else { + // not reusable + return StatisticsOrRecord.fromRecord(recordSerializer.createInstance()); + } + } + + public static <InputT> StatisticsOrRecord<InputT> reuseStatistics( + StatisticsOrRecord<InputT> reuse, TypeSerializer<DataStatistics> statisticsSerializer) { + if (reuse.hasStatistics()) { + return reuse; + } else { + // not reusable + return StatisticsOrRecord.fromStatistics(statisticsSerializer.createInstance()); + } + } + + boolean hasStatistics() { + return statistics != null; + } + + public boolean hasRecord() { + return record != null; + } + + public DataStatistics statistics() { + return statistics; + } + + public void statistics(DataStatistics newStatistics) { + this.statistics = newStatistics; + } + + public InputT record() { + return record; + } + + public void record(InputT newRecord) { Review Comment: rename to `setRecord`. ########## fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinatorTest.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.shuffle; + +import org.apache.fluss.flink.sink.serializer.RowDataSerializationSchema; + +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraphID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.table.data.RowData; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static org.apache.fluss.flink.sink.shuffle.StatisticsEvent.createStatisticsEvent; +import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link DataStatisticsCoordinator}. */ +public class DataStatisticsCoordinatorTest { + private static final DataStatisticsSerializer dataStatisticsSerializer = + new DataStatisticsSerializer(); + + @Test + void testStatisticsEvent() throws Exception { + try (DataStatisticsCoordinator coordinator = getDataStatisticsCoordinator()) { + coordinator.start(); + DataStatisticsCoordinator.SubtaskGateways subtaskGateways = + coordinator.getSubtaskGateways(); + coordinator.executionAttemptReady(0, 1, new MockGateway(0, 1)); + coordinator.executionAttemptReady(1, 1, new MockGateway(1, 1)); + coordinator.handleEventFromOperator( + 0, + 1, + createStatisticsEvent( + 0, + new DataStatistics(Collections.singletonMap("partition 1", 10000L)), + dataStatisticsSerializer)); + coordinator.handleEventFromOperator( + 1, + 1, + createStatisticsEvent( + 1, + new DataStatistics(Collections.singletonMap("partition 1", 20000L)), + dataStatisticsSerializer)); + coordinator.callInCoordinatorThread( + () -> null, "wait until all pending tasks in coordinator are finished "); + assertThat(((MockGateway) subtaskGateways.getSubtaskGateway(0)).events).isEmpty(); + assertThat(((MockGateway) subtaskGateways.getSubtaskGateway(1)).events).isEmpty(); + + Map<String, Long> partitionFrequencies = new HashMap<>(); + partitionFrequencies.put("partition 1", 10000L); + partitionFrequencies.put("partition 2", 20000L); + coordinator.handleEventFromOperator( + 0, + 1, + createStatisticsEvent( + 1, new DataStatistics(partitionFrequencies), dataStatisticsSerializer)); + + Thread.sleep(1000); Review Comment: Do we need to sleep here? It seems the following `coordinator.callInCoordinatorThread` has already waited for the pending events been processed. ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java: ########## @@ -125,6 +126,23 @@ public class FlinkConnectorOptions { + BUCKET_KEY.key() + "' is defined. 
For Primary Key table, it is enabled by default."); + public static final ConfigOption<DistributionMode> SINK_DISTRIBUTION_MODE = + ConfigOptions.key("sink.distribution-mode") + .enumType(DistributionMode.class) + .defaultValue(DistributionMode.BUCKET_SHUFFLE) + .withDescription( + "Defines the distribution mode for writing data to the sink. Available options are: \n" + + "- NONE: No specific distribution strategy. Data is forwarded as is.\n" + + "- BUCKET_SHUFFLE: Shuffle data by bucket ID before writing to sink. " + + "Shuffling the data with the same bucket ID to be processed by the same task " + + "can improve the efficiency of client processing and reduce resource consumption. " + + "For Log Table, bucket shuffle will only take effect when the '" + + BUCKET_KEY.key() + + "' is defined. For Primary Key table, it is enabled by default.\n" + + "- DYNAMIC_SHUFFLE: Dynamically adjust shuffle strategy based on partition key traffic patterns. " + + "This mode monitors data distribution and adjusts the shuffle behavior to balance the load. " Review Comment: Mention when DYNAMIC should be used, and what is the cost. ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java: ########## @@ -204,8 +272,13 @@ public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) { lakeFormat, numBucket, flussSerializationSchema), - input.getParallelism()) - : input; + input.getParallelism()); + case NONE: Review Comment: move `NONE` to first ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java: ########## @@ -125,6 +126,23 @@ public class FlinkConnectorOptions { + BUCKET_KEY.key() + "' is defined. For Primary Key table, it is enabled by default."); + public static final ConfigOption<DistributionMode> SINK_DISTRIBUTION_MODE = + ConfigOptions.key("sink.distribution-mode") + .enumType(DistributionMode.class) + .defaultValue(DistributionMode.BUCKET_SHUFFLE) + .withDescription( + "Defines the distribution mode for writing data to the sink. Available options are: \n" + + "- NONE: No specific distribution strategy. Data is forwarded as is.\n" + + "- BUCKET_SHUFFLE: Shuffle data by bucket ID before writing to sink. " + + "Shuffling the data with the same bucket ID to be processed by the same task " + + "can improve the efficiency of client processing and reduce resource consumption. " Review Comment: Mention this can improve throughput for primary key tables. ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkTableSink.java: ########## @@ -79,7 +85,7 @@ public class FlinkTableSink private final DeleteBehavior tableDeleteBehavior; private final int numBucket; private final List<String> bucketKeys; - private final boolean shuffleByBucketId; + private DistributionMode distributionMode; Review Comment: `final` ########## fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinatorTest.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.shuffle; + +import org.apache.fluss.flink.sink.serializer.RowDataSerializationSchema; + +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraphID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.table.data.RowData; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static org.apache.fluss.flink.sink.shuffle.StatisticsEvent.createStatisticsEvent; +import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link DataStatisticsCoordinator}. */ +public class DataStatisticsCoordinatorTest { + private static final DataStatisticsSerializer dataStatisticsSerializer = + new DataStatisticsSerializer(); + + @Test + void testStatisticsEvent() throws Exception { + try (DataStatisticsCoordinator coordinator = getDataStatisticsCoordinator()) { + coordinator.start(); + DataStatisticsCoordinator.SubtaskGateways subtaskGateways = + coordinator.getSubtaskGateways(); + coordinator.executionAttemptReady(0, 1, new MockGateway(0, 1)); + coordinator.executionAttemptReady(1, 1, new MockGateway(1, 1)); + coordinator.handleEventFromOperator( + 0, + 1, + createStatisticsEvent( + 0, + new DataStatistics(Collections.singletonMap("partition 1", 10000L)), + dataStatisticsSerializer)); + coordinator.handleEventFromOperator( + 1, + 1, + createStatisticsEvent( + 1, + new DataStatistics(Collections.singletonMap("partition 1", 20000L)), + dataStatisticsSerializer)); + coordinator.callInCoordinatorThread( + () -> null, "wait until all pending tasks in coordinator are finished "); + assertThat(((MockGateway) subtaskGateways.getSubtaskGateway(0)).events).isEmpty(); + assertThat(((MockGateway) subtaskGateways.getSubtaskGateway(1)).events).isEmpty(); + + Map<String, Long> partitionFrequencies = new HashMap<>(); + partitionFrequencies.put("partition 1", 10000L); + partitionFrequencies.put("partition 2", 20000L); + coordinator.handleEventFromOperator( + 0, + 1, + createStatisticsEvent( + 1, new DataStatistics(partitionFrequencies), dataStatisticsSerializer)); + + Thread.sleep(1000); + Map<String, Long> expectedPartitionFrequencies = new HashMap<>(); + expectedPartitionFrequencies.put("partition 1", 30000L); + expectedPartitionFrequencies.put("partition 2", 20000L); + StatisticsEvent expectedStatisticsEvent = + createStatisticsEvent( + 1, + new DataStatistics(expectedPartitionFrequencies), + 
dataStatisticsSerializer);
+
+            coordinator.callInCoordinatorThread(

Review Comment: This is a bit of a hack: the introduced `callInCoordinatorThread` exists only for testing, and its semantics differ from `runInCoordinatorThread` (`callInCoordinatorThread` blocks until the callable has finished), yet by name the two look very similar. I suggest not introducing `callInCoordinatorThread` at all; instead, use `runInCoordinatorThread` to execute a `CompletableFuture.complete(null)` and wait on the `CompletableFuture.get()` in the test (see the sketch after the comments on this file below).

########## fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java: ##########
@@ -351,6 +363,59 @@ void testAppendLogWithMultiBatch() throws Exception {
         assertResultsIgnoreOrder(rowIter, expectedRows, true);
     }

+    @ParameterizedTest
+    @EnumSource(value = DistributionMode.class)
+    void testAppendLogPartitionTable(DistributionMode distributionMode) throws Exception {
+        tEnv.executeSql(
+                String.format(
+                        "create table sink_test (a int not null, b bigint, c string) "
+                                + " partitioned by (c) "
+                                + "with ('bucket.num' = '3', 'sink.distribution-mode'= '%s')",
+                        distributionMode));
+        String insertSql =
+                "INSERT INTO sink_test(a, b, c) "
+                        + "VALUES (1, 3501, 'Tim'), "
+                        + "(2, 3502, 'Fabian'), "
+                        + "(3, 3503, 'Tim'), "
+                        + "(4, 3504, 'jerry'), "
+                        + "(5, 3505, 'piggy'), "
+                        + "(7, 3507, 'Fabian'), "
+                        + "(8, 3508, 'stave'), "
+                        + "(9, 3509, 'Tim'), "
+                        + "(10, 3510, 'coco'), "
+                        + "(11, 3511, 'stave'), "
+                        + "(12, 3512, 'Tim')";
+        String insertPlan = tEnv.explainSql(insertSql, ExplainDetail.JSON_EXECUTION_PLAN);
+        if (distributionMode == DistributionMode.DYNAMIC_SHUFFLE) {
+            assertThat(insertPlan)
+                    .contains(String.format("\"ship_strategy\" : \"%s\"", distributionMode.name()));
+        } else {
+            assertThat(insertPlan).contains("\"ship_strategy\" : \"FORWARD\"");
+        }
+        tEnv.executeSql(insertSql).await();
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql("select * from sink_test").collect();
+        //noinspection ArraysAsListWithZeroOrOneArgument
+        List<List<String>> expectedGroups =
+                Arrays.asList(
+                        Arrays.asList(
+                                "+I[1, 3501, Tim]",
+                                "+I[3, 3503, Tim]",
+                                "+I[9, 3509, Tim]",
+                                "+I[12, 3512, Tim]"),
+                        Arrays.asList("+I[2, 3502, Fabian]", "+I[7, 3507, Fabian]"),
+                        Arrays.asList("+I[4, 3504, jerry]"),
+                        Arrays.asList("+I[5, 3505, piggy]"),
+                        Arrays.asList("+I[8, 3508, stave]", "+I[11, 3511, stave]"),
+                        Arrays.asList("+I[10, 3510, coco]"));
+
+        List<String> expectedRows =
+                expectedGroups.stream().flatMap(List::stream).collect(Collectors.toList());
+
+        List<String> actual = collectRowsWithTimeout(rowIter, expectedRows.size());
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRows);
+    }
+

Review Comment: Please update the `testPut(boolean sinkBucketShuffle)` test to use the distribution mode as well.
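Following up on the coordinator-thread comment above, a minimal sketch of the suggested pattern, assuming `runInCoordinatorThread(Runnable, String)` stays visible to the test (the test class already imports `java.util.concurrent.CompletableFuture`); names here are illustrative rather than the final API:

```java
// Barrier through the existing runInCoordinatorThread instead of a test-only
// callInCoordinatorThread. The coordinator executor is single-threaded, so the
// future completes only after every previously submitted event has been handled.
CompletableFuture<Void> allEventsProcessed = new CompletableFuture<>();
coordinator.runInCoordinatorThread(
        () -> allEventsProcessed.complete(null),
        "wait until all pending tasks in coordinator are finished");
// Blocks the test (declared `throws Exception`) until the barrier completes.
allEventsProcessed.get();
```

This keeps the production class free of a test-only entry point while still giving the test the same "drain the coordinator queue" guarantee.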
########## fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java: ########## @@ -351,6 +363,59 @@ void testAppendLogWithMultiBatch() throws Exception { assertResultsIgnoreOrder(rowIter, expectedRows, true); } + @ParameterizedTest + @EnumSource(value = DistributionMode.class) + void testAppendLogPartitionTable(DistributionMode distributionMode) throws Exception { + tEnv.executeSql( + String.format( + "create table sink_test (a int not null, b bigint, c string) " + + " partitioned by (c) " + + "with ('bucket.num' = '3', 'sink.distribution-mode'= '%s')", + distributionMode)); + String insertSql = + "INSERT INTO sink_test(a, b, c) " + + "VALUES (1, 3501, 'Tim'), " + + "(2, 3502, 'Fabian'), " + + "(3, 3503, 'Tim'), " + + "(4, 3504, 'jerry'), " + + "(5, 3505, 'piggy'), " + + "(7, 3507, 'Fabian'), " + + "(8, 3508, 'stave'), " + + "(9, 3509, 'Tim'), " + + "(10, 3510, 'coco'), " + + "(11, 3511, 'stave'), " + + "(12, 3512, 'Tim')"; + String insertPlan = tEnv.explainSql(insertSql, ExplainDetail.JSON_EXECUTION_PLAN); + if (distributionMode == DistributionMode.DYNAMIC_SHUFFLE) { + assertThat(insertPlan) + .contains(String.format("\"ship_strategy\" : \"%s\"", distributionMode.name())); + } else { Review Comment: We should throw exception when it's BUCKET shuffle, as the table doesn't have bucket key. (after introduce AUTO default mode) ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DistributionMode.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.sink.shuffle; + +import java.util.Locale; + +import static org.apache.fluss.utils.Preconditions.checkArgument; + +/** Distribution mode for sink shuffling. */ +public enum DistributionMode { + NONE("NONE"), + BUCKET_SHUFFLE("BUCKET_SHUFFLE"), + DYNAMIC_SHUFFLE("DYNAMIC_SHUFFLE"); + + private final String modeName; + + DistributionMode(String modeName) { + this.modeName = modeName; + } + + public String modeName() { Review Comment: we don't need the `modeName`, use `name()` directly. ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java: ########## @@ -125,6 +126,23 @@ public class FlinkConnectorOptions { + BUCKET_KEY.key() + "' is defined. For Primary Key table, it is enabled by default."); Review Comment: Add deprecation annotation and comments to the config option ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java: ########## @@ -125,6 +126,23 @@ public class FlinkConnectorOptions { + BUCKET_KEY.key() + "' is defined. 
For Primary Key table, it is enabled by default.");
+    public static final ConfigOption<DistributionMode> SINK_DISTRIBUTION_MODE =
+            ConfigOptions.key("sink.distribution-mode")
+                    .enumType(DistributionMode.class)
+                    .defaultValue(DistributionMode.BUCKET_SHUFFLE)

Review Comment: `sink.distribution-mode`
- Default: AUTO
- AUTO: Automatically chooses the best mode based on the table type. Uses BUCKET mode for Primary Key Tables and NONE for Log Tables to maximize throughput.
- NONE: Direct write without reshuffling.
- BUCKET: ..
- PARTITION_DYNAMIC: ...

########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/FlussSerializationSchema.java: ##########
@@ -51,6 +51,10 @@ public interface FlussSerializationSchema<T> extends Serializable {
     */
    RowWithOp serialize(T value) throws Exception;
+
+    default long size(T value, RowType rowType) {

Review Comment: This method feels like a hack to me, and the relationship and calling order between it and the `serialize` method are confusing. I suggest adding a `getEstimatedSizeInBytes` method to the `RowWithOp` class and an additional constructor that accepts an `estimatedSizeInBytes` parameter, so the size estimation can be folded into the `serialize` method (see the sketch after the DistributionMode.java comments below).
```
@Nullable
public Long getEstimatedSizeInBytes() {
    ...
}
```

########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DistributionMode.java: ##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import java.util.Locale;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/** Distribution mode for sink shuffling. */
+public enum DistributionMode {
+    NONE("NONE"),
+    BUCKET_SHUFFLE("BUCKET_SHUFFLE"),
+    DYNAMIC_SHUFFLE("DYNAMIC_SHUFFLE");
+
+    private final String modeName;
+
+    DistributionMode(String modeName) {
+        this.modeName = modeName;
+    }
+
+    public String modeName() {
+        return modeName;
+    }
+
+    public static DistributionMode fromName(String modeName) {
+        checkArgument(null != modeName, "Invalid distribution mode: null");
+        try {
+            return DistributionMode.valueOf(modeName.toUpperCase(Locale.ENGLISH));

Review Comment: nit: isn't `.toUpperCase()` enough? All the other places just use `.toUpperCase()`.
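Referring back to the `getEstimatedSizeInBytes` suggestion on `FlussSerializationSchema` above, a rough sketch of what the `RowWithOp` additions could look like; the field and type names (`InternalRow`, `OperationType`) are assumptions inferred from how the PR constructs `RowWithOp`, not copied from the actual class:

```java
// Sketch only: lets serialize() attach a size estimate so the schema does not
// need a separate size(value, rowType) method.
public class RowWithOp {
    private final InternalRow row;
    private final OperationType opType; // assumed name of the existing op field
    @Nullable private final Long estimatedSizeInBytes;

    // existing two-argument constructor keeps working, with no estimate attached
    public RowWithOp(InternalRow row, OperationType opType) {
        this(row, opType, null);
    }

    // new constructor that carries the estimate computed during serialization
    public RowWithOp(InternalRow row, OperationType opType, @Nullable Long estimatedSizeInBytes) {
        this.row = row;
        this.opType = opType;
        this.estimatedSizeInBytes = estimatedSizeInBytes;
    }

    public InternalRow getRow() {
        return row;
    }

    public OperationType getOpType() {
        return opType;
    }

    /** Estimated serialized size in bytes, or null if the serializer did not compute one. */
    @Nullable
    public Long getEstimatedSizeInBytes() {
        return estimatedSizeInBytes;
    }
}
```

The dynamic-shuffle operator could then read `rowWithOp.getEstimatedSizeInBytes()` from the result of `serialize(...)` instead of calling a second method on the serialization schema.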
########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java: ##########
@@ -159,6 +168,66 @@ public RowWithOp serialize(RowData value) throws Exception {
         return new RowWithOp(row, opType);
     }

+    @Override
+    public long size(RowData value, RowType rowType) {
+        if (value instanceof BinaryFormat) {
+            return ((BinaryFormat) value).getSizeInBytes();
+        }
+
+        long size = 0;
+        for (int i = 0; i < rowType.getFieldCount(); i++) {

Review Comment: We should pre-compute the size of the fixed-length fields for better performance, so that only the variable-length fields need to be measured per record at runtime (see the sketch below).
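A hedged sketch of that pre-computation, written as a fragment of the serialization schema class; `fixedSizeInBytes(rowType, i)` is a hypothetical helper (returning -1 for variable-length types), and the per-record branch assumes the variable-length fields are strings, so treat this as an illustration rather than the final implementation:

```java
// Computed once from the schema, e.g. when the schema is opened:
private long fixedPartSizeInBytes;   // sum of the sizes of all fixed-length fields
private int[] varLengthFieldIndexes; // fields that must be measured per record

private void precomputeFieldSizes(RowType rowType) {
    java.util.List<Integer> varFields = new java.util.ArrayList<>();
    long fixed = 0L;
    for (int i = 0; i < rowType.getFieldCount(); i++) {
        int fixedSize = fixedSizeInBytes(rowType, i); // e.g. INT -> 4, BIGINT -> 8, -1 if var-length
        if (fixedSize >= 0) {
            fixed += fixedSize;
        } else {
            varFields.add(i);
        }
    }
    this.fixedPartSizeInBytes = fixed;
    this.varLengthFieldIndexes = varFields.stream().mapToInt(Integer::intValue).toArray();
}

// Per-record estimate: start from the pre-computed fixed part and only touch
// the variable-length fields.
private long estimateSize(RowData value) {
    long size = fixedPartSizeInBytes;
    for (int i : varLengthFieldIndexes) {
        if (!value.isNullAt(i)) {
            size += value.getString(i).toBytes().length; // rough; a cheaper accessor may exist
        }
    }
    return size;
}
```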
