szaszm commented on code in PR #1885:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1885#discussion_r1840860617


##########
extensions/kafka/ConsumeKafka.h:
##########
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "KafkaConnection.h"
+#include "KafkaProcessorBase.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/PropertyType.h"
+#include "core/RelationshipDefinition.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "io/StreamPipe.h"
+#include "rdkafka.h"
+#include "rdkafka_utils.h"
+#include "utils/ArrayUtils.h"
+
+namespace org::apache::nifi::minifi {
+
+namespace core {
+class ConsumeKafkaMaxPollTimePropertyType : public TimePeriodPropertyType {
+ public:
+  constexpr ~ConsumeKafkaMaxPollTimePropertyType() override {}  // NOLINT see 
comment at grandparent
+
+  [[nodiscard]] ValidationResult validate(const std::string& subject, const 
std::string& input) const override;
+};
+
+inline constexpr ConsumeKafkaMaxPollTimePropertyType 
CONSUME_KAFKA_MAX_POLL_TIME_TYPE{};
+}  // namespace core
+
+namespace processors {
+
+class ConsumeKafka final : public KafkaProcessorBase {
+ public:
+  // Security Protocol allowable values
+  static constexpr std::string_view SECURITY_PROTOCOL_PLAINTEXT = "plaintext";
+  static constexpr std::string_view SECURITY_PROTOCOL_SSL = "ssl";
+
+  // Topic Name Format allowable values
+  static constexpr std::string_view TOPIC_FORMAT_NAMES = "Names";
+  static constexpr std::string_view TOPIC_FORMAT_PATTERNS = "Patterns";
+
+  // Offset Reset allowable values
+  static constexpr std::string_view OFFSET_RESET_EARLIEST = "earliest";
+  static constexpr std::string_view OFFSET_RESET_LATEST = "latest";
+  static constexpr std::string_view OFFSET_RESET_NONE = "none";
+
+  // Key Attribute Encoding allowable values
+  static constexpr std::string_view KEY_ATTR_ENCODING_UTF_8 = "UTF-8";
+  static constexpr std::string_view KEY_ATTR_ENCODING_HEX = "Hex";
+
+  // Message Header Encoding allowable values
+  static constexpr std::string_view MSG_HEADER_ENCODING_UTF_8 = "UTF-8";
+  static constexpr std::string_view MSG_HEADER_ENCODING_HEX = "Hex";
+
+  // Duplicate Header Handling allowable values
+  static constexpr std::string_view MSG_HEADER_KEEP_FIRST = "Keep First";
+  static constexpr std::string_view MSG_HEADER_KEEP_LATEST = "Keep Latest";
+  static constexpr std::string_view MSG_HEADER_COMMA_SEPARATED_MERGE = 
"Comma-separated Merge";
+
+  // Flowfile attributes written
+  static constexpr std::string_view KAFKA_COUNT_ATTR =
+      "kafka.count";  // Always 1 until we start supporting merging from 
batches
+  static constexpr std::string_view KAFKA_MESSAGE_KEY_ATTR = "kafka.key";
+  static constexpr std::string_view KAFKA_OFFSET_ATTR = "kafka.offset";
+  static constexpr std::string_view KAFKA_PARTITION_ATTR = "kafka.partition";
+  static constexpr std::string_view KAFKA_TOPIC_ATTR = "kafka.topic";
+
+  static constexpr std::string_view DEFAULT_MAX_POLL_RECORDS = "10000";
+  static constexpr std::string_view DEFAULT_MAX_POLL_TIME = "4 seconds";
+
+  static constexpr const std::size_t METADATA_COMMUNICATIONS_TIMEOUT_MS{60000};
+
+  EXTENSIONAPI static constexpr const char* Description =
+      "Consumes messages from Apache Kafka and transform them into MiNiFi 
FlowFiles. "
+      "The application should make sure that the processor is triggered at 
regular intervals, even if no messages are "
+      "expected, "
+      "to serve any queued callbacks waiting to be called. Rebalancing can 
also only happen on trigger.";
+
+  EXTENSIONAPI static constexpr auto KafkaBrokers =
+      core::PropertyDefinitionBuilder<>::createProperty("Kafka Brokers")
+          .withDescription("A comma-separated list of known Kafka Brokers in 
the format <host>:<port>.")
+          .withPropertyType(core::StandardPropertyTypes::NON_BLANK_TYPE)
+          .withDefaultValue("localhost:9092")
+          .supportsExpressionLanguage(true)
+          .isRequired(true)
+          .build();
+  EXTENSIONAPI static constexpr auto TopicNames =
+      core::PropertyDefinitionBuilder<>::createProperty("Topic Names")
+          .withDescription(
+              "The name of the Kafka Topic(s) to pull from. Multiple topic 
names are supported as a comma separated list.")
+          .supportsExpressionLanguage(true)
+          .isRequired(true)
+          .build();
+  EXTENSIONAPI static constexpr auto TopicNameFormat =
+      core::PropertyDefinitionBuilder<2>::createProperty("Topic Name Format")
+          .withDescription(
+              "Specifies whether the Topic(s) provided are a comma separated 
list of names or a single regular expression. "
+              "Using regular expressions does not automatically discover Kafka 
topics created after the processor started.")
+          .withAllowedValues({TOPIC_FORMAT_NAMES, TOPIC_FORMAT_PATTERNS})
+          .withDefaultValue(TOPIC_FORMAT_NAMES)
+          .isRequired(true)
+          .build();
+  EXTENSIONAPI static constexpr auto HonorTransactions =
+      core::PropertyDefinitionBuilder<>::createProperty("Honor Transactions")
+          .withDescription(
+              "Specifies whether or not MiNiFi should honor transactional 
guarantees when communicating with Kafka. If "
+              "false, the Processor will use "
+              "an \"isolation level\" of "
+              "read_uncomitted. This means that messages will be received as 
soon as they are written to Kafka but will be "
+              "pulled, even if the "
+              "producer cancels the transactions. "
+              "If this value is true, MiNiFi will not receive any messages for 
which the producer's transaction was "
+              "canceled, but this can result in "
+              "some latency since the consumer "
+              "must wait for the producer to finish its entire transaction 
instead of pulling as the messages become "
+              "available.")

Review Comment:
   This clang-format reflow resulted in some unnecessarily short lines. Please 
check the rest of the file too; there are more of these.



##########
extensions/kafka/PublishKafka.cpp:
##########
@@ -400,149 +374,148 @@ void PublishKafka::notifyStop() {
   logger_->log_debug("notifyStop called");
   interrupted_ = true;
   {
-    // Normally when we need both connection_mutex_ and messages_mutex_, we 
need to take connection_mutex_ first to avoid a deadlock.
-    // It's not possible to do that here, because we need to interrupt the 
messages while onTrigger is running and holding connection_mutex_.
-    // For this reason, we take messages_mutex_ only, interrupt the messages, 
then release the lock to let a possibly running onTrigger take it and finish.
-    // After onTrigger finishes, we can take connection_mutex_ and close the 
connection without needing to wait for message finishes/timeouts in onTrigger.
-    // A possible new onTrigger between our critical sections won't produce 
more messages because we set interrupted_ = true above.
+    // Normally when we need both connection_mutex_ and messages_mutex_, we 
need to take connection_mutex_ first to avoid a
+    // deadlock. It's not possible to do that here, because we need to 
interrupt the messages while onTrigger is running and
+    // holding connection_mutex_. For this reason, we take messages_mutex_ 
only, interrupt the messages, then release the
+    // lock to let a possibly running onTrigger take it and finish. After 
onTrigger finishes, we can take connection_mutex_
+    // and close the connection without needing to wait for message 
finishes/timeouts in onTrigger. A possible new onTrigger
+    // between our critical sections won't produce more messages because we 
set interrupted_ = true above.
     std::lock_guard<std::mutex> lock(messages_mutex_);
-    for (auto& messages : messages_set_) {
-      messages->interrupt();
-    }
+    for (auto& messages: messages_set_) { messages->interrupt(); }
   }
   std::lock_guard<std::mutex> conn_lock(connection_mutex_);
   conn_.reset();
 }
 
-
 bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
-  std::string value;
-  int64_t valInt = 0;
-  std::string valueConf;
-  std::array<char, 512U> errstr{};
+  std::array<char, 512U> err_chars{};
   rd_kafka_conf_res_t result = RD_KAFKA_CONF_OK;
-  const char* const PREFIX_ERROR_MSG = "PublishKafka: configure error result: 
";
+  constexpr std::string_view PREFIX_ERROR_MSG = "PublishKafka: configure error 
result: ";
 
-  std::unique_ptr<rd_kafka_conf_t, rd_kafka_conf_deleter> conf_{ 
rd_kafka_conf_new() };
-  if (conf_ == nullptr) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create 
rd_kafka_conf_t object");
-  }
+  utils::rd_kafka_conf_unique_ptr conf_{rd_kafka_conf_new()};
+  if (conf_ == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed 
to create rd_kafka_conf_t object"); }
 
   const auto* const key = conn_->getKey();
 
-  if (key->brokers_.empty()) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers");
-  }
-  result = rd_kafka_conf_set(conf_.get(), "bootstrap.servers", 
key->brokers_.c_str(), errstr.data(), errstr.size());
+  if (key->brokers_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, 
"There are no brokers"); }
+  result = rd_kafka_conf_set(conf_.get(), "bootstrap.servers", 
key->brokers_.c_str(), err_chars.data(), err_chars.size());
   logger_->log_debug("PublishKafka: bootstrap.servers [{}]", key->brokers_);
   if (result != RD_KAFKA_CONF_OK) {
-    auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, errstr.data());
+    auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
   }
 
-  if (key->client_id_.empty()) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client id is empty");
-  }
-  result = rd_kafka_conf_set(conf_.get(), "client.id", 
key->client_id_.c_str(), errstr.data(), errstr.size());
+  if (key->client_id_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, 
"Client id is empty"); }
+  result = rd_kafka_conf_set(conf_.get(), "client.id", 
key->client_id_.c_str(), err_chars.data(), err_chars.size());
   logger_->log_debug("PublishKafka: client.id [{}]", key->client_id_);
   if (result != RD_KAFKA_CONF_OK) {
-    auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, errstr.data());
+    auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
   }
 
-  value = "";
-  if (context.getProperty(DebugContexts, value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "debug", value.c_str(), 
errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: debug [{}]", value);
+  if (auto debug_contexts = context.getProperty(DebugContexts); debug_contexts 
&& !debug_contexts->empty()) {
+    result = rd_kafka_conf_set(conf_.get(), "debug", debug_contexts->c_str(), 
err_chars.data(), err_chars.size());
+    logger_->log_debug("PublishKafka: debug [{}]", *debug_contexts);
     if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
errstr.data());
+      auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, 
err_chars.data());
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
 
-  value = "";
-  if (context.getProperty(MaxMessageSize, value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "message.max.bytes", 
value.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: message.max.bytes [{}]", value);
+  if (auto max_message_size = context.getProperty(MaxMessageSize); 
max_message_size && !max_message_size->empty()) {
+    result =
+        rd_kafka_conf_set(conf_.get(), "message.max.bytes", 
max_message_size->c_str(), err_chars.data(), err_chars.size());

Review Comment:
   I don't like this formatting change either :/



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to