[ 
https://issues.apache.org/jira/browse/GOBBLIN-684?focusedWorklogId=200309&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-200309
 ]

ASF GitHub Bot logged work on GOBBLIN-684:
------------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Feb/19 01:47
            Start Date: 19/Feb/19 01:47
    Worklog Time Spent: 10m 
      Work Description: sv2000 commented on pull request #2556: GOBBLIN-684: Ensure buffered messages are flushed before close() in K…
URL: https://github.com/apache/incubator-gobblin/pull/2556#discussion_r257866302
 
 

 ##########
 File path: 
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
 ##########
 @@ -80,17 +88,46 @@ public KafkaKeyValueProducerPusher(String brokers, String topic) {
    */
   public void pushMessages(List<Pair<K, V>> messages) {
     for (Pair<K, V> message: messages) {
-      this.producer.send(new ProducerRecord<>(topic, message.getKey(), message.getValue()), (recordMetadata, e) -> {
+      this.futures.offer(this.producer.send(new ProducerRecord<>(topic, message.getKey(), message.getValue()), (recordMetadata, e) -> {
         if (e != null) {
           log.error("Failed to send message to topic {} due to exception: ", topic, e);
         }
-      });
+      }));
+    }
+
+    //Accumulate futures returned from send() into a buffer; will be used to simulate flush by calling get() on
+    // each of the accumulated futures.
+    if (this.futures.size() >= MAX_NUM_FUTURES_TO_BUFFER) {
+      flush(MAX_NUM_FUTURES_TO_BUFFER);
+      this.futures.clear();
 
 Review comment:
   Good catch! Yes, the clear() is not needed. I also changed the pushMessages() 
method to exclude the newly added messages. 
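
For readers following the thread, here is a minimal sketch of the idea under discussion: buffer the futures returned by send() and simulate a flush by calling get() on the oldest ones. It is not the code from the pull request. The class FutureBuffer, its methods, and the maxBuffered parameter (a stand-in for MAX_NUM_FUTURES_TO_BUFFER) are hypothetical names, and the sketch assumes single-threaded use. Because poll() removes each future as it is awaited, no separate clear() is needed.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper, not part of Gobblin: tracks futures returned by an
// asynchronous send() and simulates flush() by blocking on them.
class FutureBuffer {
  private final Queue<Future<?>> futures = new ArrayDeque<>();
  private final int maxBuffered; // assumed analogue of MAX_NUM_FUTURES_TO_BUFFER

  FutureBuffer(int maxBuffered) {
    this.maxBuffered = maxBuffered;
  }

  // Records a future; once the buffer reaches the threshold, waits on the
  // oldest maxBuffered futures.
  void track(Future<?> future) {
    futures.offer(future);
    if (futures.size() >= maxBuffered) {
      flush(maxBuffered);
    }
  }

  // Blocks on up to `count` of the oldest futures. poll() removes each future
  // from the queue, so the queue never needs to be cleared afterwards.
  // Returns the number of futures that were waited on.
  int flush(int count) {
    int awaited = 0;
    Future<?> next;
    while (awaited < count && (next = futures.poll()) != null) {
      try {
        next.get();
      } catch (ExecutionException e) {
        // The send failed; in this sketch the failure is simply skipped.
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the flag and stop waiting
        break;
      }
      awaited++;
    }
    return awaited;
  }

  // Number of futures not yet waited on.
  int pending() {
    return futures.size();
  }
}
```

A caller would pass each future returned by send() to track() and call flush(pending()) before closing the producer.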
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 200309)
    Time Spent: 50m  (was: 40m)

> Ensure buffered messages are flushed before close() in KafkaProducerPusher
> --------------------------------------------------------------------------
>
>                 Key: GOBBLIN-684
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-684
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-metrics
>    Affects Versions: 0.15.0
>            Reporter: Sudarshan Vasudevan
>            Assignee: Sudarshan Vasudevan
>            Priority: Major
>             Fix For: 0.15.0
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Currently, when KafkaProducerPusher is closed, it invokes 
> KafkaProducer#close(). However, close() only guarantees delivery of in-flight 
> messages, not the messages in the producer buffer waiting to be sent out. 
> This results in data loss.
> The fix ensures that we call flush() before close(). As a result, any 
> buffered messages are immediately pushed out and we block until the messages 
> are acked. 
>  
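
The flush-before-close ordering described in the issue can be sketched as follows. This is an illustration under stated assumptions, not the Gobblin implementation: InMemoryProducer is a hypothetical stand-in whose close() drops buffered messages (mimicking the behavior the issue describes), and FlushingPusher is a hypothetical wrapper; neither uses the Kafka API.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a buffering producer; not the Kafka API.
class InMemoryProducer implements Closeable {
  private final List<String> buffer = new ArrayList<>();
  final List<String> delivered = new ArrayList<>();

  // Queues a message without delivering it.
  void send(String message) {
    buffer.add(message);
  }

  // Delivers everything still sitting in the buffer.
  void flush() {
    delivered.addAll(buffer);
    buffer.clear();
  }

  // Mimics the problem described in the issue: buffered messages are dropped.
  @Override
  public void close() {
    buffer.clear();
  }
}

// Hypothetical pusher that always flushes before closing the producer.
class FlushingPusher implements Closeable {
  private final InMemoryProducer producer;

  FlushingPusher(InMemoryProducer producer) {
    this.producer = producer;
  }

  void pushMessages(List<String> messages) {
    for (String message : messages) {
      producer.send(message);
    }
  }

  @Override
  public void close() {
    try {
      producer.flush(); // push out buffered messages first
    } finally {
      producer.close(); // release the producer even if flush() throws
    }
  }
}
```

The try/finally is a design choice in this sketch: the producer is still closed if flush() fails, so resources are released in either case.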



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
