[
https://issues.apache.org/jira/browse/GOBBLIN-684?focusedWorklogId=201118&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-201118
]
ASF GitHub Bot logged work on GOBBLIN-684:
------------------------------------------
Author: ASF GitHub Bot
Created on: 20/Feb/19 06:33
Start Date: 20/Feb/19 06:33
Worklog Time Spent: 10m
Work Description: htran1 commented on pull request #2556: GOBBLIN-684:
Ensure buffered messages are flushed before close() in K…
URL: https://github.com/apache/incubator-gobblin/pull/2556#discussion_r258348552
##########
File path: gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
##########
@@ -80,17 +98,48 @@ public KafkaKeyValueProducerPusher(String brokers, String topic) {
*/
public void pushMessages(List<Pair<K, V>> messages) {
for (Pair<K, V> message: messages) {
- this.producer.send(new ProducerRecord<>(topic, message.getKey(), message.getValue()), (recordMetadata, e) -> {
+ this.futures.offer(this.producer.send(new ProducerRecord<>(topic, message.getKey(), message.getValue()), (recordMetadata, e) -> {
if (e != null) {
log.error("Failed to send message to topic {} due to exception: ", topic, e);
}
- });
+ }));
+ }
+
+ //Once the low watermark of numFuturesToBuffer is hit, start flushing messages from the futures
+ // buffer. In order to avoid blocking on newest messages added to futures queue, we only invoke future.get() on
+ // the oldest messages in the futures buffer. The number of messages to flush is same as the number of messages added
+ // in the current call. Note this does not completely avoid calling future.get() on the newer messages e.g. when
+ // multiple threads enter the if{} block concurrently, and invoke flush().
+ if (this.futures.size() >= this.numFuturesToBuffer) {
+ flush(messages.size());
+ }
+ }
+
+ /**
+ * Flush any records that may be present in the producer buffer upto a maximum of <code>numRecordsToFlush</code>.
+ * This method is needed since Kafka 0.8 producer does not have a flush() API. In the absence of the flush()
+ * implementation, records which are present in the buffer but not in-flight may not be delivered at all when close()
+ * is called, leading to data loss.
+ * @param numRecordsToFlush
+ */
+ private void flush(long numRecordsToFlush) {
+ log.info("Flushing records from producer buffer");
Review comment:
Can you make this log and the one at the end debug level, or print them only
when `numRecordsToFlush == Long.MAX_VALUE`? I think this may flood the log,
since flush() is called whenever the futures queue hits the limit.
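
One way the suggestion above could look is sketched below. This is an illustration only, not the code in the pull request: the class `FlushLoggingSketch`, the helper `levelFor`, and the method `track` are hypothetical names, and `java.util.logging` stands in for the project's logger so the snippet is self-contained. It assumes that a full flush is requested by passing `Long.MAX_VALUE`, as the comment implies.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the pusher; only the flush/logging logic is modelled.
public class FlushLoggingSketch {
  private static final Logger LOG = Logger.getLogger(FlushLoggingSketch.class.getName());

  // Futures returned by send(), oldest first.
  private final Queue<Future<?>> futures = new ConcurrentLinkedQueue<>();

  // Assumption: a full flush is signalled by Long.MAX_VALUE. Full flushes are
  // rare, so INFO is acceptable; partial flushes run every time the buffer
  // limit is hit, so they log at FINE (the java.util.logging debug level).
  public static Level levelFor(long numRecordsToFlush) {
    return numRecordsToFlush == Long.MAX_VALUE ? Level.INFO : Level.FINE;
  }

  public void track(Future<?> future) {
    futures.offer(future);
  }

  // Waits on up to numRecordsToFlush of the oldest futures and returns how
  // many were waited on (failed sends are counted and logged).
  public long flush(long numRecordsToFlush) {
    Level level = levelFor(numRecordsToFlush);
    LOG.log(level, "Flushing records from producer buffer");
    long flushed = 0;
    Future<?> future;
    while (flushed < numRecordsToFlush && (future = futures.poll()) != null) {
      try {
        future.get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (ExecutionException e) {
        LOG.log(Level.WARNING, "Send failed while flushing", e);
      }
      flushed++;
    }
    LOG.log(level, "Flushed {0} records from producer buffer", flushed);
    return flushed;
  }

  public static void main(String[] args) {
    FlushLoggingSketch sketch = new FlushLoggingSketch();
    for (int i = 0; i < 5; i++) {
      sketch.track(CompletableFuture.completedFuture(i));
    }
    System.out.println(sketch.flush(2));              // partial flush, logged at FINE
    System.out.println(sketch.flush(Long.MAX_VALUE)); // full flush, logged at INFO
  }
}
```

Choosing the level once per call keeps the start and end messages consistent, so a partial flush never logs one line at INFO and the other at debug.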
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 201118)
Time Spent: 1h (was: 50m)
> Ensure buffered messages are flushed before close() in KafkaProducerPusher
> --------------------------------------------------------------------------
>
> Key: GOBBLIN-684
> URL: https://issues.apache.org/jira/browse/GOBBLIN-684
> Project: Apache Gobblin
> Issue Type: Improvement
> Components: gobblin-metrics
> Affects Versions: 0.15.0
> Reporter: Sudarshan Vasudevan
> Assignee: Sudarshan Vasudevan
> Priority: Major
> Fix For: 0.15.0
>
> Time Spent: 1h
> Remaining Estimate: 0h
>
> Currently, when KafkaProducerPusher is closed, it invokes
> KafkaProducer#close(). However, close() only guarantees delivery of in-flight
> messages, not the messages in the producer buffer waiting to be sent out.
> This results in data loss.
> The fix ensures that we call flush() before close(). As a result, any
> buffered messages are immediately pushed out and we block until the messages
> are acked.
>
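
The ordering described in the issue can be illustrated with a toy model. The sketch below is not Gobblin or Kafka code: `BufferingProducer` and `Pusher` are hypothetical classes that only mimic the behaviour the issue describes, where close() drops whatever is still buffered and flush() delivers it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class FlushBeforeCloseSketch {

  // Hypothetical in-memory producer: records wait in a buffer until flushed,
  // and close() discards anything still buffered (the data-loss case).
  static class BufferingProducer {
    private final Deque<String> buffer = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();

    void send(String record) {
      buffer.add(record);
    }

    void flush() {
      while (!buffer.isEmpty()) {
        delivered.add(buffer.poll());
      }
    }

    void close() {
      buffer.clear();
    }

    int deliveredCount() {
      return delivered.size();
    }
  }

  // Hypothetical pusher whose close() optionally drains the buffer first.
  static class Pusher implements AutoCloseable {
    private final BufferingProducer producer;
    private final boolean flushFirst;

    Pusher(BufferingProducer producer, boolean flushFirst) {
      this.producer = producer;
      this.flushFirst = flushFirst;
    }

    void pushMessages(List<String> messages) {
      for (String message : messages) {
        producer.send(message);
      }
    }

    @Override
    public void close() {
      if (flushFirst) {
        producer.flush(); // deliver buffered records before shutting down
      }
      producer.close();
    }
  }

  // Pushes three records, closes the pusher, and reports how many were delivered.
  public static int deliveredAfterClose(boolean flushFirst) {
    BufferingProducer producer = new BufferingProducer();
    try (Pusher pusher = new Pusher(producer, flushFirst)) {
      pusher.pushMessages(Arrays.asList("a", "b", "c"));
    }
    return producer.deliveredCount();
  }

  public static void main(String[] args) {
    System.out.println("with flush: " + deliveredAfterClose(true));
    System.out.println("without flush: " + deliveredAfterClose(false));
  }
}
```

In this model the only difference between losing and delivering the three records is whether flush() runs before close().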