AHeise commented on code in PR #70:
URL: https://github.com/apache/flink-connector-kafka/pull/70#discussion_r1669998602


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##########
@@ -86,6 +93,11 @@ public void flush() {
         if (inTransaction) {
             flushNewPartitions();
         }
+        final long pendingRecordsCount = pendingRecords.get();
+        if (pendingRecordsCount != 0) {
+            throw new IllegalStateException(
+                    "Pending record count must be zero at this point: " + pendingRecordsCount);

Review Comment:
   I'd improve the error message as follows:
   
   `Some records have not been fully persisted in Kafka. As a precaution, Flink 
will restart to resume from the previous checkpoint. Please report this issue 
with logs on https://issues.apache.org/jira/browse/FLINK-33545`.
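
   A minimal sketch of how the check could read with that message. The class 
`PendingRecordsCheckSketch` and the method `checkNoPendingRecords` are 
hypothetical stand-ins for the producer and its `flush()`; appending the 
pending count to the message is my assumption, not part of the wording above.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the producer; only the pending-record check is modeled. */
class PendingRecordsCheckSketch {
    // In the PR this counter is incremented in send(); the decrement on completion is not shown here.
    final AtomicLong pendingRecords = new AtomicLong();

    /** Mirrors the check added to flush() in the diff, using the reworded message. */
    void checkNoPendingRecords() {
        final long pendingRecordsCount = pendingRecords.get();
        if (pendingRecordsCount != 0) {
            // Including the count is an assumption; it keeps the old diagnostic value.
            throw new IllegalStateException(
                    "Some records have not been fully persisted in Kafka (pending: "
                            + pendingRecordsCount
                            + "). As a precaution, Flink will restart to resume from the "
                            + "previous checkpoint. Please report this issue with logs on "
                            + "https://issues.apache.org/jira/browse/FLINK-33545");
        }
    }
}
```

   Keeping the count in the message would preserve the only concrete number the 
old message carried, which may help when triaging reports.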



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java:
##########
@@ -72,12 +74,17 @@ private static Properties withTransactionalId(
         return props;
     }
 
+    public long getPendingRecordsCount() {
+        return pendingRecords.get();
+    }
+
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
         if (inTransaction) {
             hasRecordsInTransaction = true;
         }
-        return super.send(record, callback);
+        pendingRecords.incrementAndGet();
+        return super.send(record, new TrackingCallback(callback));

Review Comment:
   This change allocates a new callback on every `send`. Since the callback 
passed in our codebase is mostly constant, we should add a simple cache like 
`new LRUMap(3);`. The size is somewhat arbitrary, and 1 should already work. 
The most important part is that the cache must not grow without bound; 
otherwise we get the next memory leak if I overlooked a dynamic usage ;).
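
   A rough sketch of such a bounded cache, under a few assumptions: it uses an 
access-ordered `java.util.LinkedHashMap` with `removeEldestEntry` in place of 
`LRUMap` so that it needs no extra dependency, the nested `Callback` interface 
is a simplified stand-in for Kafka's `Callback`, and the names 
`CallbackCacheSketch`, `tracking`, and `cachedCount` are made up for 
illustration. The decrement inside `TrackingCallback` is my guess at what the 
PR's class does.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

class CallbackCacheSketch {
    /** Simplified stand-in for Kafka's Callback, to keep the sketch self-contained. */
    interface Callback {
        void onCompletion(Object metadata, Exception exception);
    }

    // Arbitrary bound, as in the comment above; 1 would likely be enough.
    static final int MAX_CACHED_CALLBACKS = 3;

    final AtomicLong pendingRecords = new AtomicLong();

    // Access-ordered map that evicts the least recently used entry once the bound is exceeded.
    private final Map<Callback, Callback> cache =
            new LinkedHashMap<Callback, Callback>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<Callback, Callback> eldest) {
                    return size() > MAX_CACHED_CALLBACKS;
                }
            };

    /** Returns the cached wrapper for a callback, creating one only on a cache miss. */
    synchronized Callback tracking(Callback userCallback) {
        Callback wrapped = cache.get(userCallback);
        if (wrapped == null) {
            wrapped = new TrackingCallback(userCallback);
            cache.put(userCallback, wrapped);
        }
        return wrapped;
    }

    synchronized int cachedCount() {
        return cache.size();
    }

    /** Decrements the pending counter, then forwards to the user callback if there is one. */
    final class TrackingCallback implements Callback {
        private final Callback delegate;

        TrackingCallback(Callback delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onCompletion(Object metadata, Exception exception) {
            pendingRecords.decrementAndGet();
            if (delegate != null) {
                delegate.onCompletion(metadata, exception);
            }
        }
    }
}
```

   With this shape, `send` would pass `tracking(callback)` instead of 
`new TrackingCallback(callback)`. A constant callback then reuses one wrapper, 
and a dynamic usage costs at most `MAX_CACHED_CALLBACKS` retained entries.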



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
