This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6d60f41a9b1369cd9af6fe0e0fb384f39b3166ff
Author: Arvid Heise <[email protected]>
AuthorDate: Sun Sep 15 22:30:50 2024 +0200

    [FLINK-25920] Fix AbstractStreamingWriter sending after EOI
    
    AbstractStreamingWriter sends partition info twice on EOI. This commit 
ensures that partition information is not resent, even after restarting 
from a final checkpoint.
---
 .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e           | 36 +++++++-------
 .../file/table/stream/AbstractStreamingWriter.java | 56 ++++++++++++++++++++--
 2 files changed, 70 insertions(+), 22 deletions(-)

diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
index 91d8cab4e67..b9d189c8fe6 100644
--- 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
+++ 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
@@ -560,29 +560,30 @@ Method 
<org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.open
 Method 
<org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in 
(BatchFileWriter.java:116)
 Method 
<org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has generic parameter type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<T>> with type 
argument depending on 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(BatchFileWriter.java:0)
 Method 
<org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has parameter of type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(BatchFileWriter.java:0)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> 
in (AbstractStreamingWriter.java:106)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> 
in (AbstractStreamingWriter.java:125)
 Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)>
 has generic parameter type 
<org.apache.flink.streaming.api.functions.sink.filesystem.Bucket<IN, 
java.lang.String>> with type argument depending on 
<org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in 
(AbstractStreamingWriter.java:0)
 Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)>
 has parameter of type 
<org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in 
(AbstractStreamingWriter.java:0)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> 
in (AbstractStreamingWriter.java:111)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> 
in (AbstractStreamingWriter.java:130)
 Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)>
 has generic parameter type 
<org.apache.flink.streaming.api.functions.sink.filesystem.Bucket<IN, 
java.lang.String>> with type argument depending on 
<org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in 
(AbstractStreamingWriter.java:0)
 Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)>
 has parameter of type 
<org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in 
(AbstractStreamingWriter.java:0)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.close()> 
calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.close()>
 in (AbstractStreamingWriter.java:167)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.commitUpToCheckpoint(long)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.commitUpToCheckpoint(long)>
 in (AbstractStreamingWriter.java:90)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(long)>
 in (AbstractStreamingWriter.java:157)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)>
 in (AbstractStreamingWriter.java:158)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(org.apache.flink.streaming.api.functions.sink.filesystem.Buckets,
 boolean, org.apache.flink.api.common.state.OperatorStateStore, 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> in 
(AbstractStreamingWriter.java:122)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setBucketLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener)>
 in (AbstractStreamingWriter.java:101)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setFileLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener)>
 in (AbstractStreamingWriter.java:115)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getProcessingTimeService()>
 in (AbstractStreamingWriter.java:122)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (AbstractStreamingWriter.java:98)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(java.lang.Object,
 long, java.lang.Long, long)> in (AbstractStreamingWriter.java:142)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getTimestamp()> 
in (AbstractStreamingWriter.java:145)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in 
(AbstractStreamingWriter.java:143)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.hasTimestamp()> 
in (AbstractStreamingWriter.java:145)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()>
 in (AbstractStreamingWriter.java:144)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.close()> 
calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.close()>
 in (AbstractStreamingWriter.java:213)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.commitUpToCheckpoint(long)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.commitUpToCheckpoint(long)>
 in (AbstractStreamingWriter.java:109)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(long)>
 in (AbstractStreamingWriter.java:202)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)>
 in (AbstractStreamingWriter.java:203)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(org.apache.flink.streaming.api.functions.sink.filesystem.Buckets,
 boolean, org.apache.flink.api.common.state.OperatorStateStore, 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> in 
(AbstractStreamingWriter.java:141)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls method 
<org.apache.flink.shaded.guava32.com.google.common.collect.Lists.newArrayList(java.lang.Iterable)>
 in (AbstractStreamingWriter.java:164)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setBucketLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener)>
 in (AbstractStreamingWriter.java:120)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setFileLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener)>
 in (AbstractStreamingWriter.java:134)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getProcessingTimeService()>
 in (AbstractStreamingWriter.java:141)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (AbstractStreamingWriter.java:117)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(java.lang.Object,
 long, java.lang.Long, long)> in (AbstractStreamingWriter.java:183)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getTimestamp()> 
in (AbstractStreamingWriter.java:186)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in 
(AbstractStreamingWriter.java:184)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.hasTimestamp()> 
in (AbstractStreamingWriter.java:186)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()>
 in (AbstractStreamingWriter.java:185)
 Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has generic parameter type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<IN>> with type 
argument depending on 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(AbstractStreamingWriter.java:0)
 Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has parameter of type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(AbstractStreamingWriter.java:0)
-Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.snapshotState(org.apache.flink.runtime.state.StateSnapshotContext)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)>
 in (AbstractStreamingWriter.java:131)
+Method 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.snapshotState(org.apache.flink.runtime.state.StateSnapshotContext)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)>
 in (AbstractStreamingWriter.java:171)
 Method 
<org.apache.flink.connector.file.table.stream.PartitionCommitTrigger.create(boolean,
 org.apache.flink.api.common.state.OperatorStateStore, 
org.apache.flink.configuration.Configuration, java.lang.ClassLoader, 
java.util.List, 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService)> has parameter 
of type <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService> in 
(PartitionCommitTrigger.java:0)
 Method 
