Nazar Volynets created FLINK-20753:
--------------------------------------
Summary: Duplicates With Exactly-once Kafka -> Kafka Producer
Key: FLINK-20753
URL: https://issues.apache.org/jira/browse/FLINK-20753
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka, Runtime / Checkpointing
Affects Versions: 1.12.0
Environment: Java 11
Flink started within the IDE
Reporter: Nazar Volynets
*Introduction*
Based on the following statements from Flink's docs:
1.
[https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html]
{quote}Flink provides an [Apache Kafka|https://kafka.apache.org/] connector for
reading data from and writing data to Kafka topics with exactly-once guarantees.
{quote}
2.
[https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#exactly-once-end-to-end]
{quote}To achieve exactly once end-to-end, so that every event from the sources
affects the sinks exactly once, the following must be true:
# your sources must be replayable, and
# your sinks must be transactional (or idempotent){quote}
3.
[https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#caveats]
{quote}{{Semantic.EXACTLY_ONCE}} mode relies on the ability to commit
transactions that were started before taking a checkpoint, after recovering
from the said checkpoint. If the time between Flink application crash and
completed restart is larger than Kafka's transaction timeout there will be data
loss (Kafka will automatically abort transactions that exceeded timeout time)
{quote}
4. [https://issues.apache.org/jira/browse/FLINK-7210]
There are references to the two-phase commit mechanism used in the old Flink
Kafka connector, so it is expected that the latest version of the connector
provides the same functionality. This indirectly implies EXACTLY_ONCE
Kafka -> Kafka end-to-end delivery guarantees.
Moreover, it is emphasised that the Kafka cluster transaction timeout should be
increased (from 15 mins to 1 hour) to avoid data loss, as sketched below.
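For reference, a minimal sketch of that timeout tuning (not taken from the app reported below; the property names are the standard Kafka configs): the producer-side `transaction.timeout.ms` must not exceed the broker-side `transaction.max.timeout.ms`, which defaults to 15 minutes.
{code:java|title=Transaction timeout tuning (sketch)}
// Producer side: transaction timeout for the Kafka transactions opened by the Flink sink.
// It must be <= the broker's transaction.max.timeout.ms, otherwise the transactional
// producer fails at initialization with InvalidTransactionTimeoutException.
Properties producerProps = new Properties();
producerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(60 * 60 * 1000)); // 1 hour

// Broker side (server.properties), shown only as a comment:
// transaction.max.timeout.ms=3600000  (raise from the 15 min default to 1 hour)
{code}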
Moving forward, all three statements are met by a `Kafka Source` -> `Kafka Sink` app:
* regarding the first one -> you are reading from and writing to Kafka
* regarding the second one -> `Kafka Source` is replayable and `Kafka Sink` is transactional
* regarding the last one -> `Kafka Sink` is transactional and consequently, in case of EXACTLY_ONCE, this operator has state; so it is expected that the transaction will be rolled back.
But in fact it is not possible to achieve EXACTLY_ONCE for a simple Flink `Kafka Source` -> `Kafka Sink` application. Duplicates still exist, so EXACTLY_ONCE semantics is violated.
*Details*
+STRs:+
# Create a simple Flink `Kafka Source` -> `Kafka Sink` app
## Stream execution env:
### Parallelism -> 1
### Enable checkpointing -> 10000 ms (intentionally set this high)
### State backend -> RocksDB
### Checkpointing mode -> EXACTLY_ONCE
### Min pause between checkpoints -> 500 ms
### Max concurrent checkpoints -> 1
## Flink Kafka consumer
### Nothing valuable
## Flink Kafka producer
### Props:
#### ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"
#### ProducerConfig.ACKS_CONFIG, "all"
#### ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"
### EXACTLY_ONCE Semantic
# Deploy `Kafka Source` Cluster
## Create `topic-1` with 3 partitions
# Deploy `Kafka Sink` Cluster
## Create `topic-1` with 3 partitions
# Spin up a Kafka client to generate data into `Kafka Source`:`topic-1` (e.g. Confluent `kafka-console-producer`)
# Spin up a +transactional+ Kafka consumer to drain data from `Kafka Sink`:`topic-1` (e.g. Confluent `kafka-console-consumer`)
# Use the Flink app described in step #1 to ship data from the `Kafka Source` cluster to the `Kafka Sink` cluster.
# Wait until the Flink app has created its first checkpoint.
# Brutally kill the Flink app (SIGKILL)
# Wait 10 secs
# Start the Flink app again.
# Check for duplicates in the +transactional+ Kafka consumer (described in step #5)
+Actual+
Duplicates exist in the +transactional+ Kafka consumer output (see the verification sketch below).
+Expected+
* the Kafka transaction should be rolled back by the Flink Kafka producer with EXACTLY_ONCE semantic
* Flink should automatically replay the data from `Kafka Source` based on the offsets persisted in the latest checkpoint
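For completeness, here is a minimal sketch of the kind of read_committed verification consumer used in the STRs above (the class name and group id are hypothetical, not from the real setup). With `isolation.level=read_committed`, records belonging to aborted Flink transactions must not be visible, so any repeated key indicates a violated EXACTLY_ONCE guarantee; against the output shown further below it would flag `t1-k-48` and `t1-k-49`.
{code:java|title=Verification consumer (sketch)}
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DuplicateCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094");
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "duplicate-check"); // hypothetical group id
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // skip aborted transactions
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    Set<String> seenKeys = new HashSet<>();
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(List.of("topic-1"));
      while (true) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
          // keys are generated exactly once by the producer script, so a repeat is a violation
          if (!seenKeys.add(record.key())) {
            System.out.println("DUPLICATE: " + record.key() + ":" + record.value());
          }
        }
      }
    }
  }
}
{code}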
*Example*
+App+
{code:java|title=build.gradle (dependencies)}
...
ext {
...
javaVersion = '11'
flinkVersion = '1.12.0'
scalaBinaryVersion = '2.11'
...
}
dependencies {
...
implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
implementation "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
...
}
{code}
{code:java|title=App}
public static void main(String[] args) {
...
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // to make things simple
env.enableCheckpointing(10000); // intentionally specified 10 secs to have room to stop the app between checkpoints
env.setStateBackend(new RocksDBStateBackend("file:///xxx/config/checkpoints/rocksdb", true));
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
FlinkKafkaConsumer<Record> consumer = createConsumer();
FlinkKafkaProducer<Record> producer = createProducer();
env
.addSource(consumer)
.uid("kafka-consumer")
.addSink(producer)
.uid("kafka-producer")
;
env.execute();
}
public static FlinkKafkaConsumer<Record> createConsumer() {
...
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"kafka-source-1:9091");
... // nothing special
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
FlinkKafkaConsumer<Record> consumer = new FlinkKafkaConsumer<>("topic-1", new
RecordKafkaDerSchema(), props);
... // RecordKafkaDerSchema --> custom schema is used to copy not only the message body but the message key too
... // SimpleStringSchema --> can be used instead to reproduce the issue
consumer.setStartFromGroupOffsets();
consumer.setCommitOffsetsOnCheckpoints(true);
return consumer;
}
public static FlinkKafkaProducer<Record> createProducer() {
...
Properties props = new Properties();
...
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"kafka-target-1:9094");
props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "9000");
props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xxx"); // ignored due to expected behaviour - https://issues.apache.org/jira/browse/FLINK-17691
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "" + (15 * 60 * 1000)); // decreased from 1 hour to 15 mins; the app is going to be shut down for less than 15 mins
...
FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>("topic-1", new
RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
... // RecordKafkaSerSchema --> custom schema is used to copy not only the message body but the message key too
... // SimpleStringSchema --> can be used instead to reproduce the issue
return producer;
}
{code}
{code:bash|title=kafka-source-1 Producer}
bash -c 'echo Producing data... && \
  for ((i=0; ;++i)); do echo "t1-k-$$i:t1-v-$$i"; sleep 2; done | \
  kafka-console-producer --request-required-acks 1 --broker-list kafka-source-1:9091 \
    --topic topic-1 --property parse.key=true --property key.separator=":"'
{code}
{code:bash|title=kafka-target-1 Consumer - 0 partition}
bash -c 'echo Consuming data for topic-1... && \
  kafka-console-consumer --bootstrap-server kafka-target-1:9094 --topic topic-1 \
    --partition 0 --from-beginning --isolation-level read_committed \
    --property print.key=true --property key.separator=":" \
    --value-deserializer org.apache.kafka.common.serialization.StringDeserializer'
{code}
{code:bash|title=kafka-target-1 Consumer - 1 partition}
bash -c 'echo Consuming data for topic-1... && \
  kafka-console-consumer --bootstrap-server kafka-target-1:9094 --topic topic-1 \
    --partition 1 --from-beginning --isolation-level read_committed \
    --property print.key=true --property key.separator=":" \
    --value-deserializer org.apache.kafka.common.serialization.StringDeserializer'
{code}
{code:bash|title=kafka-target-1 Consumer - 2 partition}
bash -c 'echo Consuming data for topic-1... && \
  kafka-console-consumer --bootstrap-server kafka-target-1:9094 --topic topic-1 \
    --partition 2 --from-beginning --isolation-level read_committed \
    --property print.key=true --property key.separator=":" \
    --value-deserializer org.apache.kafka.common.serialization.StringDeserializer'
{code}
+Output+
{code:java|title=kafka-target-1 Consumer - 0 partition}
...
t1-k-40:t1-v-40
t1-k-43:t1-v-43
t1-k-44:t1-v-44
t1-k-47:t1-v-47
t1-k-48:t1-v-48
t1-k-49:t1-v-49
t1-k-48:t1-v-48 // DUPLICATION!!! --> EXACTLY ONCE is violated
t1-k-49:t1-v-49 // DUPLICATION!!! --> EXACTLY ONCE is violated
t1-k-54:t1-v-54
t1-k-61:t1-v-61
t1-k-62:t1-v-62
t1-k-66:t1-v-66
t1-k-71:t1-v-71
t1-k-73:t1-v-73
...
{code}
{code:java|title=kafka-target-1 Consumer - 1 partition}
...
t1-k-35:t1-v-35
t1-k-46:t1-v-46
t1-k-50:t1-v-50
t1-k-51:t1-v-51
t1-k-53:t1-v-53
t1-k-56:t1-v-56
t1-k-57:t1-v-57
t1-k-59:t1-v-59
t1-k-60:t1-v-60
t1-k-63:t1-v-63
t1-k-65:t1-v-65
t1-k-69:t1-v-69
t1-k-74:t1-v-74
...
{code}
{code:java|title=kafka-target-1 Consumer - 2 partition}
...
t1-k-39:t1-v-39
t1-k-41:t1-v-41
t1-k-42:t1-v-42
t1-k-45:t1-v-45
t1-k-52:t1-v-52
t1-k-55:t1-v-55
t1-k-58:t1-v-58
t1-k-64:t1-v-64
t1-k-67:t1-v-67
t1-k-68:t1-v-68
t1-k-70:t1-v-70
t1-k-72:t1-v-72
t1-k-75:t1-v-75
t1-k-77:t1-v-77
...
{code}
+Summary+
As we can see from `kafka-target-1 Consumer - 0 partition`, EXACTLY ONCE delivery has been violated.
P.S.: If I have missed something, please let me know what to do and how to achieve EXACTLY ONCE delivery in a native way (via Flink configuration) for the particular simple application described above.
P.S.: If it is not possible in a native way (only via a manual/custom implementation), then please let me know as well.
P.S.: Similar issue discussions:
*
[https://stackoverflow.com/questions/57308590/exactly-once-semantics-in-flink-kafka-producer]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)