This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 8baf3be  MINIFICPP-1050 - Make PublishKafka's Delivery Guarantee 
backwards compatible
8baf3be is described below

commit 8baf3bed60f9a08688db43c82ba01a2fea6b6718
Author: Daniel Bakai <[email protected]>
AuthorDate: Sat Sep 28 21:28:54 2019 +0200

    MINIFICPP-1050 - Make PublishKafka's Delivery Guarantee backwards compatible
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #654
---
 NOTICE                                 |  1 +
 extensions/librdkafka/PublishKafka.cpp | 20 +++++++++++++++++++-
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/NOTICE b/NOTICE
index 7dcbc81..e8dcee8 100644
--- a/NOTICE
+++ b/NOTICE
@@ -34,3 +34,4 @@ This includes derived works from the CMake (BSD 3-Clause 
licensed) project (http
 Copyright 2000-2019 Kitware, Inc. and Contributors
 The derived work is adapted from
   Modules/FindPatch.cmake
+and can be found in cmake/FindPatch.cmake
diff --git a/extensions/librdkafka/PublishKafka.cpp 
b/extensions/librdkafka/PublishKafka.cpp
index 75f8839..de72d23 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -45,7 +45,10 @@ core::Property PublishKafka::Topic(
         ->isRequired(true)->supportsExpressionLanguage(true)->build());
 
 core::Property PublishKafka::DeliveryGuarantee(
-    core::PropertyBuilder::createProperty("Delivery 
Guarantee")->withDescription("Specifies the requirement for guaranteeing that a 
message is sent to Kafka")
+    core::PropertyBuilder::createProperty("Delivery 
Guarantee")->withDescription("Specifies the requirement for guaranteeing that a 
message is sent to Kafka. "
+                                                                               
  "Valid values are 0 (do not wait for acks), "
+                                                                               
  "-1 or all (block until message is committed by all in sync replicas) "
+                                                                               
  "or any concrete number of nodes.")
         
->isRequired(false)->supportsExpressionLanguage(true)->withDefaultValue(DELIVERY_ONE_NODE)->build());
 
 core::Property PublishKafka::MaxMessageSize(
@@ -411,6 +414,21 @@ bool PublishKafka::createNewTopic(const 
std::shared_ptr<KafkaConnection> &conn,
 
   value = "";
   if (context->getProperty(DeliveryGuarantee.getName(), value) && 
!value.empty()) {
+    /*
+     * Because of a previous error in this processor, the default value of 
this property was "DELIVERY_ONE_NODE".
+     * As this is not a valid value for "request.required.acks", the following 
rd_kafka_topic_conf_set call failed,
+     * but because of another error, this failure was silently ignored, 
meaning that the default value for
+     * "request.required.acks" did not change, and thus remained "-1". This 
means that having "DELIVERY_ONE_NODE" as
+     * the value of this property actually caused the processor to wait for 
delivery ACKs from ALL nodes, instead
+     * of just one. In order not to break configurations generated with 
earlier versions and keep the same behaviour
+     * as they had, we have to map "DELIVERY_ONE_NODE" to "-1" here.
+     */
+    if (value == "DELIVERY_ONE_NODE") {
+      value = "-1";
+      logger_->log_warn("Using DELIVERY_ONE_NODE as the Delivery Guarantee 
property is deprecated and is translated to -1 "
+                        "(block until message is committed by all in sync 
replicas) for backwards compatibility. "
+                        "If you want to wait for one acknowledgment use '1' as 
the property.");
+    }
     result = rd_kafka_topic_conf_set(topic_conf_, "request.required.acks", 
value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: request.required.acks [%s]", value);
     if (result != RD_KAFKA_CONF_OK) {

Reply via email to