Repository: incubator-gobblin
Updated Branches:
  refs/heads/master d10bae881 -> 68f6a1611
[GOBBLIN-650] Ensure ordered delivery of Kafka events from KeyValueProducerPusher.[]

Closes #2519 from sv2000/kafkaOrdered

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/68f6a161
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/68f6a161
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/68f6a161

Branch: refs/heads/master
Commit: 68f6a1611f4ab40ed10c12c90bafffa0b3402a37
Parents: d10bae8
Author: suvasude <[email protected]>
Authored: Tue Dec 11 09:51:40 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Tue Dec 11 09:51:40 2018 -0800

----------------------------------------------------------------------
 .../apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/68f6a161/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
index ba0e5ff..e4ad6ca 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
@@ -59,6 +59,8 @@ public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
     props.put(ProducerConfig.ACKS_CONFIG, "all");
     props.put(ProducerConfig.RETRIES_CONFIG, 3);
+    //To guarantee ordered delivery, the maximum in flight requests must be set to 1.
+    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
     // add the kafka scoped config. if any of the above are specified then they are overridden
     if (kafkaConfig.isPresent()) {
