AHeise commented on a change in pull request #17019:
URL: https://github.com/apache/flink/pull/17019#discussion_r698557571
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -327,41 +346,43 @@ private void flushRecords(boolean finalFlush) {
      * For each checkpoint we create a new {@link FlinkKafkaInternalProducer} so that new
      * transactions will not clash with transactions created during previous checkpoints ({@code
      * producer.initTransactions()} ensures that we obtain new producerId and epoch counters).
+     *
+     * <p>Ensures that all transaction ids between {@code lastCheckpointId} and {@code
+     * checkpointId} are initialized.
      */
-    private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer() {
-        final long transactionalIdOffset = kafkaWriterState.getTransactionalIdOffset() + 1;
+    private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer(
+            long checkpointId) {
+        checkState(
+                checkpointId > lastCheckpointId,
+                "Expected %s > %s",
+                checkpointId,
+                lastCheckpointId);
         final Properties copiedProducerConfig = new Properties();
         copiedProducerConfig.putAll(kafkaProducerConfig);
-        initTransactionalProducerConfig(
-                copiedProducerConfig,
-                transactionalIdOffset,
-                transactionalIdPrefix,
-                kafkaSinkContext.getParallelInstanceId());
+        copiedProducerConfig.put(
+                ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+                TransactionalIdFactory.buildTransactionalId(
+                        transactionalIdPrefix,
+                        kafkaSinkContext.getParallelInstanceId(),
+                        lastCheckpointId + 1));
         final FlinkKafkaInternalProducer<byte[], byte[]> producer =
                 new FlinkKafkaInternalProducer<>(copiedProducerConfig);
         producer.initTransactions();
-        kafkaWriterState =
-                new KafkaWriterState(
-                        transactionalIdPrefix,
-                        kafkaSinkContext.getParallelInstanceId(),
-                        transactionalIdOffset);
-        LOG.info(
-                "Created new transactional producer {}",
-                copiedProducerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
+        // In case checkpoints have been aborted, Flink may have created non-consecutive
+        // transaction ids; this loop ensures that all gaps are filled with initialized
+        // (empty) transactions.
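+        // Example (illustrative numbers): with lastCheckpointId = 4 and checkpointId = 7,
+        // checkpoints 5 and 6 were aborted; the producer above already owns the transactional
+        // id for checkpoint 5, and the loop below initializes the ids for 6 and 7.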
+        for (long id = lastCheckpointId + 2; id <= checkpointId; id++) {
+            producer.setTransactionalId(
+                    TransactionalIdFactory.buildTransactionalId(
+                            transactionalIdPrefix,
+                            kafkaSinkContext.getParallelInstanceId(),
+                            id));
+            producer.initTransactions();
+        }
Review comment:
   I refactored the code to make it more obvious. I'm also adding an example in the comments.
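
   For readers following along, here is a minimal, self-contained sketch of the id sequence the gap-filling walks through. All names and concrete values are assumptions for illustration; the local `buildTransactionalId` is only a stand-in for `TransactionalIdFactory.buildTransactionalId`, assuming a `prefix-subtaskId-checkpointId` layout. Only the loop bounds mirror the code above; `initTransactions()` itself is what fences older producers that used the same id.

   ```java
   // Hypothetical demo of the transactional-id sequence; names and the
   // "prefix-subtaskId-checkpointId" format are assumptions, not the PR's API.
   public class TransactionalIdGapDemo {

       // stand-in for TransactionalIdFactory.buildTransactionalId
       static String buildTransactionalId(String prefix, int subtaskId, long checkpointId) {
           return prefix + "-" + subtaskId + "-" + checkpointId;
       }

       public static void main(String[] args) {
           long lastCheckpointId = 4; // last checkpoint this writer saw
           long checkpointId = 7;     // current checkpoint; 5 and 6 were aborted
           int subtaskId = 0;

           // id the producer is created with
           System.out.println(buildTransactionalId("my-sink", subtaskId, lastCheckpointId + 1));
           // ids the loop initializes so that no gap stays uninitialized
           for (long id = lastCheckpointId + 2; id <= checkpointId; id++) {
               System.out.println(buildTransactionalId("my-sink", subtaskId, id));
           }
           // prints: my-sink-0-5, my-sink-0-6, my-sink-0-7
       }
   }
   ```

   After the loop, the producer holds the transactional id for `checkpointId`, so the next transaction it opens belongs to the current checkpoint.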
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]