AHeise commented on code in PR #70:
URL:
https://github.com/apache/flink-connector-kafka/pull/70#discussion_r1671715837
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##########
@@ -72,12 +74,17 @@ private static Properties withTransactionalId(
return props;
}
+ public long getPendingRecordsCount() {
+ return pendingRecords.get();
+ }
+
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
if (inTransaction) {
hasRecordsInTransaction = true;
}
- return super.send(record, callback);
+ pendingRecords.incrementAndGet();
+ return super.send(record, new TrackingCallback(callback));
Review Comment:
I can't quite follow. I was proposing to use
`return super.send(record, callbackCache.computeIfAbsent(callback,
TrackingCallback::new));`
So we have 3 cases:
- New callback: wrap it in a `TrackingCallback` and cache it.
- Existing callback (common case): retrieve the cached `TrackingCallback` and reuse it.
- Cache full: evict an existing `TrackingCallback` from the cache.
In all cases, both the `TrackingCallback` and the original callback will be
invoked. The only difference from the code without the cache is that we avoid
creating extra `TrackingCallback` instances around the same original callback.
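To make the three cases concrete, here is a rough, self-contained sketch of what I have in mind. It is not the PR's code: the local `Callback` interface is only a stand-in for Kafka's `Callback` so the snippet compiles on its own, and `CallbackCacheSketch`, `MAX_CACHED_CALLBACKS`, and the choice of an access-ordered `LinkedHashMap` for eviction are assumptions for illustration. It also assumes `send` is called from a single thread; otherwise the map would need synchronization.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

class CallbackCacheSketch {

    /** Stand-in for Kafka's Callback interface (assumption, keeps the sketch self-contained). */
    interface Callback {
        void onCompletion(Object metadata, Exception exception);
    }

    /** Hypothetical cache bound; the real value would be chosen in the PR. */
    static final int MAX_CACHED_CALLBACKS = 2;

    final AtomicLong pendingRecords = new AtomicLong();

    // Access-ordered LinkedHashMap: once the bound is exceeded, the least
    // recently used wrapper is evicted (case 3).
    final Map<Callback, TrackingCallback> callbackCache =
            new LinkedHashMap<Callback, TrackingCallback>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(
                        Map.Entry<Callback, TrackingCallback> eldest) {
                    return size() > MAX_CACHED_CALLBACKS;
                }
            };

    /** Decrements the pending counter, then forwards to the original callback. */
    class TrackingCallback implements Callback {
        private final Callback delegate;

        TrackingCallback(Callback delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onCompletion(Object metadata, Exception exception) {
            pendingRecords.decrementAndGet();
            if (delegate != null) {
                delegate.onCompletion(metadata, exception);
            }
        }
    }

    /** Mirrors the shape of send(): count the record, then reuse or create the wrapper. */
    Callback send(Callback callback) {
        pendingRecords.incrementAndGet();
        // Case 1: unknown callback -> wrap and cache. Case 2: known callback -> reuse.
        return callbackCache.computeIfAbsent(callback, TrackingCallback::new);
    }
}
```

An evicted wrapper that is still in flight keeps working, since it holds its own reference to the original callback; eviction only means the next `send` with that callback allocates a fresh wrapper.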
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.