wuchong commented on code in PR #1784:
URL: https://github.com/apache/fluss/pull/1784#discussion_r2700779379


##########
fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118TableSinkITCase.java:
##########
@@ -17,5 +17,25 @@
 
 package org.apache.fluss.flink.sink;
 
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
 /** IT case for {@link FlinkTableSink} in Flink 1.18. */
-public class Flink118TableSinkITCase extends FlinkTableSinkITCase {}
+public class Flink118TableSinkITCase extends FlinkTableSinkITCase {
+    @BeforeEach
+    @Override
+    void before() throws Exception {
+        // invoke here because the AbstractTestBase in 1.18 is junit 4.
+        AbstractTestBase.MINI_CLUSTER_RESOURCE.before();
+        super.before();
+    }
+
+    @AfterEach
+    @Override
+    void after() throws Exception {
+        super.after();
+        // invoke here because the AbstractTestBase in 1.18 is junit 4.
+        AbstractTestBase.MINI_CLUSTER_RESOURCE.after();
+    }

Review Comment:
   I don't understand this. Why don't we need this in other IT cases, like 
`Flink118ComplexTypeITCase`, `Flink118TableSourceITCase`, etc.? They all extend 
`AbstractTestBase`.
   
   What's the problem if we don't add this? 



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.shaded.guava32.com.google.common.collect.Iterables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Coordinator for collecting global data statistics.
+ *
+ * <p>NOTE: This class is inspired from Iceberg project.
+ */
+@Internal
+class DataStatisticsCoordinator implements OperatorCoordinator {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+    private final String operatorName;
+    private final OperatorCoordinator.Context context;
+    private final ExecutorService coordinatorExecutor;
+    private final SubtaskGateways subtaskGateways;
+    private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
+    private final TypeSerializer<DataStatistics> statisticsSerializer;
+
+    private transient boolean started;
+    private transient AggregatedStatisticsTracker aggregatedStatisticsTracker;
+
+    DataStatisticsCoordinator(String operatorName, OperatorCoordinator.Context 
context) {
+        this.operatorName = operatorName;
+        this.context = context;
+        this.coordinatorThreadFactory =
+                new CoordinatorExecutorThreadFactory(
+                        "DataStatisticsCoordinator-" + operatorName,
+                        context.getUserCodeClassloader());
+        this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+        this.subtaskGateways = new SubtaskGateways(operatorName, 
context.currentParallelism());
+        this.statisticsSerializer = new DataStatisticsSerializer();
+    }
+
+    @Override
+    public void start() throws Exception {
+        LOG.debug("Starting data statistics coordinator: {}.", operatorName);
+        this.started = true;
+
+        // statistics are restored already in resetToCheckpoint() before 
start() called
+        this.aggregatedStatisticsTracker =
+                new AggregatedStatisticsTracker(operatorName, 
context.currentParallelism());
+    }
+
+    @Override
+    public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+        runInCoordinatorThread(
+                () -> {
+                    LOG.debug(
+                            "Handling event from subtask {} (#{}) of {}: {}",
+                            subtask,
+                            attemptNumber,
+                            operatorName,
+                            event);
+                    if (event instanceof StatisticsEvent) {
+                        handleDataStatisticRequest(subtask, ((StatisticsEvent) 
event));
+                    } else {
+                        throw new IllegalArgumentException(
+                                "Invalid operator event type: "
+                                        + event.getClass().getCanonicalName());
+                    }
+                },
+                String.format(
+                        Locale.ROOT,
+                        "handling operator event %s from subtask %d (#%d)",
+                        event.getClass(),
+                        subtask,
+                        attemptNumber));
+    }
+
+    @Override
+    public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture)
+            throws Exception {
+        resultFuture.complete(new byte[0]);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {}
+
+    @Override
+    public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
+        checkState(
+                !started,
+                "The coordinator %s can only be reset if it was not yet 
started",
+                operatorName);
+    }
+
+    @Override
+    public void subtaskReset(int subtask, long checkpointId) {
+        runInCoordinatorThread(
+                () -> {
+                    LOG.info(
+                            "Operator {} subtask {} is reset to checkpoint {}",
+                            operatorName,
+                            subtask,
+                            checkpointId);
+                    
checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+                    subtaskGateways.reset(subtask);
+                },
+                String.format(
+                        Locale.ROOT,
+                        "handling subtask %d recovery to checkpoint %d",
+                        subtask,
+                        checkpointId));
+    }
+
+    @Override
+    public void executionAttemptFailed(int subtask, int attemptNumber, 
@Nullable Throwable reason) {
+        runInCoordinatorThread(
+                () -> {
+                    LOG.info(
+                            "Unregistering gateway after failure for subtask 
{} (#{}) of data statistics {}",
+                            subtask,
+                            attemptNumber,
+                            operatorName);
+                    
checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+                    subtaskGateways.unregisterSubtaskGateway(subtask, 
attemptNumber);
+                },
+                String.format(
+                        Locale.ROOT, "handling subtask %d (#%d) failure", 
subtask, attemptNumber));
+    }
+
+    @Override
+    public void executionAttemptReady(int subtask, int attemptNumber, 
SubtaskGateway gateway) {
+        checkArgument(subtask == gateway.getSubtask());
+        checkArgument(attemptNumber == 
gateway.getExecution().getAttemptNumber());
+        runInCoordinatorThread(
+                () -> {
+                    
checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+                    subtaskGateways.registerSubtaskGateway(gateway);
+                },
+                String.format(
+                        Locale.ROOT,
+                        "making event gateway to subtask %d (#%d) available",
+                        subtask,
+                        attemptNumber));
+    }
+
+    @Override
+    public void close() throws Exception {
+        coordinatorExecutor.shutdown();
+        this.aggregatedStatisticsTracker = null;
+        this.started = false;
+        LOG.info("Closed data statistics coordinator: {}.", operatorName);
+    }
+
+    private void runInCoordinatorThread(Runnable runnable) {
+        this.coordinatorExecutor.execute(
+                new ThrowableCatchingRunnable(
+                        throwable ->
+                                
this.coordinatorThreadFactory.uncaughtException(
+                                        Thread.currentThread(), throwable),
+                        runnable));
+    }
+
+    @VisibleForTesting
+    void runInCoordinatorThread(ThrowingRunnable<Throwable> action, String 
actionString) {
+        ensureStarted();
+        runInCoordinatorThread(
+                () -> {
+                    try {
+                        action.run();
+                    } catch (Throwable t) {
+                        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                        LOG.error(
+                                "Uncaught exception in the data statistics 
coordinator: {} while {}. Triggering job failover",
+                                operatorName,
+                                actionString,
+                                t);
+                        context.failJob(t);
+                    }
+                });
+    }
+
+    private void ensureStarted() {
+        checkState(started, "The coordinator of %s has not started yet.", 
operatorName);
+    }
+
+    private void handleDataStatisticRequest(int subtask, StatisticsEvent 
event) {
+        DataStatistics maybeCompletedStatistics =
+                aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, 
event);
+        if (maybeCompletedStatistics != null) {
+            if (maybeCompletedStatistics.isEmpty()) {
+                LOG.debug(
+                        "Skip aggregated statistics for checkpoint {} as it is 
empty.",
+                        event.getCheckpointId());
+            } else {
+                LOG.debug(
+                        "Completed statistics aggregation for checkpoint {}",
+                        event.getCheckpointId());
+                sendGlobalStatisticsToSubtasks(maybeCompletedStatistics, 
event.getCheckpointId());
+            }
+        }
+    }
+
+    private void sendGlobalStatisticsToSubtasks(DataStatistics statistics, 
long checkpointId) {
+        LOG.info(
+                "Broadcast latest global statistics from checkpoint {} to all 
subtasks",
+                checkpointId);
+        // applyImmediately is set to false so that operator subtasks can
+        // apply the change at checkpoint boundary
+        StatisticsEvent statisticsEvent =
+                StatisticsEvent.createStatisticsEvent(
+                        checkpointId, statistics, statisticsSerializer);
+        for (int i = 0; i < context.currentParallelism(); ++i) {
+            try {
+                
subtaskGateways.getSubtaskGateway(i).sendEvent(statisticsEvent).get();

Review Comment:
   This is blocking I/O and will introduce many problems (e.g., blocking 
checkpoints) when the number of tasks is very large. Why not just call 
`sendEvent` and leave it as is? We don't need to make sure the RPC succeeds, as 
you mentioned in the warning in the following exception catch block. 



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java:
##########
@@ -165,6 +166,7 @@ void testMetricsReport() throws Exception {
         createTable(tablePath, tableDescriptor);
 
         // test write
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 1);

Review Comment:
   Why set the parallelism to `1`? This change doesn’t appear to be related to 
the main goal of the PR.
   
   To help reviewers understand the intent and avoid unnecessary 
back-and-forth, it would be great to add a brief comment explaining the reason, 
or consider removing it if it’s not essential. This will help speed up the 
review and merge process.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java:
##########
@@ -544,12 +625,13 @@ void testPartialUpsertDuringAddColumn() throws Exception {
 
     @Test
     void testFirstRowMergeEngine() throws Exception {
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 1);

Review Comment:
   Why set the parallelism to 1? 



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java:
##########
@@ -155,8 +174,81 @@ public RowWithOp serialize(RowData value) throws Exception 
{
             row = outputProjection.replaceRow(row);
         }
         OperationType opType = toOperationType(value.getRowKind());
+        long estimatedSizeInBytes = calculateSize(value);
+        return new RowWithOp(row, opType, estimatedSizeInBytes);

Review Comment:
   Currently, the `RowDataSerializationSchema` is forced to calculate the size 
in bytes for each record. However, only `DataStatisticsOperator` needs this, so 
this introduces a performance regression.
   
   I added an `isStatisticEnabled` flag on 
`FlussSerializationSchema#InitializationContext` to allow the 
`RowDataSerializationSchema` to skip collecting statistics in most cases. 



##########
fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118TableSinkITCase.java:
##########
@@ -17,5 +17,25 @@
 
 package org.apache.fluss.flink.sink;
 
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
 /** IT case for {@link FlinkTableSink} in Flink 1.18. */
-public class Flink118TableSinkITCase extends FlinkTableSinkITCase {}
+public class Flink118TableSinkITCase extends FlinkTableSinkITCase {
+    @BeforeEach
+    @Override
+    void before() throws Exception {
+        // invoke here because the AbstractTestBase in 1.18 is junit 4.
+        AbstractTestBase.MINI_CLUSTER_RESOURCE.before();
+        super.before();
+    }
+
+    @AfterEach
+    @Override
+    void after() throws Exception {
+        super.after();
+        // invoke here because the AbstractTestBase in 1.18 is junit 4.
+        AbstractTestBase.MINI_CLUSTER_RESOURCE.after();
+    }

Review Comment:
   Is it because of the PARTITION_DYNAMIC jobs that require more task slots? 



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java:
##########
@@ -123,22 +130,80 @@ public AppendSinkWriter<InputT> 
createWriter(MailboxExecutor mailboxExecutor) {
 
         @Override
         public DataStream<InputT> addPreWriteTopology(DataStream<InputT> 
input) {
-            // For append only sink, we will do bucket shuffle only if bucket 
keys are not empty.
-            if (!bucketKeys.isEmpty() && shuffleByBucketId) {
-                return partition(
-                        input,
-                        new FlinkRowDataChannelComputer<>(
-                                toFlussRowType(tableRowType),
-                                bucketKeys,
-                                partitionKeys,
-                                lakeFormat,
-                                numBucket,
-                                flussSerializationSchema),
-                        input.getParallelism());
-            } else {
-                return input;
+            switch (shuffleMode) {
+                case NONE:
+                    return input;
+                case AUTO:
+                    // if bucket keys are not empty, use BUCKET as default. 
Otherwise, use NONE.
+                    return bucketKeys.isEmpty() ? input : bucketShuffle(input);
+                case BUCKET:
+                    if (!bucketKeys.isEmpty()) {
+                        return bucketShuffle(input);
+                    }
+                    throw new UnsupportedOperationException(
+                            "BUCKET mode is only supported for log tables with 
bucket keys");
+                case PARTITION_DYNAMIC:
+                    if (partitionKeys.isEmpty()) {
+                        throw new UnsupportedOperationException(
+                                "PARTITION_DYNAMIC is only supported for 
partition tables");
+                    }
+
+                    TypeInformation<StatisticsOrRecord<InputT>> 
statisticsOrRecordTypeInformation =
+                            new 
StatisticsOrRecordTypeInformation<>(input.getType());
+                    SingleOutputStreamOperator<StatisticsOrRecord<InputT>> 
shuffleStream =
+                            input.transform(
+                                            "Dynamic shuffle data statistics",
+                                            statisticsOrRecordTypeInformation,
+                                            new 
DataStatisticsOperatorFactory<>(
+                                                    
toFlussRowType(tableRowType),
+                                                    partitionKeys,
+                                                    flussSerializationSchema))
+                                    // Set the parallelism same as input 
operator to encourage
+                                    // chaining
+                                    .setParallelism(input.getParallelism());
+
+                    return partition(
+                                    shuffleStream,
+                                    new StatisticsOrRecordChannelComputer<>(
+                                            toFlussRowType(tableRowType),
+                                            bucketKeys,
+                                            partitionKeys,
+                                            numBucket,
+                                            lakeFormat,
+                                            flussSerializationSchema),
+                                    input.getParallelism())
+                            .flatMap(
+                                    
(FlatMapFunction<StatisticsOrRecord<InputT>, InputT>)
+                                            (statisticsOrRecord, out) -> {
+                                                if 
(statisticsOrRecord.isRecord()) {
+                                                    
out.collect(statisticsOrRecord.record());

Review Comment:
   Add operator name and add `setParallelism` and use input parallelism. 



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java:
##########
@@ -62,9 +70,8 @@ protected SinkWriter<InputT> createWriter(
         return flinkSinkWriter;
     }
 
-    @Override
-    public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
-        return builder.addPreWriteTopology(input);
+    public DataStreamSink<InputT> addPreWriteTopology(DataStream<InputT> 
input) {
+        return builder.addPreWriteTopology(input).sinkTo(this);

Review Comment:
   add `setParallelism` and use input parallelism. 



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java:
##########
@@ -123,22 +130,80 @@ public AppendSinkWriter<InputT> 
createWriter(MailboxExecutor mailboxExecutor) {
 
         @Override
         public DataStream<InputT> addPreWriteTopology(DataStream<InputT> 
input) {
-            // For append only sink, we will do bucket shuffle only if bucket 
keys are not empty.
-            if (!bucketKeys.isEmpty() && shuffleByBucketId) {
-                return partition(
-                        input,
-                        new FlinkRowDataChannelComputer<>(
-                                toFlussRowType(tableRowType),
-                                bucketKeys,
-                                partitionKeys,
-                                lakeFormat,
-                                numBucket,
-                                flussSerializationSchema),
-                        input.getParallelism());
-            } else {
-                return input;
+            switch (shuffleMode) {
+                case NONE:
+                    return input;
+                case AUTO:
+                    // if bucket keys are not empty, use BUCKET as default. 
Otherwise, use NONE.
+                    return bucketKeys.isEmpty() ? input : bucketShuffle(input);
+                case BUCKET:
+                    if (!bucketKeys.isEmpty()) {
+                        return bucketShuffle(input);
+                    }
+                    throw new UnsupportedOperationException(
+                            "BUCKET mode is only supported for log tables with 
bucket keys");
+                case PARTITION_DYNAMIC:
+                    if (partitionKeys.isEmpty()) {
+                        throw new UnsupportedOperationException(
+                                "PARTITION_DYNAMIC is only supported for 
partition tables");
+                    }
+
+                    TypeInformation<StatisticsOrRecord<InputT>> 
statisticsOrRecordTypeInformation =
+                            new 
StatisticsOrRecordTypeInformation<>(input.getType());
+                    SingleOutputStreamOperator<StatisticsOrRecord<InputT>> 
shuffleStream =
+                            input.transform(
+                                            "Dynamic shuffle data statistics",
+                                            statisticsOrRecordTypeInformation,
+                                            new 
DataStatisticsOperatorFactory<>(
+                                                    
toFlussRowType(tableRowType),
+                                                    partitionKeys,
+                                                    flussSerializationSchema))
+                                    // Set the parallelism same as input 
operator to encourage
+                                    // chaining
+                                    .setParallelism(input.getParallelism());
+
+                    return partition(
+                                    shuffleStream,
+                                    new StatisticsOrRecordChannelComputer<>(
+                                            toFlussRowType(tableRowType),
+                                            bucketKeys,
+                                            partitionKeys,
+                                            numBucket,
+                                            lakeFormat,
+                                            flussSerializationSchema),
+                                    input.getParallelism())
+                            .flatMap(
+                                    
(FlatMapFunction<StatisticsOrRecord<InputT>, InputT>)
+                                            (statisticsOrRecord, out) -> {
+                                                if 
(statisticsOrRecord.isRecord()) {
+                                                    
out.collect(statisticsOrRecord.record());
+                                                }
+                                            })
+                            // To promote operator chaining with the 
downstream writer operator,
+                            // setting slot sharing group and the parallelism 
as default, {@link
+                            // SinkTransformationTranslator} will set the 
parallelism same as sink
+                            // transformation.
+                            .slotSharingGroup("shuffle-partition-custom-group")

Review Comment:
   I’m not sure why `slotSharingGroup` is being set here. It doesn’t affect 
operator chaining.
   
   In fact, I suspect this `slotSharingGroup` configuration may be the root 
cause of scheduling failures in PARTITION_DYNAMIC jobs due to insufficient 
resources. When all operators share the same `slotSharingGroup`, Flink can (and 
will) pack them into the same TaskManager slot. However, if different operators 
are assigned different slot sharing groups, they must be placed in separate 
slots, thereby increasing total slot demand.
   
   When I removed this configuration, the PARTITION_DYNAMIC tests passed 
without the `AbstractTestBase.MINI_CLUSTER_RESOURCE.before()` changes.



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java:
##########
@@ -261,22 +272,25 @@ void testAppendLogWithBucketKey(boolean 
sinkBucketShuffle) throws Exception {
         List<String> actual = collectRowsWithTimeout(rowIter, 
expectedRows.size());
         assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRows);
 
-        // check data with the same bucket key should be read in sequence.
-        for (List<String> expected : expectedGroups) {
-            if (expected.size() <= 1) {
-                continue;
-            }
-            int prevIndex = actual.indexOf(expected.get(0));
-            for (int i = 1; i < expected.size(); i++) {
-                int index = actual.indexOf(expected.get(i));
-                assertThat(index).isGreaterThan(prevIndex);
-                prevIndex = index;
+        if (distributionMode == DistributionMode.BUCKET) {
+            // check data with the same bucket key should be read in sequence.
+            for (List<String> expected : expectedGroups) {
+                if (expected.size() <= 1) {
+                    continue;
+                }
+                int prevIndex = actual.indexOf(expected.get(0));
+                for (int i = 1; i < expected.size(); i++) {
+                    int index = actual.indexOf(expected.get(i));
+                    assertThat(index).isGreaterThan(prevIndex);
+                    prevIndex = index;
+                }
             }
         }
     }
 
     @Test
     void testAppendLogWithRoundRobin() throws Exception {
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 1);

Review Comment:
   Why set the parallelism to 1? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to