[
https://issues.apache.org/jira/browse/FLINK-20753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17254083#comment-17254083
]
Nazar Volynets commented on FLINK-20753:
----------------------------------------
Closed the issue myself with a `Cannot Reproduce` resolution :)
> Duplicates With Exactly-once Kafka -> Kafka Producer
> ----------------------------------------------------
>
> Key: FLINK-20753
> URL: https://issues.apache.org/jira/browse/FLINK-20753
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Runtime / Checkpointing
> Affects Versions: 1.12.0
> Environment: Java 11
> Flink started within IDE
> Reporter: Nazar Volynets
> Priority: Major
>
> *Introduction*
> Based on the following statements from Flink's docs:
> 1.
> [https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html]
> {quote}Flink provides an [Apache Kafka|https://kafka.apache.org/] connector
> for reading data from and writing data to Kafka topics with exactly-once
> guarantees.
> {quote}
> 2.
> [https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#exactly-once-end-to-end]
> {quote}To achieve exactly once end-to-end, so that every event from the
> sources affects the sinks exactly once, the following must be true:
> # your sources must be replayable, and
> # your sinks must be transactional (or idempotent){quote}
> 3.
> [https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#caveats]
> {quote}{{Semantic.EXACTLY_ONCE}} mode relies on the ability to commit
> transactions that were started before taking a checkpoint, after recovering
> from the said checkpoint. If the time between Flink application crash and
> completed restart is larger than Kafka's transaction timeout there will be
> data loss (Kafka will automatically abort transactions that exceeded timeout
> time)
> {quote}
> 4. [https://issues.apache.org/jira/browse/FLINK-7210]
> There are references to the two-phase commit mechanism used in the old Flink
> Kafka connector, so it is expected that the latest version of the connector
> has the same functionality.
> This indirectly implies EXACTLY_ONCE Kafka->Kafka end-to-end delivery
> guarantees.
> Moreover, it is emphasised that the Kafka cluster transaction timeout should
> be raised (from 15 mins to 1 hour) to avoid data loss (see the sketch below).
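> As a rough illustration of that tuning (the values are only illustrative;
> `transaction.max.timeout.ms` is the broker-side property from the Kafka docs):
> {code:java|title=Transaction timeout tuning (sketch)}
> // Flink producer side: keep Kafka transactions alive longer than any expected
> // downtime between crash and restart, otherwise the broker aborts them and the
> // data they carry is lost.
> Properties producerProps = new Properties();
> producerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(60 * 60 * 1000)); // 1 hour
> // Broker side (server.properties): transaction.max.timeout.ms must be at least
> // as large, otherwise the broker rejects the producer's transaction timeout.
> {code}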
> Moving forward, all three of these statements are met by a `Kafka Source` ->
> `Kafka Sink` app:
> * regarding the first one -> data is read from and written to Kafka
> * about the second one -> `Kafka Source` is replayable & `Kafka Sink` is
> transactional
> * last one -> `Kafka Sink` is transactional & consequently, in case of
> EXACTLY_ONCE, this operator has state; so it is expected that the transaction
> will be rolled back.
> But in fact there is no way to achieve EXACTLY_ONCE for a simple Flink
> `Kafka Source` -> `Kafka Sink` application. Duplicates still exist; as a
> result, EXACTLY_ONCE semantics is violated.
> *Details*
> +STRs:+
> # Create a simple Flink `Kafka Source` -> `Kafka Sink` app
> ## Stream execution env:
> ### Parallelism -> 1
> ### Enable checkpointing -> 10000 ms (intentionally this large)
> ### State backend -> RocksDB
> ### Checkpointing mode -> EXACTLY_ONCE
> ### Min pause between checkpoints -> 500 ms
> ### Max concurrent checkpoints -> 1
> ## Flink Kafka consumer
> ### Nothing special
> ## Flink Kafka producer
> ### Props:
> #### ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"
> #### ProducerConfig.ACKS_CONFIG, "all"
> #### ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"
> ### EXACTLY_ONCE Semantic
> # Deploy `Kafka Source` Cluster
> ## Create `topic-1` with 3 partitions
> # Deploy `Kafka Sink` Cluster
> ## Create `topic-1` with 3 partitions
> # Spin up some Kafka client to generate data into `Kafka Source`:`topic-1`
> (e.g. Confluent `kafka-console-producer`)
> # Spin up +transactional+ Kafka consumer to drain data from `Kafka
> Sink`:`topic-1` (e.g. Confluent `kafka-console-consumer`)
> # Use the Flink app described in step #1 to ship data from the `Kafka Source`
> cluster to the `Kafka Sink` cluster.
> # Wait until the Flink app has created its first checkpoint.
> # Brutally kill the Flink app (SIGKILL)
> # Wait 10 secs
> # Start the Flink app again.
> # Check for duplicates in the +transactional+ Kafka consumer (described in step #5)
> +Actual+
> Duplicates exist in the +transactional+ Kafka consumer output.
> +Expected+
> * The Kafka transaction should be rolled back by the Flink Kafka producer with
> EXACTLY_ONCE semantic
> * Flink should automatically replay the data from `Kafka Source` based on the
> offsets persisted in the latest checkpoint (see the sketch below)
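> For the second expectation to hold when the job is killed and then started again
> manually (rather than restarted automatically by Flink), the checkpoint has to be
> retained and explicitly restored. A minimal sketch of the relevant configuration;
> how exactly the retained checkpoint is passed back when running from an IDE is an
> assumption on my side (on a cluster it would be `flink run -s <checkpointPath>`):
> {code:java|title=Retained checkpoints (sketch)}
> // Keep completed checkpoints on disk even after the job is cancelled or killed,
> // so the Kafka offsets stored in the latest checkpoint can be restored on restart.
> env.getCheckpointConfig().enableExternalizedCheckpoints(
>     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> // On a cluster the retained checkpoint would then be restored explicitly, e.g.:
> //   flink run -s file:///xxx/config/checkpoints/rocksdb/<job-id>/chk-<n> ...
> {code}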
> *Example*
> +App+
> {code:java|title=build.gradle (dependencies)}
> ...
> ext {
>   ...
>   javaVersion = '11'
>   flinkVersion = '1.12.0'
>   scalaBinaryVersion = '2.11'
>   ...
> }
> dependencies {
>   ...
>   implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>   implementation "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
>   implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
>   ...
> }
> {code}
> {code:java|title=App}
> public static void main(String[] args) {
>   ...
>   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setParallelism(1); // to make things simple
>   env.enableCheckpointing(10000); // intentionally specified 10 secs to have a room to stop app between checkpoints
>   env.setStateBackend(new RocksDBStateBackend("file:///xxx/config/checkpoints/rocksdb", true));
>   env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>   env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
>   env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>   env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
>
>   FlinkKafkaConsumer<Record> consumer = createConsumer();
>   FlinkKafkaProducer<Record> producer = createProducer();
>
>   env
>     .addSource(consumer)
>     .uid("kafka-consumer")
>     .addSink(producer)
>     .uid("kafka-producer");
>
>   env.execute();
> }
>
> public static FlinkKafkaConsumer<Record> createConsumer() {
>   ...
>   Properties props = new Properties();
>   props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-source-1:9091");
>   ... // nothing special
>   props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
>
>   FlinkKafkaConsumer<Record> consumer = new FlinkKafkaConsumer<>("topic-1", new RecordKafkaDerSchema(), props);
>   ... // RecordKafkaDerSchema --> custom schema is used to copy not only message body but message key too
>   ... // SimpleStringSchema --> can be used instead to reproduce issue
>
>   consumer.setStartFromGroupOffsets();
>   consumer.setCommitOffsetsOnCheckpoints(true);
>   return consumer;
> }
>
> public static FlinkKafkaProducer<Record> createProducer() {
>   ...
>   Properties props = new Properties();
>   ...
>   props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094");
>   props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>   props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
>   props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
>   props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
>   props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
>   props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "9000");
>   props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>   props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xxx"); // ignored due to expected behaviour - https://issues.apache.org/jira/browse/FLINK-17691
>   props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "" + (15 * 60 * 1000)); // decreased from 1 hour to 15 mins; app is going to be shutdown less than 15 mins
>   ...
>
>   FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>("topic-1", new RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>   ... // RecordKafkaSerSchema --> custom schema is used to copy not only message body but message key too
>   ... // SimpleStringSchema --> can be used instead to reproduce issue
>   return producer;
> }
> {code}
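> `RecordKafkaDerSchema` / `RecordKafkaSerSchema` are not shown above. A minimal
> sketch of what the key-copying serialization schema could look like (the `Record`
> type and its `getKey()`/`getValue()` accessors are hypothetical; only the idea of
> forwarding both the message key and the body matters here):
> {code:java|title=RecordKafkaSerSchema (sketch)}
> import javax.annotation.Nullable;
> import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class RecordKafkaSerSchema implements KafkaSerializationSchema<Record> {
>
>     private final boolean copyKey;
>
>     public RecordKafkaSerSchema(boolean copyKey) {
>         this.copyKey = copyKey;
>     }
>
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(Record element, @Nullable Long timestamp) {
>         // Forward the message body and, optionally, the original message key to the sink topic.
>         byte[] key = copyKey ? element.getKey() : null;
>         return new ProducerRecord<>("topic-1", key, element.getValue());
>     }
> }
> {code}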
> {code:bash|title=kafka-source-1 Producer}
> bash -c 'echo Producing data... && \
>   for ((i=0; ;++i)); do echo "t1-k-$$i:t1-v-$$i"; sleep 2; done | \
>   kafka-console-producer --request-required-acks 1 --broker-list kafka-source-1:9091 \
>     --topic topic-1 --property parse.key=true --property key.separator=":"'
> {code}
> {code:bash|title=kafka-target-1 Consumer - 0 partition}
> bash -c 'echo Consuming data for topic-1... && \
>   kafka-console-consumer --bootstrap-server kafka-target-1:9094 --topic topic-1 \
>     --partition 0 --from-beginning --isolation-level read_committed \
>     --property print.key=true --property key.separator=":" \
>     --value-deserializer org.apache.kafka.common.serialization.StringDeserializer'
> {code}
> {code:bash|title=kafka-target-1 Consumer - 1 partition}
> bash -c 'echo Consuming data for topic-1... && \
>   kafka-console-consumer --bootstrap-server kafka-target-1:9094 --topic topic-1 \
>     --partition 1 --from-beginning --isolation-level read_committed \
>     --property print.key=true --property key.separator=":" \
>     --value-deserializer org.apache.kafka.common.serialization.StringDeserializer'
> {code}
> {code:bash|title=kafka-target-1 Consumer - 2 partition}
> bash -c 'echo Consuming data for topic-1... && \
>   kafka-console-consumer --bootstrap-server kafka-target-1:9094 --topic topic-1 \
>     --partition 2 --from-beginning --isolation-level read_committed \
>     --property print.key=true --property key.separator=":" \
>     --value-deserializer org.apache.kafka.common.serialization.StringDeserializer'
> {code}
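> The same check can also be done with a plain Kafka Java consumer; a minimal
> sketch (the consumer group id is only illustrative):
> {code:java|title=read_committed verification consumer (sketch)}
> import java.time.Duration;
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> public class ReadCommittedChecker {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094");
>         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "duplicates-checker"); // illustrative group id
>         props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>         // Only read messages from committed Kafka transactions, like the console consumers above.
>         props.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
>         props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>
>         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
>             consumer.subscribe(Collections.singletonList("topic-1"));
>             while (true) {
>                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
>                 for (ConsumerRecord<String, String> record : records) {
>                     System.out.println(record.partition() + " | " + record.key() + ":" + record.value());
>                 }
>             }
>         }
>     }
> }
> {code}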
> +Output+
> {code:java|title=kafka-target-1 Consumer - 0 partition}
> ...
> t1-k-40:t1-v-40
> t1-k-43:t1-v-43
> t1-k-44:t1-v-44
> t1-k-47:t1-v-47
> t1-k-48:t1-v-48
> t1-k-49:t1-v-49
> t1-k-48:t1-v-48 // DUPLICATION!!! --> EXACTLY ONCE is violated
> t1-k-49:t1-v-49 // DUPLICATION!!! --> EXACTLY ONCE is violated
> t1-k-54:t1-v-54
> t1-k-61:t1-v-61
> t1-k-62:t1-v-62
> t1-k-66:t1-v-66
> t1-k-71:t1-v-71
> t1-k-73:t1-v-73
> ...
> {code}
> {code:java|title=kafka-target-1 Consumer - 1 partition}
> ...
> t1-k-35:t1-v-35
> t1-k-46:t1-v-46
> t1-k-50:t1-v-50
> t1-k-51:t1-v-51
> t1-k-53:t1-v-53
> t1-k-56:t1-v-56
> t1-k-57:t1-v-57
> t1-k-59:t1-v-59
> t1-k-60:t1-v-60
> t1-k-63:t1-v-63
> t1-k-65:t1-v-65
> t1-k-69:t1-v-69
> t1-k-74:t1-v-74
> ...
> {code}
> {code:java|title=kafka-target-1 Consumer - 2 partition}
> ...
> t1-k-39:t1-v-39
> t1-k-41:t1-v-41
> t1-k-42:t1-v-42
> t1-k-45:t1-v-45
> t1-k-52:t1-v-52
> t1-k-55:t1-v-55
> t1-k-58:t1-v-58
> t1-k-64:t1-v-64
> t1-k-67:t1-v-67
> t1-k-68:t1-v-68
> t1-k-70:t1-v-70
> t1-k-72:t1-v-72
> t1-k-75:t1-v-75
> t1-k-77:t1-v-77
> ...
> {code}
> +Summary+
> As we can see from the `kafka-target-1 Consumer - 0 partition` output, EXACTLY
> ONCE delivery has been violated.
> P.S.: If I have missed something, please let me know what, and how to achieve
> EXACTLY ONCE delivery in a native way (via Flink configuration) for the
> particular simple application described above.
> P.S.: If it is not possible in a native way (only via a manual/custom
> implementation), then please let me know.
> P.S.: Similar issue discussions:
> *
> [https://stackoverflow.com/questions/57308590/exactly-once-semantics-in-flink-kafka-producer]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)