fgerlits commented on code in PR #1350:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1350#discussion_r905172649


##########
PROCESSORS.md:
##########
@@ -1279,6 +1280,24 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 
 
 
+## ListenTCP
+
+### Description
+
+Listens for incoming TCP connections and reads data from each connection using 
a line separator as the message demarcator. For each message the processor 
produces a single FlowFile.
+
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name                          | Default Value | Allowable Values | 
Description                                                                     
                                                                                
                              |
+|-------------------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Listening Port**            |               |                  | The port 
to listen on for communication.                                                 
                                                                                
                     |
+| **Max Batch Size**            | 500           |                  | The 
maximum number of messages to process at a time.                                
                                                                                
                          |
+| **Max Size of Message Queue** | 0             |                  | Maximum 
number of messages allowed to be buffered before processing them when the 
processor is triggered. If the buffer full, the message is ignored. If set to 
zero the buffer is unlimited. |

Review Comment:
   Are we sure we want the default buffer size to be unlimited?  I understand 
that dropping messages is bad, but unlimited memory consumption can be even 
worse.



##########
docker/test/integration/steps/steps.py:
##########
@@ -416,6 +416,12 @@ def step_impl(context):
     context.test.start_splunk()
 
 
+# TCP client
+@given('a TCP client is setup to send logs to minifi')

Review Comment:
   "set up" should be two words; "setup" is a noun



##########
PROCESSORS.md:
##########
@@ -1279,6 +1280,24 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 
 
 
+## ListenTCP
+
+### Description
+
+Listens for incoming TCP connections and reads data from each connection using 
a line separator as the message demarcator. For each message the processor 
produces a single FlowFile.
+
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name                          | Default Value | Allowable Values | 
Description                                                                     
                                                                                
                              |
+|-------------------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Listening Port**            |               |                  | The port 
to listen on for communication.                                                 
                                                                                
                     |
+| **Max Batch Size**            | 500           |                  | The 
maximum number of messages to process at a time.                                
                                                                                
                          |
+| **Max Size of Message Queue** | 0             |                  | Maximum 
number of messages allowed to be buffered before processing them when the 
processor is triggered. If the buffer full, the message is ignored. If set to 
zero the buffer is unlimited. |

Review Comment:
   typo:
   ```suggestion
   | **Max Size of Message Queue** | 0             |                  | Maximum 
number of messages allowed to be buffered before processing them when the 
processor is triggered. If the buffer is full, the message is ignored. If set 
to zero the buffer is unlimited. |
   ```



