wuchong commented on code in PR #1784:
URL: https://github.com/apache/fluss/pull/1784#discussion_r2700779379
##########
fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118TableSinkITCase.java:
##########
@@ -17,5 +17,25 @@
package org.apache.fluss.flink.sink;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
/** IT case for {@link FlinkTableSink} in Flink 1.18. */
-public class Flink118TableSinkITCase extends FlinkTableSinkITCase {}
+public class Flink118TableSinkITCase extends FlinkTableSinkITCase {
+ @BeforeEach
+ @Override
+ void before() throws Exception {
+ // invoke here because the AbstractTestBase in 1.18 is junit 4.
+ AbstractTestBase.MINI_CLUSTER_RESOURCE.before();
+ super.before();
+ }
+
+ @AfterEach
+ @Override
+ void after() throws Exception {
+ super.after();
+ // invoke here because the AbstractTestBase in 1.18 is junit 4.
+ AbstractTestBase.MINI_CLUSTER_RESOURCE.after();
+ }
Review Comment:
I don't understand this. Why don't we need this in other IT cases, like
`Flink118ComplexTypeITCase`, `Flink118TableSourceITCase`, etc.? They all extend
`AbstractTestBase`.
What's the problem if we don't add this?
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/DataStatisticsCoordinator.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink.shuffle;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.shaded.guava32.com.google.common.collect.Iterables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.flink.util.ThrowableCatchingRunnable;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Coordinator for collecting global data statistics.
+ *
+ * <p>NOTE: This class is inspired from Iceberg project.
+ */
+@Internal
+class DataStatisticsCoordinator implements OperatorCoordinator {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DataStatisticsCoordinator.class);
+
+ private final String operatorName;
+ private final OperatorCoordinator.Context context;
+ private final ExecutorService coordinatorExecutor;
+ private final SubtaskGateways subtaskGateways;
+ private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
+ private final TypeSerializer<DataStatistics> statisticsSerializer;
+
+ private transient boolean started;
+ private transient AggregatedStatisticsTracker aggregatedStatisticsTracker;
+
+    DataStatisticsCoordinator(String operatorName, OperatorCoordinator.Context context) {
+ this.operatorName = operatorName;
+ this.context = context;
+ this.coordinatorThreadFactory =
+ new CoordinatorExecutorThreadFactory(
+                        "DataStatisticsCoordinator-" + operatorName,
+                        context.getUserCodeClassloader());
+        this.coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+        this.subtaskGateways = new SubtaskGateways(operatorName, context.currentParallelism());
+ this.statisticsSerializer = new DataStatisticsSerializer();
+ }
+
+ @Override
+ public void start() throws Exception {
+ LOG.debug("Starting data statistics coordinator: {}.", operatorName);
+ this.started = true;
+
+        // statistics are restored already in resetToCheckpoint() before start() called
+        this.aggregatedStatisticsTracker =
+                new AggregatedStatisticsTracker(operatorName, context.currentParallelism());
+ }
+
+ @Override
+    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
+ runInCoordinatorThread(
+ () -> {
+ LOG.debug(
+ "Handling event from subtask {} (#{}) of {}: {}",
+ subtask,
+ attemptNumber,
+ operatorName,
+ event);
+ if (event instanceof StatisticsEvent) {
+                        handleDataStatisticRequest(subtask, ((StatisticsEvent) event));
+ } else {
+ throw new IllegalArgumentException(
+ "Invalid operator event type: "
+ + event.getClass().getCanonicalName());
+ }
+ },
+ String.format(
+ Locale.ROOT,
+ "handling operator event %s from subtask %d (#%d)",
+ event.getClass(),
+ subtask,
+ attemptNumber));
+ }
+
+ @Override
+    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture)
+ throws Exception {
+ resultFuture.complete(new byte[0]);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {}
+
+ @Override
+ public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
+ checkState(
+ !started,
+                "The coordinator %s can only be reset if it was not yet started",
+ operatorName);
+ }
+
+ @Override
+ public void subtaskReset(int subtask, long checkpointId) {
+ runInCoordinatorThread(
+ () -> {
+ LOG.info(
+ "Operator {} subtask {} is reset to checkpoint {}",
+ operatorName,
+ subtask,
+ checkpointId);
+                    checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+ subtaskGateways.reset(subtask);
+ },
+ String.format(
+ Locale.ROOT,
+ "handling subtask %d recovery to checkpoint %d",
+ subtask,
+ checkpointId));
+ }
+
+ @Override
+    public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
+ runInCoordinatorThread(
+ () -> {
+ LOG.info(
+                            "Unregistering gateway after failure for subtask {} (#{}) of data statistics {}",
+ subtask,
+ attemptNumber,
+ operatorName);
+                    checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+                    subtaskGateways.unregisterSubtaskGateway(subtask, attemptNumber);
+ },
+ String.format(
+                    Locale.ROOT, "handling subtask %d (#%d) failure", subtask, attemptNumber));
+ }
+
+ @Override
+    public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {
+        checkArgument(subtask == gateway.getSubtask());
+        checkArgument(attemptNumber == gateway.getExecution().getAttemptNumber());
+ runInCoordinatorThread(
+ () -> {
+                    checkState(this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+ subtaskGateways.registerSubtaskGateway(gateway);
+ },
+ String.format(
+ Locale.ROOT,
+ "making event gateway to subtask %d (#%d) available",
+ subtask,
+ attemptNumber));
+ }
+
+ @Override
+ public void close() throws Exception {
+ coordinatorExecutor.shutdown();
+ this.aggregatedStatisticsTracker = null;
+ this.started = false;
+ LOG.info("Closed data statistics coordinator: {}.", operatorName);
+ }
+
+ private void runInCoordinatorThread(Runnable runnable) {
+ this.coordinatorExecutor.execute(
+ new ThrowableCatchingRunnable(
+ throwable ->
+                                this.coordinatorThreadFactory.uncaughtException(
+                                        Thread.currentThread(), throwable),
+ runnable));
+ }
+
+ @VisibleForTesting
+    void runInCoordinatorThread(ThrowingRunnable<Throwable> action, String actionString) {
+ ensureStarted();
+ runInCoordinatorThread(
+ () -> {
+ try {
+ action.run();
+ } catch (Throwable t) {
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+ LOG.error(
+                                "Uncaught exception in the data statistics coordinator: {} while {}. Triggering job failover",
+ operatorName,
+ actionString,
+ t);
+ context.failJob(t);
+ }
+ });
+ }
+
+ private void ensureStarted() {
+        checkState(started, "The coordinator of %s has not started yet.", operatorName);
+ }
+
+    private void handleDataStatisticRequest(int subtask, StatisticsEvent event) {
+        DataStatistics maybeCompletedStatistics =
+                aggregatedStatisticsTracker.updateAndCheckCompletion(subtask, event);
+ if (maybeCompletedStatistics != null) {
+ if (maybeCompletedStatistics.isEmpty()) {
+ LOG.debug(
+                        "Skip aggregated statistics for checkpoint {} as it is empty.",
+ event.getCheckpointId());
+ } else {
+ LOG.debug(
+ "Completed statistics aggregation for checkpoint {}",
+ event.getCheckpointId());
+                sendGlobalStatisticsToSubtasks(maybeCompletedStatistics, event.getCheckpointId());
+ }
+ }
+ }
+
+    private void sendGlobalStatisticsToSubtasks(DataStatistics statistics, long checkpointId) {
+ LOG.info(
+                "Broadcast latest global statistics from checkpoint {} to all subtasks",
+ checkpointId);
+ // applyImmediately is set to false so that operator subtasks can
+ // apply the change at checkpoint boundary
+ StatisticsEvent statisticsEvent =
+ StatisticsEvent.createStatisticsEvent(
+ checkpointId, statistics, statisticsSerializer);
+ for (int i = 0; i < context.currentParallelism(); ++i) {
+ try {
+                subtaskGateways.getSubtaskGateway(i).sendEvent(statisticsEvent).get();
Review Comment:
This is a blocking IO call and will introduce many problems (e.g., blocking
checkpoints) when the number of tasks is very large. Why not just call `sendEvent` and
leave it at that? We don't need to make sure the RPC succeeds, as you mentioned in
the exception-catch warning that follows.
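For illustration, a non-blocking variant could look like the sketch below (same fields as in the diff above; only the `.get()` is dropped, since `SubtaskGateway#sendEvent` already returns a future and the gateway reports failures itself):

```java
// Sketch: fire-and-forget broadcast, so the coordinator thread is never blocked
// even when the sink parallelism is very large.
for (int i = 0; i < context.currentParallelism(); ++i) {
    // sendEvent returns a CompletableFuture<Acknowledge>; we intentionally do not wait on it
    subtaskGateways.getSubtaskGateway(i).sendEvent(statisticsEvent);
}
```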
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/metrics/FlinkMetricsITCase.java:
##########
@@ -165,6 +166,7 @@ void testMetricsReport() throws Exception {
createTable(tablePath, tableDescriptor);
// test write
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
Review Comment:
Why set the parallelism to `1`? This change doesn't appear to be related to
the main goal of the PR.
To help reviewers understand the intent and avoid unnecessary
back-and-forth, it would be great to add a brief comment explaining the reason,
or consider removing it if it’s not essential. This will help speed up the
review and merge process.
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java:
##########
@@ -544,12 +625,13 @@ void testPartialUpsertDuringAddColumn() throws Exception {
@Test
void testFirstRowMergeEngine() throws Exception {
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
Review Comment:
Why set the parallelism to `1`?
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java:
##########
@@ -155,8 +174,81 @@ public RowWithOp serialize(RowData value) throws Exception {
row = outputProjection.replaceRow(row);
}
OperationType opType = toOperationType(value.getRowKind());
+ long estimatedSizeInBytes = calculateSize(value);
+ return new RowWithOp(row, opType, estimatedSizeInBytes);
Review Comment:
Currently, the `RowDataSerializationSchema` is forced to calculate the size in
bytes for every record. However, only the `DataStatisticsOperator` needs this, so
this introduces a performance regression for all other sinks.
I added an `isStatisticEnabled` flag on
`FlussSerializationSchema#InitializationContext` to allow the
`RowDataSerializationSchema` to skip collecting statistics in the common case.
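As a rough sketch of how the schema can use that flag (the `statisticEnabled` field name and the `-1L` "unknown size" placeholder are illustrative; the rest follows the diff above):

```java
// Inside RowDataSerializationSchema#serialize: only pay for the per-record size
// estimation when a DataStatisticsOperator will actually consume it.
long estimatedSizeInBytes = statisticEnabled ? calculateSize(value) : -1L;
return new RowWithOp(row, opType, estimatedSizeInBytes);
```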
##########
fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118TableSinkITCase.java:
##########
@@ -17,5 +17,25 @@
package org.apache.fluss.flink.sink;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+
/** IT case for {@link FlinkTableSink} in Flink 1.18. */
-public class Flink118TableSinkITCase extends FlinkTableSinkITCase {}
+public class Flink118TableSinkITCase extends FlinkTableSinkITCase {
+ @BeforeEach
+ @Override
+ void before() throws Exception {
+ // invoke here because the AbstractTestBase in 1.18 is junit 4.
+ AbstractTestBase.MINI_CLUSTER_RESOURCE.before();
+ super.before();
+ }
+
+ @AfterEach
+ @Override
+ void after() throws Exception {
+ super.after();
+ // invoke here because the AbstractTestBase in 1.18 is junit 4.
+ AbstractTestBase.MINI_CLUSTER_RESOURCE.after();
+ }
Review Comment:
Is it because the PARTITION_DYNAMIC jobs require more task slots?
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java:
##########
@@ -123,22 +130,80 @@ public AppendSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) {
@Override
    public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
-        // For append only sink, we will do bucket shuffle only if bucket keys are not empty.
- if (!bucketKeys.isEmpty() && shuffleByBucketId) {
- return partition(
- input,
- new FlinkRowDataChannelComputer<>(
- toFlussRowType(tableRowType),
- bucketKeys,
- partitionKeys,
- lakeFormat,
- numBucket,
- flussSerializationSchema),
- input.getParallelism());
- } else {
- return input;
+ switch (shuffleMode) {
+ case NONE:
+ return input;
+ case AUTO:
+                // if bucket keys are not empty, use BUCKET as default. Otherwise, use NONE.
+ return bucketKeys.isEmpty() ? input : bucketShuffle(input);
+ case BUCKET:
+ if (!bucketKeys.isEmpty()) {
+ return bucketShuffle(input);
+ }
+ throw new UnsupportedOperationException(
+                            "BUCKET mode is only supported for log tables with bucket keys");
+ case PARTITION_DYNAMIC:
+ if (partitionKeys.isEmpty()) {
+ throw new UnsupportedOperationException(
+                            "PARTITION_DYNAMIC is only supported for partition tables");
+ }
+
+                TypeInformation<StatisticsOrRecord<InputT>> statisticsOrRecordTypeInformation =
+                        new StatisticsOrRecordTypeInformation<>(input.getType());
+                SingleOutputStreamOperator<StatisticsOrRecord<InputT>> shuffleStream =
+                        input.transform(
+                                        "Dynamic shuffle data statistics",
+                                        statisticsOrRecordTypeInformation,
+                                        new DataStatisticsOperatorFactory<>(
+                                                toFlussRowType(tableRowType),
+                                                partitionKeys,
+                                                flussSerializationSchema))
+                                // Set the parallelism same as input operator to encourage
+                                // chaining
+                                .setParallelism(input.getParallelism());
+
+ return partition(
+ shuffleStream,
+ new StatisticsOrRecordChannelComputer<>(
+ toFlussRowType(tableRowType),
+ bucketKeys,
+ partitionKeys,
+ numBucket,
+ lakeFormat,
+ flussSerializationSchema),
+ input.getParallelism())
+ .flatMap(
+                                (FlatMapFunction<StatisticsOrRecord<InputT>, InputT>)
+                                        (statisticsOrRecord, out) -> {
+                                            if (statisticsOrRecord.isRecord()) {
+                                                out.collect(statisticsOrRecord.record());
Review Comment:
Add an operator name to this `flatMap`, and also call `setParallelism` on it using the input parallelism.
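Roughly like the following sketch (the operator name is just an example, pick something descriptive):

```java
.flatMap(
        (FlatMapFunction<StatisticsOrRecord<InputT>, InputT>)
                (statisticsOrRecord, out) -> {
                    if (statisticsOrRecord.isRecord()) {
                        out.collect(statisticsOrRecord.record());
                    }
                })
.name("Extract records from statistics stream") // example name
.setParallelism(input.getParallelism());
```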
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java:
##########
@@ -62,9 +70,8 @@ protected SinkWriter<InputT> createWriter(
return flinkSinkWriter;
}
- @Override
- public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
- return builder.addPreWriteTopology(input);
+    public DataStreamSink<InputT> addPreWriteTopology(DataStream<InputT> input) {
+ return builder.addPreWriteTopology(input).sinkTo(this);
Review Comment:
Add `setParallelism` here as well, using the input parallelism.
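That is, something along the lines of this sketch (method names taken from the diff above):

```java
// Keep the sink transformation at the same parallelism as its input.
return builder.addPreWriteTopology(input).sinkTo(this).setParallelism(input.getParallelism());
```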
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlinkSink.java:
##########
@@ -123,22 +130,80 @@ public AppendSinkWriter<InputT> createWriter(MailboxExecutor mailboxExecutor) {
@Override
    public DataStream<InputT> addPreWriteTopology(DataStream<InputT> input) {
-        // For append only sink, we will do bucket shuffle only if bucket keys are not empty.
- if (!bucketKeys.isEmpty() && shuffleByBucketId) {
- return partition(
- input,
- new FlinkRowDataChannelComputer<>(
- toFlussRowType(tableRowType),
- bucketKeys,
- partitionKeys,
- lakeFormat,
- numBucket,
- flussSerializationSchema),
- input.getParallelism());
- } else {
- return input;
+ switch (shuffleMode) {
+ case NONE:
+ return input;
+ case AUTO:
+                // if bucket keys are not empty, use BUCKET as default. Otherwise, use NONE.
+ return bucketKeys.isEmpty() ? input : bucketShuffle(input);
+ case BUCKET:
+ if (!bucketKeys.isEmpty()) {
+ return bucketShuffle(input);
+ }
+ throw new UnsupportedOperationException(
+                            "BUCKET mode is only supported for log tables with bucket keys");
+ case PARTITION_DYNAMIC:
+ if (partitionKeys.isEmpty()) {
+ throw new UnsupportedOperationException(
+                            "PARTITION_DYNAMIC is only supported for partition tables");
+ }
+
+                TypeInformation<StatisticsOrRecord<InputT>> statisticsOrRecordTypeInformation =
+                        new StatisticsOrRecordTypeInformation<>(input.getType());
+                SingleOutputStreamOperator<StatisticsOrRecord<InputT>> shuffleStream =
+                        input.transform(
+                                        "Dynamic shuffle data statistics",
+                                        statisticsOrRecordTypeInformation,
+                                        new DataStatisticsOperatorFactory<>(
+                                                toFlussRowType(tableRowType),
+                                                partitionKeys,
+                                                flussSerializationSchema))
+                                // Set the parallelism same as input operator to encourage
+                                // chaining
+                                .setParallelism(input.getParallelism());
+
+ return partition(
+ shuffleStream,
+ new StatisticsOrRecordChannelComputer<>(
+ toFlussRowType(tableRowType),
+ bucketKeys,
+ partitionKeys,
+ numBucket,
+ lakeFormat,
+ flussSerializationSchema),
+ input.getParallelism())
+ .flatMap(
+                                (FlatMapFunction<StatisticsOrRecord<InputT>, InputT>)
+                                        (statisticsOrRecord, out) -> {
+                                            if (statisticsOrRecord.isRecord()) {
+                                                out.collect(statisticsOrRecord.record());
+ }
+ })
+                        // To promote operator chaining with the downstream writer operator,
+                        // setting slot sharing group and the parallelism as default, {@link
+                        // SinkTransformationTranslator} will set the parallelism same as sink
+ // transformation.
+ .slotSharingGroup("shuffle-partition-custom-group")
Review Comment:
I’m not sure why `slotSharingGroup` is being set here. It doesn’t affect
operator chaining.
In fact, I suspect this `slotSharingGroup` configuration may be the root
cause of scheduling failures in PARTITION_DYNAMIC jobs due to insufficient
resources. When all operators share the same `slotSharingGroup`, Flink can (and
will) pack them into the same TaskManager slot. However, if different operators
are assigned different slot sharing groups, they must be placed in separate
slots, thereby increasing total slot demand.
When I removed this configuration, the PARTITION_DYNAMIC tests passed
without the `AbstractTestBase.MINI_CLUSTER_RESOURCE.before()` changes.
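To illustrate the slot math outside of this PR, here is a small self-contained demo (not the PR's pipeline, purely illustrative): with everything in the default slot sharing group the job needs slots equal to its maximum parallelism, while putting one operator into its own group adds that operator's parallelism on top, which is exactly what can exhaust a small MiniCluster.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.fromElements(1, 2, 3)
                .map(i -> i * 2)
                .returns(Types.INT)
                // Putting this operator (and everything chained after it) into its own
                // slot sharing group means it can no longer share slots with the source,
                // so the job needs extra slots beyond its max parallelism.
                .slotSharingGroup("shuffle-partition-custom-group")
                .print();
        env.execute("slot-sharing-demo");
    }
}
```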
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java:
##########
@@ -261,22 +272,25 @@ void testAppendLogWithBucketKey(boolean sinkBucketShuffle) throws Exception {
        List<String> actual = collectRowsWithTimeout(rowIter, expectedRows.size());
assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRows);
- // check data with the same bucket key should be read in sequence.
- for (List<String> expected : expectedGroups) {
- if (expected.size() <= 1) {
- continue;
- }
- int prevIndex = actual.indexOf(expected.get(0));
- for (int i = 1; i < expected.size(); i++) {
- int index = actual.indexOf(expected.get(i));
- assertThat(index).isGreaterThan(prevIndex);
- prevIndex = index;
+ if (distributionMode == DistributionMode.BUCKET) {
+ // check data with the same bucket key should be read in sequence.
+ for (List<String> expected : expectedGroups) {
+ if (expected.size() <= 1) {
+ continue;
+ }
+ int prevIndex = actual.indexOf(expected.get(0));
+ for (int i = 1; i < expected.size(); i++) {
+ int index = actual.indexOf(expected.get(i));
+ assertThat(index).isGreaterThan(prevIndex);
+ prevIndex = index;
+ }
}
}
}
@Test
void testAppendLogWithRoundRobin() throws Exception {
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
Review Comment:
Why set the parallelism to `1`?