lordgamez commented on a change in pull request #940:
URL: https://github.com/apache/nifi-minifi-cpp/pull/940#discussion_r534990092



##########
File path: extensions/librdkafka/tests/ConsumeKafkaTests.cpp
##########
@@ -0,0 +1,595 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_MAIN
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <set>
+
+// #include "TestBase.h"
+#include "../../../libminifi/test/TestBase.h"
+
+#include "../ConsumeKafka.h"
+#include "../rdkafka_utils.h"
+#include "../../standard-processors/processors/ExtractText.h"
+#include "utils/file/FileUtils.h"
+#include "utils/OptionalUtils.h"
+#include "utils/RegexUtils.h"
+#include "utils/StringUtils.h"
+#include "utils/TestUtils.h"
+
+#include "utils/IntegrationTestUtils.h"
+
+namespace {
+using org::apache::nifi::minifi::utils::optional;
+
+class KafkaTestProducer {
+ public:
+  enum class PublishEvent {
+    PUBLISH,
+    TRANSACTION_START,
+    TRANSACTION_COMMIT,
+    CANCEL
+  };
+  KafkaTestProducer(const std::string& kafka_brokers, const std::string& topic, const bool transactional) :
+      logger_(logging::LoggerFactory<KafkaTestProducer>::getLogger()) {
+    using utils::setKafkaConfigurationField;
+
+    std::unique_ptr<rd_kafka_conf_t, utils::rd_kafka_conf_deleter> conf = { rd_kafka_conf_new(), utils::rd_kafka_conf_deleter() };
+
+    setKafkaConfigurationField(conf.get(), "bootstrap.servers", kafka_brokers);
+    // setKafkaConfigurationField(conf.get(), "client.id", PRODUCER_CLIENT_NAME);
+    setKafkaConfigurationField(conf.get(), "compression.codec", "snappy");
+    setKafkaConfigurationField(conf.get(), "batch.num.messages", "1");
+
+    if (transactional) {
+      setKafkaConfigurationField(conf.get(), "transactional.id", "ConsumeKafkaTest_transaction_id");
+    }
+
+    static std::array<char, 512U> errstr{};
+    producer_ = { rd_kafka_new(RD_KAFKA_PRODUCER, conf.release(), errstr.data(), errstr.size()), utils::rd_kafka_producer_deleter() };
+    if (producer_ == nullptr) {
+      auto error_msg = utils::StringUtils::join_pack("Failed to create Kafka producer %s", errstr.data());
+      throw std::runtime_error(error_msg);
+    }
+
+    // The last argument is a config here, but it is already owned by the producer. I assume that this would mean an override on the original config if used
+    topic_ = { rd_kafka_topic_new(producer_.get(), topic.c_str(), nullptr), utils::rd_kafka_topic_deleter() };
+
+    if (transactional) {
+      rd_kafka_init_transactions(producer_.get(), TRANSACTIONS_TIMEOUT_MS.count());
+    }
+  }
+
+  // Uses all the headers for every published message
+  void publish_messages_to_topic(
+      const std::vector<std::string>& messages_on_topic, const std::string& message_key, std::vector<PublishEvent> events,
+      const std::vector<std::pair<std::string, std::string>>& message_headers, const optional<std::string>& message_header_encoding) {
+    auto next_message = messages_on_topic.cbegin();
+    for (const PublishEvent event : events) {
+      switch (event) {
+        case PublishEvent::PUBLISH:
+          REQUIRE(messages_on_topic.cend() != next_message);
+          publish_message(*next_message, message_key, message_headers, message_header_encoding);
+          std::advance(next_message, 1);
+          break;
+        case PublishEvent::TRANSACTION_START:
+          logger_->log_debug("Starting new transaction...");
+          rd_kafka_begin_transaction(producer_.get());
+          break;
+        case PublishEvent::TRANSACTION_COMMIT:
+          logger_->log_debug("Committing transaction...");
+          rd_kafka_commit_transaction(producer_.get(), TRANSACTIONS_TIMEOUT_MS.count());
+          break;
+        case PublishEvent::CANCEL:
+          logger_->log_debug("Cancelling transaction...");
+          rd_kafka_abort_transaction(producer_.get(), TRANSACTIONS_TIMEOUT_MS.count());
+      }
+    }
+  }
+
+ private:
+  void publish_message(
+      const std::string& message, const std::string& message_key, const std::vector<std::pair<std::string, std::string>>& message_headers, const optional<std::string>& message_header_encoding) {
+    logger_->log_debug("Producing: %s", message.c_str());
+    std::unique_ptr<rd_kafka_headers_t, utils::rd_kafka_headers_deleter> headers(rd_kafka_headers_new(message_headers.size()), utils::rd_kafka_headers_deleter());
+    if (!headers) {
+      throw std::runtime_error("Generating message headers failed.");
+    }
+    for (const std::pair<std::string, std::string>& message_header : message_headers) {
+      rd_kafka_header_add(headers.get(),
+          const_cast<char*>(message_header.first.c_str()), message_header.first.size(),
+          const_cast<char*>(message_header.second.c_str()), message_header.second.size());
+    }
+
+    if (RD_KAFKA_RESP_ERR_NO_ERROR != rd_kafka_producev(
+        producer_.get(),
+        RD_KAFKA_V_RKT(topic_.get()),
+        RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA),
+        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+        RD_KAFKA_V_VALUE(const_cast<char*>(&message[0]), message.size()),
+        RD_KAFKA_V_HEADERS(headers.release()),
+        RD_KAFKA_V_KEY(message_key.c_str(), message_key.size()),
+        RD_KAFKA_V_END)) {
+      logger_->log_error("Producer failure: %d: %s", rd_kafka_last_error(), rd_kafka_err2str(rd_kafka_last_error()));
+    }
+  }
+
+  static const std::chrono::milliseconds TRANSACTIONS_TIMEOUT_MS;
+
+  std::unique_ptr<rd_kafka_t, utils::rd_kafka_producer_deleter> producer_;
+  std::unique_ptr<rd_kafka_topic_t, utils::rd_kafka_topic_deleter> topic_;
+
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+const std::chrono::milliseconds KafkaTestProducer::TRANSACTIONS_TIMEOUT_MS{ 2000 };
+
+class ConsumeKafkaTest {
+ public:
+  using Processor = org::apache::nifi::minifi::core::Processor;
+  using ConsumeKafka = org::apache::nifi::minifi::processors::ConsumeKafka;
+  using ExtractText = org::apache::nifi::minifi::processors::ExtractText;
+
+  const KafkaTestProducer::PublishEvent PUBLISH            = KafkaTestProducer::PublishEvent::PUBLISH;
+  const KafkaTestProducer::PublishEvent TRANSACTION_START  = KafkaTestProducer::PublishEvent::TRANSACTION_START;
+  const KafkaTestProducer::PublishEvent TRANSACTION_COMMIT = KafkaTestProducer::PublishEvent::TRANSACTION_COMMIT;
+  const KafkaTestProducer::PublishEvent CANCEL             = KafkaTestProducer::PublishEvent::CANCEL;
+
+  const std::vector<KafkaTestProducer::PublishEvent> NON_TRANSACTIONAL_MESSAGES   { PUBLISH, PUBLISH };
+  const std::vector<KafkaTestProducer::PublishEvent> SINGLE_COMMITTED_TRANSACTION { TRANSACTION_START, PUBLISH, PUBLISH, TRANSACTION_COMMIT };
+  const std::vector<KafkaTestProducer::PublishEvent> TWO_SEPARATE_TRANSACTIONS    { TRANSACTION_START, PUBLISH, TRANSACTION_COMMIT, TRANSACTION_START, PUBLISH, TRANSACTION_COMMIT };
+  const std::vector<KafkaTestProducer::PublishEvent> NON_COMMITTED_TRANSACTION    { TRANSACTION_START, PUBLISH, PUBLISH };
+  const std::vector<KafkaTestProducer::PublishEvent> COMMIT_AND_CANCEL            { TRANSACTION_START, PUBLISH, CANCEL };
+
+  const std::string KEEP_FIRST            = ConsumeKafka::MSG_HEADER_KEEP_FIRST;
+  const std::string KEEP_LATEST           = ConsumeKafka::MSG_HEADER_KEEP_LATEST;
+  const std::string COMMA_SEPARATED_MERGE = ConsumeKafka::MSG_HEADER_COMMA_SEPARATED_MERGE;
+
+  static const std::string PRODUCER_TOPIC;
+  static const std::string TEST_MESSAGE_KEY;
+
+  // Relationships
+  const core::Relationship success {"success", "description"};
+  const core::Relationship failure {"failure", "description"};
+
+  ConsumeKafkaTest() :
+      logTestController_(LogTestController::getInstance()),
+      logger_(logging::LoggerFactory<ConsumeKafkaTest>::getLogger()) {
+      reInitialize();
+  }
+
+  virtual ~ConsumeKafkaTest() {
+    logTestController_.reset();
+  }
+
+ protected:
+  void reInitialize() {
+    testController_.reset(new TestController());
+    plan_ = testController_->createPlan();
+    logTestController_.setError<LogTestController>();
+    logTestController_.setError<TestPlan>();
+    logTestController_.setTrace<ConsumeKafka>();
+    logTestController_.setTrace<ConsumeKafkaTest>();
+    logTestController_.setTrace<KafkaTestProducer>();
+    logTestController_.setDebug<ExtractText>();
+    logTestController_.setDebug<core::ProcessContext>();
+  }
+
+  void optional_set_property(const std::shared_ptr<core::Processor>& processor, const std::string& property_name, const optional<std::string>& opt_value) {
+    if (opt_value) {
+      plan_->setProperty(processor, property_name, opt_value.value());
+    }
+  }
+
+  std::string decode_key(const std::string& key, const optional<std::string>& key_attribute_encoding) {
+    if (!key_attribute_encoding || utils::StringUtils::equalsIgnoreCase(ConsumeKafka::KEY_ATTR_ENCODING_UTF_8, key_attribute_encoding.value())) {
+      return key;
+    }
+    if (utils::StringUtils::equalsIgnoreCase(ConsumeKafka::KEY_ATTR_ENCODING_HEX, key_attribute_encoding.value())) {
+      return utils::StringUtils::from_hex(key);
+    }
+    throw std::runtime_error("Message Header Encoding does not match any of the presets in the test.");
+  }
+
+  std::vector<std::string> sort_and_split_messages(const std::vector<std::string>& messages_on_topic, const optional<std::string>& message_demarcator) {
+    if (message_demarcator) {
+      std::vector<std::string> sorted_split_messages;
+      for (const auto& message : messages_on_topic) {
+        std::vector<std::string> split_message = utils::StringUtils::split(message, message_demarcator.value());
+        std::move(split_message.begin(), split_message.end(), std::back_inserter(sorted_split_messages));
+      }
+      std::sort(sorted_split_messages.begin(), sorted_split_messages.end());
+      return sorted_split_messages;
+    }
+    std::vector<std::string> sorted_messages{ messages_on_topic.cbegin(), messages_on_topic.cend() };
+    std::sort(sorted_messages.begin(), sorted_messages.end());
+    return sorted_messages;
+  }
+
+  static const std::chrono::seconds MAX_CONSUMEKAFKA_POLL_TIME_SECONDS;
+  static const std::string ATTRIBUTE_FOR_CAPTURING_CONTENT;
+  static const std::string TEST_FILE_NAME_POSTFIX;
+
+  std::unique_ptr<TestController> testController_;
+  std::shared_ptr<TestPlan> plan_;
+  LogTestController& logTestController_;
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+class ConsumeKafkaPropertiesTest : public ConsumeKafkaTest {
+ public:
+  ConsumeKafkaPropertiesTest() : ConsumeKafkaTest() {}
+  virtual ~ConsumeKafkaPropertiesTest() {
+    logTestController_.reset();
+  }
+
+  void single_consumer_with_plain_text_test(
+      bool expect_config_valid,
+      bool expect_fixed_message_order,
+      const std::vector<std::pair<std::string, std::string>>& expect_header_attributes,
+      const std::vector<std::string>& messages_on_topic,
+      const std::vector<KafkaTestProducer::PublishEvent>& transaction_events,
+      const std::vector<std::pair<std::string, std::string>>& message_headers,
+      const std::string& kafka_brokers,
+      const std::string& security_protocol,
+      const std::string& topic_names,
+      const optional<std::string>& topic_name_format,
+      const optional<bool>& honor_transactions,
+      const optional<std::string>& group_id,
+      const optional<std::string>& offset_reset,
+      const optional<std::string>& key_attribute_encoding,
+      const optional<std::string>& message_demarcator,
+      const optional<std::string>& message_header_encoding,
+      const optional<std::string>& headers_to_add_as_attributes,
+      const optional<std::string>& duplicate_header_handling,
+      const optional<std::string>& max_poll_records,
+      const optional<std::string>& max_poll_time,
+      const optional<std::string>& session_timeout) {

Review comment:
       I see. I suppose you are right: in this case it is explicit which parameters are used, since they are fixed and can be set on a single line. Let's keep it as it is; I'm not against it. The only downside I see is that with such a long list of string parameters it would be easy to mix up arguments at a call site, though I suppose such a mistake would be spotted quickly once the test fails.
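
As an illustration (hypothetical names, not code from this PR), here is a minimal sketch of the kind of silent mix-up I mean, along with one possible mitigation that bundles the values into a named parameter struct:

```cpp
#include <iostream>
#include <string>

// Hypothetical example: with several adjacent string parameters, swapped
// arguments still compile and only show up when the test fails at runtime.
void subscribe(const std::string& topic_name, const std::string& group_id) {
  std::cout << "topic: " << topic_name << ", group: " << group_id << '\n';
}

// One possible mitigation: group the values into a named struct so the call
// site states which member each value initializes.
struct SubscribeParams {
  std::string topic_name;
  std::string group_id;
};

void subscribe(const SubscribeParams& params) {
  subscribe(params.topic_name, params.group_id);
}

int main() {
  subscribe("my_group", "my_topic");  // compiles fine, but the arguments are swapped
  subscribe(SubscribeParams{ "my_topic", "my_group" });  // members in declaration order
  // With C++20 designated initializers the intent becomes explicit:
  //   subscribe(SubscribeParams{ .topic_name = "my_topic", .group_id = "my_group" });
}
```

Just a thought for the future; I am fine with keeping the current signature here.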




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

