Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 68f6a1611 -> dc96e3e78
[GOBBLIN-651][GOBBLIN-650] Ensure ordered delivery of Kafka events from KeyValueProducerPusher for kafka-08.[]

Closes #2520 from sv2000/kafkaOrderedRedux

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/dc96e3e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/dc96e3e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/dc96e3e7

Branch: refs/heads/master
Commit: dc96e3e780358550d997e689bc695cbe19f79996
Parents: 68f6a16
Author: suvasude <[email protected]>
Authored: Tue Dec 11 10:30:01 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Tue Dec 11 10:30:01 2018 -0800

----------------------------------------------------------------------
 .../apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dc96e3e7/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
index ec930fc..e9ea3ab 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
@@ -59,6 +59,8 @@ public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
     props.put(ProducerConfig.ACKS_CONFIG, "all");
     props.put(ProducerConfig.RETRIES_CONFIG, 3);
+    //To guarantee ordered delivery, the maximum in flight requests must be set to 1.
+    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
 
     // add the kafka scoped config. if any of the above are specified then they are overridden
     if (kafkaConfig.isPresent()) {
