AHeise commented on code in PR #154:
URL: https://github.com/apache/flink-connector-kafka/pull/154#discussion_r2016086354
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java:
##########
@@ -53,6 +55,20 @@ public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
             }
             return context.getProducer(context.buildTransactionalId(expectedCheckpointId));
         }
+    },
+    POOLING {
+        @Override
+        public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
+                Context context) {
+            Collection<String> usedTransactionalIds = context.getOngoingTransactions();
+            for (int offset = 0; ; offset++) {
+                String transactionalIdCandidate = context.buildTransactionalId(offset);
+                if (usedTransactionalIds.contains(transactionalIdCandidate)) {
+                    continue;
+                }
+                return context.getProducer(transactionalIdCandidate);
Review Comment:
We have a debug log when a new producer is created.
WDYT about adding a configurable upper pool limit and failing over when it is exceeded?
This is not exactly the same as the fixed-size pools we had before, since users may set
an arbitrarily large value that has no impact on the recovery logic. But it provides some
safety until we have gained practical experience.
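
Roughly what I have in mind, as a sketch only: the `getMaxPoolSize()` accessor on
`Context` and the exact error message are assumptions for illustration, not part of this
PR (only `FlinkRuntimeException` from `org.apache.flink.util` is an existing type):

    POOLING {
        @Override
        public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
                Context context) {
            Collection<String> usedTransactionalIds = context.getOngoingTransactions();
            // Hypothetical accessor for the configurable upper pool limit.
            int maxPoolSize = context.getMaxPoolSize();
            // Bound the search instead of probing offsets indefinitely.
            for (int offset = 0; offset < maxPoolSize; offset++) {
                String transactionalIdCandidate = context.buildTransactionalId(offset);
                if (usedTransactionalIds.contains(transactionalIdCandidate)) {
                    continue;
                }
                return context.getProducer(transactionalIdCandidate);
            }
            // Exceeding the limit surfaces as a regular Flink failover rather than
            // letting the producer pool grow without bound.
            throw new FlinkRuntimeException(
                    "Used up all " + maxPoolSize + " transactional ids of the pool;"
                            + " consider raising the configured limit.");
        }
    },

Since the limit only caps how far the id search probes, a generous default should be
safe while still catching runaway pool growth early.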