AHeise commented on code in PR #70:
URL:
https://github.com/apache/flink-connector-kafka/pull/70#discussion_r1669998602
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##########
@@ -86,6 +93,11 @@ public void flush() {
if (inTransaction) {
flushNewPartitions();
}
+ final long pendingRecordsCount = pendingRecords.get();
+ if (pendingRecordsCount != 0) {
+ throw new IllegalStateException(
+ "Pending record count must be zero at this point: " +
pendingRecordsCount);
Review Comment:
I'd improve the error message as follows:
`Some records have not been fully persisted in Kafka. As a precaution, Flink
will restart to resume from previous checkpoint. Please report this issue with
logs on https://issues.apache.org/jira/browse/FLINK-33545`.
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##########
@@ -72,12 +74,17 @@ private static Properties withTransactionalId(
return props;
}
+ public long getPendingRecordsCount() {
+ return pendingRecords.get();
+ }
+
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback
callback) {
if (inTransaction) {
hasRecordsInTransaction = true;
}
- return super.send(record, callback);
+ pendingRecords.incrementAndGet();
+ return super.send(record, new TrackingCallback(callback));
Review Comment:
This change creates a new callback with every `send`. Since the callback
being passed in our codebase is mostly constant, we should add a simple cache
like `new LRUMap(3);`. The number is kind of arbitrary and 1 should work
already. The most important part is that it shouldn't grow boundless or we get
the next memory leak if I overlooked a dynamic usage ;).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]