AHeise commented on a change in pull request #17019:
URL: https://github.com/apache/flink/pull/17019#discussion_r698546414
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
         closer.close();
     }
 
-    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
-        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
-        if (recoveredStates.isEmpty()) {
-            final KafkaWriterState state =
-                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
-            abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
-            return state;
+    private void abortLingeringTransactions(
+            List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+        List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+        if (!recoveredStates.isEmpty()) {
+            KafkaWriterState lastState = recoveredStates.get(0);
+            if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
+                prefixesToAbort.add(lastState.getTransactionalIdPrefix());
+                LOG.warn(
+                        "Transactional id prefix from previous execution {} has changed to {}.",
+                        lastState.getTransactionalIdPrefix(),
+                        transactionalIdPrefix);
+            }
         }
-        final Map<Integer, KafkaWriterState> taskOffsetMapping =
-                recoveredStates.stream()
-                        .collect(
-                                Collectors.toMap(
-                                        KafkaWriterState::getSubtaskId, Function.identity()));
-        checkState(
-                taskOffsetMapping.containsKey(subtaskId),
-                "Internal error: It is expected that state from previous executions is distributed to the same subtask id.");
-        final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId);
-        taskOffsetMapping.remove(subtaskId);
-        abortTransactions(
-                getTransactionsToAbort(lastState, new ArrayList<>(taskOffsetMapping.values())));
-        if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
-            LOG.warn(
-                    "Transactional id prefix from previous execution {} has changed to {}.",
-                    lastState.getTransactionalIdPrefix(),
-                    transactionalIdPrefix);
-            return new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
+
+        final Properties properties = new Properties();
+        properties.putAll(kafkaProducerConfig);
+        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dummy");
+        try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                new FlinkKafkaInternalProducer<>(properties)) {
+            for (String prefix : prefixesToAbort) {
+                abortTransactionsWithPrefix(producer, prefix, startCheckpointId);
+            }
         }
-        return new KafkaWriterState(
-                transactionalIdPrefix, subtaskId, lastState.getTransactionalIdOffset());
     }
 
-    private void abortTransactions(List<String> transactionsToAbort) {
-        transactionsToAbort.forEach(
-                transaction -> {
-                    // don't mess with the original configuration or any other
-                    // properties of the
-                    // original object
-                    // -> create an internal kafka producer on our own and do not rely
-                    // on
-                    // initTransactionalProducer().
-                    final Properties myConfig = new Properties();
-                    myConfig.putAll(kafkaProducerConfig);
-                    myConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transaction);
-                    LOG.info("Aborting Kafka transaction {}.", transaction);
-                    FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = null;
-                    try {
-                        kafkaProducer = new FlinkKafkaInternalProducer<>(myConfig);
-                        // it suffices to call initTransactions - this will abort any
-                        // lingering transactions
-                        kafkaProducer.initTransactions();
-                    } finally {
-                        if (kafkaProducer != null) {
-                            kafkaProducer.close(Duration.ofSeconds(0));
-                        }
-                    }
-                });
+    /**
+     * Aborts all transactions that have been created by this subtask in a previous run.
+     *
+     * <p>It also aborts transactions from subtasks that may have been removed because of
+     * downscaling.
+     *
+     * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is responsible for cleaning
+     * all subtasks j in [0; X), where j % Y = i. For example, if we downscale to 2, then subtask 0
+     * is responsible for all even and subtask 1 for all odd subtasks.
+     */
+    private void abortTransactionsWithPrefix(
Review comment:
But this is exactly the same thing.
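
Editor's note: the Javadoc in the hunk above describes a modulo-based ownership rule for cleaning up transactions after downscaling. The following is a minimal, standalone sketch (my own illustration, not code from the PR; the class and method names are hypothetical) of which old subtask ids a new subtask would be responsible for under that rule.

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of KafkaWriter: enumerates the old subtask ids j in
// [0, oldParallelism) with j % newParallelism == newSubtaskId, i.e. the subtasks the
// given new subtask would clean up after downscaling, per the Javadoc above.
class DownscaleOwnershipSketch {

    static List<Integer> ownedOldSubtasks(int newSubtaskId, int newParallelism, int oldParallelism) {
        final List<Integer> owned = new ArrayList<>();
        // Stepping by newParallelism starting at newSubtaskId yields exactly the ids
        // whose remainder modulo newParallelism equals newSubtaskId.
        for (int j = newSubtaskId; j < oldParallelism; j += newParallelism) {
            owned.add(j);
        }
        return owned;
    }

    public static void main(String[] args) {
        // Downscaling from 5 to 2 subtasks: subtask 0 cleans {0, 2, 4}, subtask 1 cleans {1, 3},
        // matching the "even/odd" example in the Javadoc.
        System.out.println(ownedOldSubtasks(0, 2, 5)); // [0, 2, 4]
        System.out.println(ownedOldSubtasks(1, 2, 5)); // [1, 3]
    }
}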
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]