fapaul commented on code in PR #154:
URL:
https://github.com/apache/flink-connector-kafka/pull/154#discussion_r2024533233
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java:
##########
@@ -293,22 +309,38 @@ private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext(
producerPool.recycle(producer);
return epoch;
};
- Set<String> ongoingTransactionIds =
+ Set<String> precommittedTransactionalIds =
recoveredStates.stream()
.flatMap(
s ->
- s.getOngoingTransactions().stream()
+ s.getPrecommittedTransactionalIds().stream()
.map(CheckpointTransaction::getTransactionalId))
.collect(Collectors.toSet());
+ Set<Integer> oldSubtaskIds =
+ recoveredStates.stream()
+ .map(KafkaWriterState::getSubtaskId)
+ .collect(Collectors.toSet());
+ Integer oldParallelism =
+ recoveredStates.stream()
+ .map(KafkaWriterState::getParallelism)
+ .findFirst()
+ .orElse(null);
+ // Special case: recovery without checkpoint -> first subtask owns everything
+ if (recoveredStates.isEmpty() && kafkaSinkContext.getParallelInstanceId() == 0) {
Review Comment:
I might have missed something, but I understood the current code as follows
for this special case:
1. oldSubtaskIds = empty, oldParallelism = 1
2. Call `abortTransactions(Context context)`.
3. Get all open transactions for the prefix:
https://github.com/apache/flink-connector-kafka/blob/85942c398c253471d602116bddd55c33ff17db8f/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java#L132
4. Filter them by "ownership":
https://github.com/apache/flink-connector-kafka/blob/85942c398c253471d602116bddd55c33ff17db8f/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java#L139
5. Abort all open transactions owned by this subtask:
https://github.com/apache/flink-connector-kafka/blob/85942c398c253471d602116bddd55c33ff17db8f/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java#L156
Isn't step 4 always returning false, so that nothing is aborted, because the
value referenced at
https://github.com/apache/flink-connector-kafka/blob/85942c398c253471d602116bddd55c33ff17db8f/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java#L209
is empty when there is no recovered state?
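To make steps 3 to 5 concrete, here is a simplified, hypothetical model of the ownership filter. It is not the code in `TransactionAbortStrategyImpl`: the id format `<prefix>-<subtaskId>-<checkpointOffset>` and the rule "owned if `subtaskId % oldParallelism` is contained in `oldSubtaskIds`" are assumptions for illustration. Under that model, an empty `oldSubtaskIds` filters out every open transaction, while the special-case values `{0}` and `1` make subtask 0 own all of them.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified model of the "ownership" filter (step 4 above).
// Assumptions: transactional ids look like "<prefix>-<subtaskId>-<checkpointOffset>",
// and a writer owns an id if its subtask id, folded onto the old parallelism,
// is one of the recovered subtask ids. This is NOT the actual Flink implementation.
class OwnershipFilterSketch {

    // The subtask id is the second-to-last token, so prefixes may contain '-'.
    static int subtaskIdOf(String transactionalId) {
        String[] parts = transactionalId.split("-");
        return Integer.parseInt(parts[parts.length - 2]);
    }

    static boolean isOwned(String transactionalId, Set<Integer> oldSubtaskIds, int oldParallelism) {
        return oldSubtaskIds.contains(subtaskIdOf(transactionalId) % oldParallelism);
    }

    // Steps 4 and 5: keep only the owned open transactions; these would be aborted.
    static List<String> toAbort(
            List<String> openTransactions, Set<Integer> oldSubtaskIds, int oldParallelism) {
        return openTransactions.stream()
                .filter(id -> isOwned(id, oldSubtaskIds, oldParallelism))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> open = List.of("my-app-0-3", "my-app-1-3", "my-app-2-5");
        // Empty ownership: nothing is owned, so nothing is aborted.
        System.out.println(toAbort(open, Set.of(), 1)); // prints []
        // Special case (oldSubtaskIds = {0}, oldParallelism = 1): subtask 0 owns everything.
        System.out.println(toAbort(open, Set.of(0), 1)); // prints [my-app-0-3, my-app-1-3, my-app-2-5]
    }
}
```

If the real filter behaves like this model, the question above boils down to whether the special-case values actually reach the filter when there is no recovered state.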
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java:
##########
@@ -104,4 +127,13 @@ public long getStartCheckpointId() {
public TransactionAborter getTransactionAborter() {
return transactionAborter;
}
+
+ @Override
+ public void markTransactionalIdUnsafe(String transactionalId) {
+ this.unsafeTransactionalIds.add(transactionalId);
+ }
+
+ public Collection<String> getUnsafeTransactionalIds() {
Review Comment:
Why is this method not part of the interface?
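One way to address this, sketched with hypothetical names (`AbortContext`, `AbortContextImpl`), is to declare the getter next to `markTransactionalIdUnsafe` on the interface, so that the writer can read the unsafe ids without depending on the concrete implementation. Returning an unmodifiable view keeps `markTransactionalIdUnsafe` as the only mutation path.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

class UnsafeIdsContextSketch {

    // Hypothetical interface; the actual context interface in the PR may differ.
    interface AbortContext {
        // Called by the abort strategy for ids that must not be reused yet.
        void markTransactionalIdUnsafe(String transactionalId);

        // Exposed on the interface so callers need not use the implementation type.
        Collection<String> getUnsafeTransactionalIds();
    }

    static final class AbortContextImpl implements AbortContext {
        // LinkedHashSet de-duplicates ids and keeps insertion order for stable logging.
        private final Set<String> unsafeTransactionalIds = new LinkedHashSet<>();

        @Override
        public void markTransactionalIdUnsafe(String transactionalId) {
            unsafeTransactionalIds.add(transactionalId);
        }

        @Override
        public Collection<String> getUnsafeTransactionalIds() {
            // Read-only view: only markTransactionalIdUnsafe mutates the set.
            return Collections.unmodifiableSet(unsafeTransactionalIds);
        }
    }

    public static void main(String[] args) {
        AbortContext context = new AbortContextImpl();
        context.markTransactionalIdUnsafe("my-app-0-7");
        context.markTransactionalIdUnsafe("my-app-0-7");
        System.out.println(context.getUnsafeTransactionalIds()); // prints [my-app-0-7]
    }
}
```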
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java:
##########
@@ -277,6 +286,13 @@ private void abortLingeringTransactions(
TransactionAbortStrategyContextImpl context =
getTransactionAbortStrategyContext(startCheckpointId, prefixesToAbort);
transactionAbortStrategy.abortTransactions(context);
+ Collection<String> unsafeTransactionalIds = context.getUnsafeTransactionalIds();
+ if (!unsafeTransactionalIds.isEmpty()) {
+ namingContext.setUnsafeTransactionalIds(unsafeTransactionalIds);
+ LOG.info(
+ "Unsafe transactional ids {} detected. They will be skipped until first checkpoint.",
Review Comment:
I do not fully understand the message.
I guess this relies on the fact that the committer, during recovery, tries to
commit all outstanding transactions, so we do not need to wait for
`notifyCheckpointComplete`? My concern is that transactions are only committed
on `notifyCheckpointComplete`, which might not be aligned with the general
checkpointing cycle.
I also find `skipped` hard to understand without context. Maybe something
like `reused` would be easier to follow.
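To illustrate the wording and the timing concern, here is a hypothetical model (`UnsafeIdTracker`, `canReuse` and `pickTransactionalId` are not from the PR) in which ids marked unsafe are not reused for new transactions until a checkpoint-complete notification clears them. Tying the reset to `notifyCheckpointComplete` is an assumption of the sketch; it makes explicit that the ids only become reusable after that notification, not merely after a checkpoint has been taken.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of "unsafe" transactional ids; not the PR's implementation.
// Assumption: unsafe ids become reusable only after a checkpoint-complete notification.
class UnsafeIdTracker {
    private final Set<String> unsafeTransactionalIds = new HashSet<>();

    void markUnsafe(Collection<String> transactionalIds) {
        unsafeTransactionalIds.addAll(transactionalIds);
    }

    boolean canReuse(String transactionalId) {
        return !unsafeTransactionalIds.contains(transactionalId);
    }

    // Picks the first candidate id that may be reused right now.
    Optional<String> pickTransactionalId(List<String> candidates) {
        return candidates.stream().filter(this::canReuse).findFirst();
    }

    // In this model, pending transactions are committed at this point,
    // so the previously unsafe ids may be reused again.
    void notifyCheckpointComplete(long checkpointId) {
        unsafeTransactionalIds.clear();
    }

    public static void main(String[] args) {
        UnsafeIdTracker tracker = new UnsafeIdTracker();
        List<String> candidates = List.of("my-app-0-0", "my-app-0-1");
        tracker.markUnsafe(List.of("my-app-0-0"));
        System.out.println(tracker.pickTransactionalId(candidates).get()); // prints my-app-0-1
        tracker.notifyCheckpointComplete(1L);
        System.out.println(tracker.pickTransactionalId(candidates).get()); // prints my-app-0-0
    }
}
```

Phrased this way, a log message along the lines of "will not be reused until the next completed checkpoint" would describe the behavior directly.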
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java:
##########
@@ -293,22 +309,38 @@ private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext(
producerPool.recycle(producer);
return epoch;
};
- Set<String> ongoingTransactionIds =
+ Set<String> precommittedTransactionalIds =
recoveredStates.stream()
.flatMap(
s ->
- s.getOngoingTransactions().stream()
+ s.getPrecommittedTransactionalIds().stream()
.map(CheckpointTransaction::getTransactionalId))
.collect(Collectors.toSet());
+ Set<Integer> oldSubtaskIds =
Review Comment:
Nit: I wonder whether the `getTransactionAbortStrategyContext` method would be
better placed inside `TransactionAbortStrategyContextImpl`, as a builder
method or similar.
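A possible shape for this nit, sketched with hypothetical names (`AbortContextSketch`, `Builder`): a static builder on the context class collects the inputs that `getTransactionAbortStrategyContext` currently assembles in the writer, so the writer only supplies values. The fields shown (`startCheckpointId`, `prefixesToAbort`, `precommittedTransactionalIds`) are a subset taken from the diff for illustration; the real context also needs other inputs, such as the transaction aborter.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: the context builds itself instead of the writer assembling
// it in getTransactionAbortStrategyContext. Only a subset of the real inputs is modeled.
final class AbortContextSketch {
    private final long startCheckpointId;
    private final Set<String> prefixesToAbort;
    private final Set<String> precommittedTransactionalIds;

    private AbortContextSketch(Builder builder) {
        this.startCheckpointId = builder.startCheckpointId;
        // Defensive, read-only copies so the built context is immutable.
        this.prefixesToAbort = Collections.unmodifiableSet(new HashSet<>(builder.prefixesToAbort));
        this.precommittedTransactionalIds =
                Collections.unmodifiableSet(new HashSet<>(builder.precommittedTransactionalIds));
    }

    static Builder builder() {
        return new Builder();
    }

    long getStartCheckpointId() { return startCheckpointId; }

    Set<String> getPrefixesToAbort() { return prefixesToAbort; }

    Set<String> getPrecommittedTransactionalIds() { return precommittedTransactionalIds; }

    static final class Builder {
        private long startCheckpointId;
        private Set<String> prefixesToAbort = Set.of();
        private Set<String> precommittedTransactionalIds = Set.of();

        Builder setStartCheckpointId(long startCheckpointId) {
            this.startCheckpointId = startCheckpointId;
            return this;
        }

        Builder setPrefixesToAbort(Set<String> prefixesToAbort) {
            this.prefixesToAbort = prefixesToAbort;
            return this;
        }

        Builder setPrecommittedTransactionalIds(Set<String> precommittedTransactionalIds) {
            this.precommittedTransactionalIds = precommittedTransactionalIds;
            return this;
        }

        AbortContextSketch build() {
            return new AbortContextSketch(this);
        }
    }

    public static void main(String[] args) {
        AbortContextSketch context =
                AbortContextSketch.builder()
                        .setStartCheckpointId(5L)
                        .setPrefixesToAbort(Set.of("my-app"))
                        .setPrecommittedTransactionalIds(Set.of("my-app-0-4"))
                        .build();
        System.out.println(context.getStartCheckpointId()); // prints 5
    }
}
```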
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java:
##########
@@ -293,22 +309,38 @@ private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext(
producerPool.recycle(producer);
return epoch;
};
- Set<String> ongoingTransactionIds =
+ Set<String> precommittedTransactionalIds =
recoveredStates.stream()
.flatMap(
s ->
- s.getOngoingTransactions().stream()
+ s.getPrecommittedTransactionalIds().stream()
.map(CheckpointTransaction::getTransactionalId))
.collect(Collectors.toSet());
+ Set<Integer> oldSubtaskIds =
+ recoveredStates.stream()
+ .map(KafkaWriterState::getSubtaskId)
+ .collect(Collectors.toSet());
+ Integer oldParallelism =
+ recoveredStates.stream()
+ .map(KafkaWriterState::getParallelism)
+ .findFirst()
+ .orElse(null);
+ // Special case: recovery without checkpoint -> first subtask owns everything
+ if (recoveredStates.isEmpty() && kafkaSinkContext.getParallelInstanceId() == 0) {
+ oldSubtaskIds = Set.of(0);
+ oldParallelism = 1;
+ }
Review Comment:
I think the block would be easier to understand as a dedicated method that
returns early on the special condition. That also avoids reassigning the
variables.
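The suggestion above could look roughly like this. `WriterState` is a hypothetical stand-in for `KafkaWriterState` with only the two getters used in the diff, and `Ownership` / `recoveredOwnership` are illustrative names. The special case returns early, so neither `oldSubtaskIds` nor `oldParallelism` is reassigned.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class RecoveredOwnershipSketch {

    // Hypothetical stand-in for KafkaWriterState (only the getters used in the diff).
    static final class WriterState {
        private final int subtaskId;
        private final int parallelism;

        WriterState(int subtaskId, int parallelism) {
            this.subtaskId = subtaskId;
            this.parallelism = parallelism;
        }

        int getSubtaskId() { return subtaskId; }

        int getParallelism() { return parallelism; }
    }

    // Immutable result; oldParallelism is null when nothing was recovered.
    static final class Ownership {
        final Set<Integer> oldSubtaskIds;
        final Integer oldParallelism;

        Ownership(Set<Integer> oldSubtaskIds, Integer oldParallelism) {
            this.oldSubtaskIds = oldSubtaskIds;
            this.oldParallelism = oldParallelism;
        }
    }

    static Ownership recoveredOwnership(List<WriterState> recoveredStates, int parallelInstanceId) {
        // Special case: recovery without checkpoint -> first subtask owns everything.
        if (recoveredStates.isEmpty() && parallelInstanceId == 0) {
            return new Ownership(Set.of(0), 1);
        }
        Set<Integer> oldSubtaskIds =
                recoveredStates.stream().map(WriterState::getSubtaskId).collect(Collectors.toSet());
        Integer oldParallelism =
                recoveredStates.stream().map(WriterState::getParallelism).findFirst().orElse(null);
        return new Ownership(oldSubtaskIds, oldParallelism);
    }

    public static void main(String[] args) {
        Ownership fresh = recoveredOwnership(List.of(), 0);
        System.out.println(fresh.oldSubtaskIds + " " + fresh.oldParallelism); // prints [0] 1
        Ownership restored = recoveredOwnership(List.of(new WriterState(2, 4)), 1);
        System.out.println(restored.oldSubtaskIds + " " + restored.oldParallelism); // prints [2] 4
    }
}
```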
--
This is an automated message from the Apache Git Service.