<org.apache.flink.connector.file.table.stream.PartitionCommitter.commitPartitions(long)>
 calls method 
<org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)>
 in (PartitionCommitter.java:167)
 Method 
<org.apache.flink.connector.file.table.stream.PartitionCommitter.commitPartitions(long)>
 calls method 
<org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)>
 in (PartitionCommitter.java:172)
@@ -647,6 +648,7 @@ Static Initializer 
<org.apache.flink.connector.file.src.compression.StandardDeCo
 Static Initializer 
<org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()>
 calls method 
<org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory.getInstance()>
 in (StandardDeCompressors.java:43)
 Static Initializer 
<org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()>
 calls method 
<org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.getInstance()>
 in (StandardDeCompressors.java:44)
 Static Initializer 
<org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()>
 calls method 
<org.apache.flink.api.common.io.compression.XZInputStreamFactory.getInstance()> 
in (StandardDeCompressors.java:46)
+Static Initializer 
<org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.<clinit>()>
 gets field 
<org.apache.flink.api.common.typeutils.base.BooleanSerializer.INSTANCE> in 
(AbstractStreamingWriter.java:74)
 Static Initializer 
<org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)>
 in (PartitionTimeCommitTrigger.java:52)
 Static Initializer 
<org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer,
 org.apache.flink.api.common.typeutils.TypeSerializer)> in 
(PartitionTimeCommitTrigger.java:56)
 Static Initializer 
<org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()>
 gets field 
<org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in 
(PartitionTimeCommitTrigger.java:56)
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/AbstractStreamingWriter.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/AbstractStreamingWriter.java
index 8a326311ce0..ca156ef890e 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/AbstractStreamingWriter.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/AbstractStreamingWriter.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.connector.file.table.stream;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -33,6 +36,12 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * Operator for file system sink. It is a operator version of {@link 
StreamingFileSink}. It can send
  * file and bucket information to downstream.
@@ -58,6 +67,16 @@ public abstract class AbstractStreamingWriter<IN, OUT> 
extends AbstractStreamOpe
 
     protected transient long currentWatermark;
 
+    /**
+     * Used to remember that EOI has already happened so that we don't emit 
the last committables of
+     * the final checkpoints twice.
+     */
+    private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC =
+            new ListStateDescriptor<>("end_of_input_state", 
BooleanSerializer.INSTANCE);
+
+    private boolean endOfInput;
+    private ListState<Boolean> endOfInputState;
+
     public AbstractStreamingWriter(
             long bucketCheckInterval,
             StreamingFileSink.BucketsBuilder<
@@ -123,6 +142,27 @@ public abstract class AbstractStreamingWriter<IN, OUT> 
extends AbstractStreamOpe
                         bucketCheckInterval);
 
         currentWatermark = Long.MIN_VALUE;
+
+        // Figure out if we have seen end of input before and if we should 
emit anything downstream. We
+        // have the following
+        // cases:
+        // 1. state is empty:
+        //   - First time initialization
+        //   - Restoring from a previous version of Flink that didn't handle 
EOI
+        //   - Upscaled from a final or regular checkpoint
+        // In all cases, we regularly handle EOI, potentially resulting in 
duplicate summaries.
+        // 2. state is not empty:
+        //   - This implies Flink restores from a version that handles EOI.
+        //   - If there is one entry, no rescaling happened (for this 
subtask), so if it's true,
+        //     we recover from a final checkpoint (for this subtask) and can 
ignore another EOI
+        //     else we have a regular checkpoint.
+        //   - If there are multiple entries, Flink downscaled, and we need to 
check if all are
+        //     true and do the same as above. As soon as one entry is false, 
we regularly start
+        //     the writer and potentially emit duplicate summaries if we 
indeed recovered from a
+        //     final checkpoint.
+        endOfInputState = 
context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
+        List<Boolean> previousState = 
Lists.newArrayList(endOfInputState.get());
+        endOfInput = !previousState.isEmpty() && 
!previousState.contains(false);
     }
 
     @Override
@@ -139,6 +179,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> 
extends AbstractStreamOpe
 
     @Override
     public void processElement(StreamRecord<IN> element) throws Exception {
+        checkState(!endOfInput, "Received element after endOfInput: %s", 
element);
         helper.onElement(
                 element.getValue(),
                 getProcessingTimeService().getCurrentProcessingTime(),
@@ -149,15 +190,20 @@ public abstract class AbstractStreamingWriter<IN, OUT> 
extends AbstractStreamOpe
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        commitUpToCheckpoint(checkpointId);
+        if (!this.endOfInput) {
+            commitUpToCheckpoint(checkpointId);
+        }
     }
 
     @Override
     public void endInput() throws Exception {
-        buckets.onProcessingTime(Long.MAX_VALUE);
-        helper.snapshotState(Long.MAX_VALUE);
-        output.emitWatermark(new Watermark(Long.MAX_VALUE));
-        commitUpToCheckpoint(Long.MAX_VALUE);
+        if (!this.endOfInput) {
+            this.endOfInput = true;
+            buckets.onProcessingTime(Long.MAX_VALUE);
+            helper.snapshotState(Long.MAX_VALUE);
+            output.emitWatermark(new Watermark(Long.MAX_VALUE));
+            commitUpToCheckpoint(Long.MAX_VALUE);
+        }
     }
 
     @Override

Reply via email to