##########
extensions/standard-processors/processors/ListenSyslog.cpp:
##########
@@ -86,72 +86,20 @@ void ListenSyslog::initialize() {
 void ListenSyslog::onSchedule(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSessionFactory>&) {
   gsl_Expects(context && !server_thread_.joinable() && !server_);

Review Comment:
   Since `server_thread_` and `server_` have been moved to 
`NetworkListenerProcessor`, I would move these two expectation checks to 
`NetworkListenerProcessor::startServer()`.
   
   (Also from `ListenTCP::onSchedule()`.)



##########
libminifi/include/utils/net/UdpServer.h:
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <optional>
+#include <memory>
+#include <string>
+
+#include "Server.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "asio/ts/buffer.hpp"
+#include "asio/ts/internet.hpp"
+#include "asio/streambuf.hpp"
+
+namespace org::apache::nifi::minifi::utils::net {
+
+class UdpServer : public Server {
+ public:
+  UdpServer(std::optional<size_t> max_queue_size,
+            uint16_t port,
+            std::shared_ptr<core::logging::Logger> logger);
+
+ private:
+  void doReceive();
+
+  asio::ip::udp::socket socket_;
+  asio::ip::udp::endpoint sender_endpoint_;
+  std::string buffer_;
+
+  static inline constexpr size_t MAX_UDP_PACKET_SIZE = 65535;

Review Comment:
   Is this `inline` necessary?  I think `static constexpr` member variables are 
`inline` already.



##########
extensions/standard-processors/tests/unit/ListenTcpTests.cpp:
##########
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ListenTCP.h"
+#include "SingleProcessorTestController.h"
+#include "asio.hpp"
+
+using ListenTCP = org::apache::nifi::minifi::processors::ListenTCP;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+constexpr uint64_t PORT = 10254;
+using ProcessorTriggerResult = std::unordered_map<core::Relationship, 
std::vector<std::shared_ptr<core::FlowFile>>>;
+
+void sendMessagesViaTCP(const std::vector<std::string_view>& contents) {
+  asio::io_context io_context;
+  asio::ip::tcp::socket socket(io_context);
+  asio::ip::tcp::endpoint 
remote_endpoint(asio::ip::address::from_string("127.0.0.1"), PORT);
+  socket.connect(remote_endpoint);
+  std::error_code err;
+  for (auto& content : contents) {
+    std::string tcp_message(content);
+    tcp_message += '\n';
+    socket.send(asio::buffer(tcp_message, tcp_message.size()), 0, err);
+  }
+  REQUIRE(!err);
+  socket.close();
+}
+
+void check_for_attributes(core::FlowFile& flow_file) {
+  CHECK(std::to_string(PORT) == flow_file.getAttribute("tcp.port"));
+  CHECK("127.0.0.1" == flow_file.getAttribute("tcp.sender"));
+}
+
+bool triggerUntil(test::SingleProcessorTestController& controller,
+                  const std::unordered_map<core::Relationship, size_t>& 
expected_quantities,
+                  ProcessorTriggerResult& result,
+                  const std::chrono::milliseconds max_duration,
+                  const std::chrono::milliseconds wait_time = 50ms) {
+  auto start_time = std::chrono::steady_clock::now();
+  while (std::chrono::steady_clock::now() < start_time + max_duration) {
+    for (auto& [relationship, flow_files] : controller.trigger()) {
+      result[relationship].insert(result[relationship].end(), 
flow_files.begin(), flow_files.end());
+    }
+    bool expected_quantities_met = true;
+    for (const auto& [relationship, expected_quantity] : expected_quantities) {
+      if (result[relationship].size() < expected_quantity) {
+        expected_quantities_met = false;
+        break;
+      }
+    }
+    if (expected_quantities_met)
+      return true;

Review Comment:
   just a suggestion, feel free to ignore, but I think this would be nicer:
   ```suggestion
       if (ranges::all_of(expected_quantities, [&result](const auto& kv) {
         const auto& [relationship, expected_quantity] = kv;
         return result[relationship].size() >= expected_quantity;
       })) {
         return true;
       }
   ```



##########
extensions/standard-processors/tests/unit/ListenTcpTests.cpp:
##########
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <string>
+
+#include "Catch.h"
+#include "processors/ListenTCP.h"
+#include "SingleProcessorTestController.h"
+#include "asio.hpp"
+
+using ListenTCP = org::apache::nifi::minifi::processors::ListenTCP;
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+constexpr uint64_t PORT = 10254;
+using ProcessorTriggerResult = std::unordered_map<core::Relationship, 
std::vector<std::shared_ptr<core::FlowFile>>>;
+
+void sendMessagesViaTCP(const std::vector<std::string_view>& contents) {
+  asio::io_context io_context;
+  asio::ip::tcp::socket socket(io_context);
+  asio::ip::tcp::endpoint 
remote_endpoint(asio::ip::address::from_string("127.0.0.1"), PORT);
+  socket.connect(remote_endpoint);
+  std::error_code err;
+  for (auto& content : contents) {
+    std::string tcp_message(content);
+    tcp_message += '\n';
+    socket.send(asio::buffer(tcp_message, tcp_message.size()), 0, err);
+  }
+  REQUIRE(!err);
+  socket.close();
+}
+
+void check_for_attributes(core::FlowFile& flow_file) {
+  CHECK(std::to_string(PORT) == flow_file.getAttribute("tcp.port"));
+  CHECK("127.0.0.1" == flow_file.getAttribute("tcp.sender"));
+}
+
+bool triggerUntil(test::SingleProcessorTestController& controller,
+                  const std::unordered_map<core::Relationship, size_t>& 
expected_quantities,
+                  ProcessorTriggerResult& result,
+                  const std::chrono::milliseconds max_duration,
+                  const std::chrono::milliseconds wait_time = 50ms) {
+  auto start_time = std::chrono::steady_clock::now();
+  while (std::chrono::steady_clock::now() < start_time + max_duration) {
+    for (auto& [relationship, flow_files] : controller.trigger()) {
+      result[relationship].insert(result[relationship].end(), 
flow_files.begin(), flow_files.end());
+    }
+    bool expected_quantities_met = true;
+    for (const auto& [relationship, expected_quantity] : expected_quantities) {
+      if (result[relationship].size() < expected_quantity) {
+        expected_quantities_met = false;
+        break;
+      }
+    }
+    if (expected_quantities_met)
+      return true;
+    std::this_thread::sleep_for(wait_time);
+  }
+  return false;
+}
+
+bool countLogOccurrencesUntil(const std::string& pattern,
+                              const int occurrences,
+                              const std::chrono::milliseconds max_duration,
+                              const std::chrono::milliseconds wait_time = 
50ms) {
+  auto start_time = std::chrono::steady_clock::now();
+  while (std::chrono::steady_clock::now() < start_time + max_duration) {
+    if (LogTestController::getInstance().countOccurrences(pattern) == 
occurrences)
+      return true;
+    std::this_thread::sleep_for(wait_time);
+  }
+  return false;
+}

Review Comment:
   This is quite generic, and it's also used in `ListenSyslogTests`, so it may 
be worth moving it to `TestUtils` or `IntegrationTestUtils`.



##########
docker/test/integration/steps/steps.py:
##########
@@ -416,6 +416,12 @@ def step_impl(context):
     context.test.start_splunk()
 
 
+# TCP client
+@given('a TCP client is setup to send logs to minifi')

Review Comment:
   why "logs"?  it sends a test message
   ```suggestion
   @given('a TCP client is set up to send a test TCP message to minifi')
   ```



##########
extensions/standard-processors/processors/NetworkListenerProcessor.cpp:
##########
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "NetworkListenerProcessor.h"
+#include "utils/net/UdpServer.h"
+#include "utils/net/TcpServer.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+NetworkListenerProcessor::~NetworkListenerProcessor() {
+  stopServer();
+}
+
+void NetworkListenerProcessor::onTrigger(const 
std::shared_ptr<core::ProcessContext>&, const 
std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(session && max_batch_size_ > 0);
+  size_t logs_processed = 0;
+  while (!server_->queueEmpty() && logs_processed < max_batch_size_) {
+    utils::net::Message received_message;
+    if (!server_->tryDequeue(received_message))
+      break;
+    transferAsFlowFile(received_message, *session);
+    ++logs_processed;
+  }
+}
+
+void NetworkListenerProcessor::startServer(
+    const core::ProcessContext& context, const core::Property& 
max_batch_size_prop, const core::Property& max_queue_size_prop, const 
core::Property& port_prop, utils::net::Protocol protocol) {
+  context.getProperty(max_batch_size_prop.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is 
invalid");
+
+  uint64_t max_queue_size = 0;
+  context.getProperty(max_queue_size_prop.getName(), max_queue_size);
+  auto max_queue_size_opt = max_queue_size > 0 ? 
std::optional<uint64_t>(max_queue_size) : std::nullopt;
+
+  int port;
+  context.getProperty(port_prop.getName(), port);
+
+  if (protocol == utils::net::Protocol::UDP) {
+    server_ = std::make_unique<utils::net::UdpServer>(max_queue_size_opt, 
port, logger_);
+  } else if (protocol == utils::net::Protocol::TCP) {
+    server_ = std::make_unique<utils::net::TcpServer>(max_queue_size_opt, 
port, logger_);
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid protocol");
+  }
+
+  server_thread_ = std::thread([this]() { server_->run(); });
+  logger_->log_debug("Started %s server on port %d with %s max queue size and 
%zu max batch size",
+                     protocol.toString(),
+                     port,
+                     max_queue_size_opt ? std::to_string(*max_queue_size_opt) 
: "no",

Review Comment:
   nitpicking, but I would use "unlimited" instead of "no"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to