This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6d60f41a9b1369cd9af6fe0e0fb384f39b3166ff Author: Arvid Heise <[email protected]> AuthorDate: Sun Sep 15 22:30:50 2024 +0200 [FLINK-25920] Fix AbstractStreamingWriter sending after EOI AbstractStreamingWriter send partition info twice on EOI. This commit ensures that we are not resending partition information even after restarting from a final checkpoint. --- .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e | 36 +++++++------- .../file/table/stream/AbstractStreamingWriter.java | 56 ++++++++++++++++++++-- 2 files changed, 70 insertions(+), 22 deletions(-) diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e index 91d8cab4e67..b9d189c8fe6 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e @@ -560,29 +560,30 @@ Method <org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.open Method <org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (BatchFileWriter.java:116) Method <org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has generic parameter type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord<T>> with type argument depending on <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (BatchFileWriter.java:0) Method <org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has 
parameter of type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (BatchFileWriter.java:0) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> in (AbstractStreamingWriter.java:106) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> in (AbstractStreamingWriter.java:125) Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> has generic parameter type <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket<IN, java.lang.String>> with type argument depending on <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in (AbstractStreamingWriter.java:0) Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in (AbstractStreamingWriter.java:0) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> in (AbstractStreamingWriter.java:111) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> in (AbstractStreamingWriter.java:130) Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> has generic parameter type <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket<IN, java.lang.String>> with type argument depending on <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in (AbstractStreamingWriter.java:0) Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in (AbstractStreamingWriter.java:0) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.close()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.close()> in (AbstractStreamingWriter.java:167) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.commitUpToCheckpoint(long)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.commitUpToCheckpoint(long)> in (AbstractStreamingWriter.java:90) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(long)> in (AbstractStreamingWriter.java:157) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)> in (AbstractStreamingWriter.java:158) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(org.apache.flink.streaming.api.functions.sink.filesystem.Buckets, boolean, 
org.apache.flink.api.common.state.OperatorStateStore, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> in (AbstractStreamingWriter.java:122) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setBucketLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener)> in (AbstractStreamingWriter.java:101) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setFileLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener)> in (AbstractStreamingWriter.java:115) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getProcessingTimeService()> in (AbstractStreamingWriter.java:122) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (AbstractStreamingWriter.java:98) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(java.lang.Object, long, java.lang.Long, long)> in (AbstractStreamingWriter.java:142) -Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getTimestamp()> in (AbstractStreamingWriter.java:145) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (AbstractStreamingWriter.java:143) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.hasTimestamp()> in (AbstractStreamingWriter.java:145) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()> in (AbstractStreamingWriter.java:144) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.close()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.close()> in (AbstractStreamingWriter.java:213) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.commitUpToCheckpoint(long)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.commitUpToCheckpoint(long)> in (AbstractStreamingWriter.java:109) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(long)> in (AbstractStreamingWriter.java:202) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()> calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)> in (AbstractStreamingWriter.java:203) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(org.apache.flink.streaming.api.functions.sink.filesystem.Buckets, boolean, org.apache.flink.api.common.state.OperatorStateStore, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> in (AbstractStreamingWriter.java:141) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.shaded.guava32.com.google.common.collect.Lists.newArrayList(java.lang.Iterable)> in (AbstractStreamingWriter.java:164) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setBucketLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener)> in (AbstractStreamingWriter.java:120) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setFileLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener)> in (AbstractStreamingWriter.java:134) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getProcessingTimeService()> in 
(AbstractStreamingWriter.java:141) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (AbstractStreamingWriter.java:117) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(java.lang.Object, long, java.lang.Long, long)> in (AbstractStreamingWriter.java:183) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getTimestamp()> in (AbstractStreamingWriter.java:186) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (AbstractStreamingWriter.java:184) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.hasTimestamp()> in (AbstractStreamingWriter.java:186) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()> in (AbstractStreamingWriter.java:185) Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has generic parameter type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<IN>> with type argument depending on <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (AbstractStreamingWriter.java:0) Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has parameter of type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (AbstractStreamingWriter.java:0) -Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.snapshotState(org.apache.flink.runtime.state.StateSnapshotContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)> in (AbstractStreamingWriter.java:131) +Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.snapshotState(org.apache.flink.runtime.state.StateSnapshotContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)> in (AbstractStreamingWriter.java:171) Method <org.apache.flink.connector.file.table.stream.PartitionCommitTrigger.create(boolean, org.apache.flink.api.common.state.OperatorStateStore, org.apache.flink.configuration.Configuration, java.lang.ClassLoader, java.util.List, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService)> has parameter of type <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService> in (PartitionCommitTrigger.java:0) Method <org.apache.flink.connector.file.table.stream.PartitionCommitter.commitPartitions(long)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)> in (PartitionCommitter.java:167) Method <org.apache.flink.connector.file.table.stream.PartitionCommitter.commitPartitions(long)> calls method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (PartitionCommitter.java:172) @@ -647,6 +648,7 
@@ Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCo Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()> calls method <org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory.getInstance()> in (StandardDeCompressors.java:43) Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()> calls method <org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.getInstance()> in (StandardDeCompressors.java:44) Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()> calls method <org.apache.flink.api.common.io.compression.XZInputStreamFactory.getInstance()> in (StandardDeCompressors.java:46) +Static Initializer <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.BooleanSerializer.INSTANCE> in (AbstractStreamingWriter.java:74) Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (PartitionTimeCommitTrigger.java:52) Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (PartitionTimeCommitTrigger.java:56) Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (PartitionTimeCommitTrigger.java:56) diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/AbstractStreamingWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/AbstractStreamingWriter.java index 8a326311ce0..ca156ef890e 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/AbstractStreamingWriter.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/AbstractStreamingWriter.java @@ -18,6 +18,9 @@ package org.apache.flink.connector.file.table.stream; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.BooleanSerializer; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -33,6 +36,12 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.shaded.guava32.com.google.common.collect.Lists; + +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkState; + /** * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send * file and bucket information to downstream. @@ -58,6 +67,16 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe protected transient long currentWatermark; + /** + * Used to remember that EOI has already happened so that we don't emit the last committables of + * the final checkpoints twice. 
+ */ + private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC = + new ListStateDescriptor<>("end_of_input_state", BooleanSerializer.INSTANCE); + + private boolean endOfInput; + private ListState<Boolean> endOfInputState; + public AbstractStreamingWriter( long bucketCheckInterval, StreamingFileSink.BucketsBuilder< @@ -123,6 +142,27 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe bucketCheckInterval); currentWatermark = Long.MIN_VALUE; + + // Figure out if we have seen end of input before and if we should emit anything downstream. We + // have the following + // cases: + // 1. state is empty: + // - First time initialization + // - Restoring from a previous version of Flink that didn't handle EOI + // - Upscaled from a final or regular checkpoint + // In all cases, we regularly handle EOI, potentially resulting in unnecessary duplicate summaries. + // 2. state is not empty: + // - This implies Flink restores from a version that handles EOI. + // - If there is one entry, no rescaling happened (for this subtask), so if it's true, + // we recover from a final checkpoint (for this subtask) and can ignore another EOI; + // else we have a regular checkpoint. + // - If there are multiple entries, Flink downscaled, and we need to check if all are + // true and do the same as above. As soon as one entry is false, we regularly start + // the writer and potentially emit duplicate summaries if we indeed recovered from a + // final checkpoint. 
+ endOfInputState = context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC); + List<Boolean> previousState = Lists.newArrayList(endOfInputState.get()); + endOfInput = !previousState.isEmpty() && !previousState.contains(false); } @Override @@ -139,6 +179,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe @Override public void processElement(StreamRecord<IN> element) throws Exception { + checkState(!endOfInput, "Received element after endOfInput: %s", element); helper.onElement( element.getValue(), getProcessingTimeService().getCurrentProcessingTime(), @@ -149,15 +190,20 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { super.notifyCheckpointComplete(checkpointId); - commitUpToCheckpoint(checkpointId); + if (!this.endOfInput) { + commitUpToCheckpoint(checkpointId); + } } @Override public void endInput() throws Exception { - buckets.onProcessingTime(Long.MAX_VALUE); - helper.snapshotState(Long.MAX_VALUE); - output.emitWatermark(new Watermark(Long.MAX_VALUE)); - commitUpToCheckpoint(Long.MAX_VALUE); + if (!this.endOfInput) { + this.endOfInput = true; + buckets.onProcessingTime(Long.MAX_VALUE); + helper.snapshotState(Long.MAX_VALUE); + output.emitWatermark(new Watermark(Long.MAX_VALUE)); + commitUpToCheckpoint(Long.MAX_VALUE); + } } @Override
