Calico Shop created FLINK-39192:
-----------------------------------

             Summary: SinkWriterOperator in post-commit topology throws IllegalStateException with bounded source in streaming mode
                 Key: FLINK-39192
                 URL: https://issues.apache.org/jira/browse/FLINK-39192
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Task
    Affects Versions: 2.2.0, 2.1.0
            Reporter: Calico Shop


When a *Sink* implements *SupportsPostCommitTopology* and the job uses a 
*bounded* source in streaming mode ({*}RuntimeExecutionMode.STREAMING{*}), the 
*SinkWriterOperator* wrapping the post-commit sink throws:
{code:java}
java.lang.IllegalStateException: Received element after endOfInput
{code}

h3. Root cause

With a bounded source in streaming mode, the following sequence occurs:

1. Source emits all records and finishes
2. *endOfInput* propagates through the operator chain; the post-commit 
{*}SinkWriterOperator{*}'s *endInput()* sets *endOfInput = true*
3. A final checkpoint is triggered
4. During checkpoint completion, the upstream *CommitterOperator* emits 
*CommittableMessage* instances downstream into the post-commit topology
5. The post-commit *SinkWriterOperator.processElement()* rejects these elements 
because *endOfInput* is already *true*

The *endOfInput* guard assumes no elements arrive after the flag is set, but 
*CommitterOperator* legitimately emits committables during the final 
checkpoint's completion callback, which happens _after_ *endOfInput* has 
propagated.
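
The ordering described above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Flink code: *GuardedWriter* and *BufferingCommitter* are hypothetical stand-ins for the post-commit *SinkWriterOperator* and the *CommitterOperator*, and they model nothing beyond the flag, the guard, and the deferred emission on checkpoint completion.
{code:java}
import java.util.ArrayList;
import java.util.List;

/** Toy model of the ordering problem. Hypothetical classes, not Flink code. */
class EndOfInputGuardModel {

    /** Stand-in for the post-commit SinkWriterOperator and its guard. */
    static class GuardedWriter {
        private boolean endOfInput = false;
        final List<Integer> received = new ArrayList<>();

        void endInput() {
            endOfInput = true;
        }

        void processElement(Integer element) {
            if (endOfInput) {
                throw new IllegalStateException("Received element after endOfInput");
            }
            received.add(element);
        }
    }

    /** Stand-in for CommitterOperator: forwards committables only on checkpoint completion. */
    static class BufferingCommitter {
        private final List<Integer> pending = new ArrayList<>();
        private final GuardedWriter downstream;

        BufferingCommitter(GuardedWriter downstream) {
            this.downstream = downstream;
        }

        void processElement(Integer committable) {
            pending.add(committable);
        }

        void notifyCheckpointComplete() {
            for (Integer committable : pending) {
                downstream.processElement(committable);
            }
            pending.clear();
        }
    }

    /** Replays steps 1-5 and returns the outcome as a string. */
    static String runBoundedSequence() {
        GuardedWriter postCommitWriter = new GuardedWriter();
        BufferingCommitter committer = new BufferingCommitter(postCommitWriter);

        committer.processElement(100);   // step 1: a committable arrives before the source finishes
        postCommitWriter.endInput();     // step 2: endOfInput reaches the post-commit writer
        try {
            committer.notifyCheckpointComplete(); // steps 3-5: final checkpoint completes
            return "FINISHED";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runBoundedSequence()); // prints "Received element after endOfInput"
    }
}
{code}
In this model the committable is emitted legitimately, but only after the downstream flag is set, so the guard rejects it. If the *endInput()* call is removed, the same sequence returns "FINISHED", which matches the unbounded-source case.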

This only affects the *SinkWriterOperator* instance _inside the post-commit 
topology_ (the one wrapping the post-commit sink). The primary 
*SinkWriterOperator* (the one receiving user data) is not affected.

h3. How to reproduce

The following self-contained program reproduces the bug. It uses a 
*DataGeneratorSource* that emits 100 integers into a sink with a post-commit 
topology that simply discards the committables:
{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;

public class PostCommitBoundedSourceBug {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Streaming mode is the default, but being explicit:
        env.setRuntimeMode(org.apache.flink.api.common.RuntimeExecutionMode.STREAMING);
        env.enableCheckpointing(1000);

        // Bounded source: emits 100 integers, then finishes
        DataGeneratorSource<Integer> source =
                new DataGeneratorSource<>(index -> index.intValue(), 100, Types.INT);

        env
                .fromSource(
                        source,
                        org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),
                        "bounded-source")
                .sinkTo(new PostCommitSink());

        // This will throw:
        // java.lang.IllegalStateException: Received element after endOfInput
        env.execute("post-commit-bounded-source-bug");
    }

    /**
     * Minimal sink with a post-commit topology.
     * The writer produces an Integer committable per element.
     * The committer is a no-op. The post-commit topology discards everything.
     */
    static class PostCommitSink
            implements Sink<Integer>,
                    SupportsCommitter<Integer>,
                    SupportsPostCommitTopology<Integer> {

        @Override
        public CommittingSinkWriter<Integer, Integer> createWriter(WriterInitContext context) {
            return new CommittingSinkWriter<>() {
                private int pending = 0;

                @Override
                public void write(Integer element, Context context) {
                    pending++;
                }

                @Override
                public void flush(boolean endOfInput) {
                }

                @Override
                public Collection<Integer> prepareCommit() {
                    int count = pending;
                    pending = 0;
                    return count > 0
                            ? Collections.singletonList(count)
                            : Collections.emptyList();
                }

                @Override
                public void close() {
                }
            };
        }

        @Override
        public Committer<Integer> createCommitter(CommitterInitContext context) {
            return new Committer<Integer>() {
                @Override
                public void commit(Collection<CommitRequest<Integer>> committables)
                        throws IOException, InterruptedException {
                }

                @Override
                public void close() throws Exception {
                }
            };
        }

        @Override
        public SimpleVersionedSerializer<Integer> getCommittableSerializer() {
            return new SimpleVersionedSerializer<>() {
                @Override
                public int getVersion() {
                    return 0;
                }

                @Override
                public byte[] serialize(Integer value) {
                    return ByteBuffer.allocate(4).putInt(value).array();
                }

                @Override
                public Integer deserialize(int version, byte[] serialized) {
                    return ByteBuffer.wrap(serialized).getInt();
                }
            };
        }

        @Override
        public void addPostCommitTopology(DataStream<CommittableMessage<Integer>> committables) {
            // Any .sinkTo() here creates a SinkWriterOperator that will fail
            committables.sinkTo(new DiscardingSink<>());
        }
    }
}
{code}

h3. Expected behavior

The post-commit topology should successfully process committables emitted 
during the final checkpoint. The job should complete with status {*}FINISHED{*}.

h3. Actual behavior

The job fails with:
{code:java}
java.lang.IllegalStateException: Received element after endOfInput
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:182)
{code}

h3. Notes

This bug does _not_ manifest when:
 - The job runs in *RuntimeExecutionMode.STREAMING* with an unbounded source (*endInput()* is never called, so *endOfInput* is never set)
 - The job runs in *RuntimeExecutionMode.BATCH*



