Hi all,

I have implemented Structured Streaming Kafka sink EOS with the Kafka
transactional producer. The design sketch is in SPARK-28908
(https://issues.apache.org/jira/browse/SPARK-28908) and the PR is 25618
(https://github.com/apache/spark/pull/25618). But now I have run into the
problem below.

When the producer fails to commit a transaction after successfully sending
data to Kafka, for some reason such as the Kafka broker going down, the Spark
job will fail. After the broker recovers and the job is restarted, the
transaction will resume. But if the time between the commit failure and the
job restart (by a job attempt or manually) exceeds the config
`transaction.timeout.ms`, the data sent by the producer will be discarded by
the Kafka broker, leading to data loss.
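
For context, here is a minimal Scala sketch of the transactional produce path
involved; the broker address, topic, and transactional.id are placeholders,
not values taken from the sink:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object TransactionalSendSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // placeholder broker
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "sink-query-1")     // placeholder id
    // Must not exceed the broker's transaction.max.timeout.ms, otherwise
    // initTransactions() fails with an InvalidTransactionTimeout error.
    props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "900000")        // 15 minutes

    val producer = new KafkaProducer[String, String](props)
    producer.initTransactions()
    producer.beginTransaction()
    producer.send(new ProducerRecord[String, String]("output-topic", "key", "value"))
    // If commitTransaction() fails here (e.g. the broker is down) and the job
    // is not restarted within transaction.timeout.ms, the broker aborts the
    // pending transaction and the already-sent data is discarded.
    producer.commitTransaction()
    producer.close()
  }
}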

My solution is:
1. Increase the config `transaction.timeout.ms`.
 If the user has not defined it, raise it from 60 seconds (the default value
of `transaction.timeout.ms` in the producer) to 15 minutes (the default value
of `transaction.max.timeout.ms` in the Kafka broker). We cannot go higher
than that by default, because the request will fail with an
InvalidTransactionTimeout error if `transaction.timeout.ms` is larger than
`transaction.max.timeout.ms`. If the user has defined
`transaction.timeout.ms`, we just check that it is large enough. A sketch of
this defaulting/validation follows this list.
2. Document the config `transaction.timeout.ms` and describe ways to avoid
data loss, such as increasing `transaction.timeout.ms` and
`transaction.max.timeout.ms`, and restarting the job before the timeout is
exceeded.
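
To make step 1 concrete, here is a minimal sketch of the defaulting and
validation logic; the option key, constants, and helper name are hypothetical
and not the actual code in the PR:

object KafkaSinkTransactionConfig {
  val TransactionTimeoutKey = "kafka.transaction.timeout.ms"  // hypothetical option key
  // Default the producer's transaction.timeout.ms to the broker-side default
  // of transaction.max.timeout.ms (15 minutes) when the user has not set it.
  val DefaultTransactionTimeoutMs: Long = 15 * 60 * 1000L
  // Assumed lower bound used to reject timeouts that are not large enough.
  val MinRecommendedTimeoutMs: Long = 5 * 60 * 1000L

  def resolveTimeout(userParams: Map[String, String]): Long = {
    userParams.get(TransactionTimeoutKey) match {
      case None =>
        DefaultTransactionTimeoutMs
      case Some(value) =>
        val timeout = value.toLong
        require(timeout >= MinRecommendedTimeoutMs,
          s"$TransactionTimeoutKey=$timeout ms is too small; data sent before a " +
            "failed commit may be aborted by the broker before the job restarts.")
        timeout
    }
  }
}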

BTW, I just skimmed the code in Flink and found that, by default, Flink sets
the `transaction.timeout.ms` property in the producer config to 1 hour, and
advises users in the docs to increase `transaction.max.timeout.ms`.

Any idea about how to handle this problem?

Many Thanks,
Wenxuan
