szaszm commented on code in PR #1779:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1779#discussion_r1627807868


##########
extensions/standard-processors/modbus/FetchModbusTcp.cpp:
##########
@@ -0,0 +1,259 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchModbusTcp.h"
+
+#include <utils/net/ConnectionHandler.h>
+
+#include <asio/read.hpp>
+#include <range/v3/view/drop.hpp>
+
+#include "core/Resource.h"
+
+#include "core/ProcessSession.h"
+#include "modbus/Error.h"
+#include "modbus/ReadModbusFunctions.h"
+#include "utils/net/AsioCoro.h"
+#include "utils/net/AsioSocketUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::modbus {
+
+
+void FetchModbusTcp::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  const auto record_set_writer_name = context.getProperty(RecordSetWriter);
+  record_set_writer_ = 
std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or("")));
+  if (!record_set_writer_)
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or 
missing RecordSetWriter"};
+
+    // if the required properties are missing or empty even before evaluating 
the EL expression, then we can throw in onSchedule, before we waste any flow 
files
+  if (context.getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing 
hostname"};
+  }
+  if (context.getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"};
+  }
+  if (const auto idle_connection_expiration = 
context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); 
idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 
0ms)
+    idle_connection_expiration_ = 
idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (const auto timeout = 
context.getProperty<core::TimePeriodValue>(Timeout); timeout && 
timeout->getMilliseconds() > 0ms)
+    timeout_duration_ = timeout->getMilliseconds();
+  else
+    timeout_duration_ = 15s;
+
+  if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
+
+  ssl_context_.reset();
+  if (const auto context_name = context.getProperty(SSLContextService); 
context_name && !IsNullOrEmpty(*context_name)) {
+    if (auto controller_service = context.getControllerService(*context_name)) 
{
+      if (const auto ssl_context_service = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*context_name)))
 {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not 
an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: 
" + *context_name);
+    }
+  }
+
+  readDynamicPropertyKeys(context);
+}
+
+void FetchModbusTcp::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  const auto flow_file = getFlowFile(session);
+  if (!flow_file) {
+    logger_->log_error("No flowfile to work on");
+    return;
+  }
+
+  removeExpiredConnections();
+
+  auto hostname = context.getProperty(Hostname, 
flow_file.get()).value_or(std::string{});
+  auto port = context.getProperty(Port, 
flow_file.get()).value_or(std::string{});
+
+  if (hostname.empty() || port.empty()) {
+    logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", 
flow_file->getUUIDStr(),
+        hostname.empty() ? "(empty)" : hostname.c_str(),
+        port.empty() ? "(empty)" : port.c_str());
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  auto connection_id = utils::net::ConnectionId(std::move(hostname), 
std::move(port));
+  std::shared_ptr<utils::net::ConnectionHandlerBase> handler;
+  if (!connections_ || !connections_->contains(connection_id)) {
+    if (ssl_context_)
+      handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_);
+    else
+      handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr);
+    if (connections_)
+      (*connections_)[connection_id] = handler;

Review Comment:
   ```suggestion
       if (ssl_context_) {
         handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_);
       } else {
         handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr);
       }
       if (connections_) {
         (*connections_)[connection_id] = handler;
       }
   ```



##########
extensions/standard-processors/modbus/FetchModbusTcp.cpp:
##########
@@ -0,0 +1,259 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchModbusTcp.h"
+
+#include <utils/net/ConnectionHandler.h>
+
+#include <asio/read.hpp>
+#include <range/v3/view/drop.hpp>
+
+#include "core/Resource.h"
+
+#include "core/ProcessSession.h"
+#include "modbus/Error.h"
+#include "modbus/ReadModbusFunctions.h"
+#include "utils/net/AsioCoro.h"
+#include "utils/net/AsioSocketUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::modbus {
+
+
+void FetchModbusTcp::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  const auto record_set_writer_name = context.getProperty(RecordSetWriter);
+  record_set_writer_ = 
std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or("")));
+  if (!record_set_writer_)
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or 
missing RecordSetWriter"};
+
+    // if the required properties are missing or empty even before evaluating 
the EL expression, then we can throw in onSchedule, before we waste any flow 
files
+  if (context.getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing 
hostname"};
+  }
+  if (context.getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"};
+  }
+  if (const auto idle_connection_expiration = 
context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); 
idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 
0ms)
+    idle_connection_expiration_ = 
idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();

Review Comment:
   ```suggestion
     if (const auto idle_connection_expiration = 
context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); 
idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 
0ms) {
       idle_connection_expiration_ = 
idle_connection_expiration->getMilliseconds();
     } else {
       idle_connection_expiration_.reset();
     }
   ```



##########
extensions/standard-processors/modbus/FetchModbusTcp.cpp:
##########
@@ -0,0 +1,259 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchModbusTcp.h"
+
+#include <utils/net/ConnectionHandler.h>
+
+#include <asio/read.hpp>
+#include <range/v3/view/drop.hpp>
+
+#include "core/Resource.h"
+
+#include "core/ProcessSession.h"
+#include "modbus/Error.h"
+#include "modbus/ReadModbusFunctions.h"
+#include "utils/net/AsioCoro.h"
+#include "utils/net/AsioSocketUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::modbus {
+
+
+void FetchModbusTcp::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  const auto record_set_writer_name = context.getProperty(RecordSetWriter);
+  record_set_writer_ = 
std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or("")));
+  if (!record_set_writer_)
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or 
missing RecordSetWriter"};
+
+    // if the required properties are missing or empty even before evaluating 
the EL expression, then we can throw in onSchedule, before we waste any flow 
files
+  if (context.getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing 
hostname"};
+  }
+  if (context.getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"};
+  }
+  if (const auto idle_connection_expiration = 
context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); 
idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 
0ms)
+    idle_connection_expiration_ = 
idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (const auto timeout = 
context.getProperty<core::TimePeriodValue>(Timeout); timeout && 
timeout->getMilliseconds() > 0ms)
+    timeout_duration_ = timeout->getMilliseconds();
+  else
+    timeout_duration_ = 15s;

Review Comment:
   ```suggestion
     if (const auto timeout = 
context.getProperty<core::TimePeriodValue>(Timeout); timeout && 
timeout->getMilliseconds() > 0ms) {
       timeout_duration_ = timeout->getMilliseconds();
     } else {
       timeout_duration_ = 15s;
     }
   ```



##########
extensions/standard-processors/modbus/FetchModbusTcp.cpp:
##########
@@ -0,0 +1,259 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchModbusTcp.h"
+
+#include <utils/net/ConnectionHandler.h>
+
+#include <asio/read.hpp>
+#include <range/v3/view/drop.hpp>
+
+#include "core/Resource.h"
+
+#include "core/ProcessSession.h"
+#include "modbus/Error.h"
+#include "modbus/ReadModbusFunctions.h"
+#include "utils/net/AsioCoro.h"
+#include "utils/net/AsioSocketUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::modbus {
+
+
+void FetchModbusTcp::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  const auto record_set_writer_name = context.getProperty(RecordSetWriter);
+  record_set_writer_ = 
std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or("")));
+  if (!record_set_writer_)
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or 
missing RecordSetWriter"};
+
+    // if the required properties are missing or empty even before evaluating 
the EL expression, then we can throw in onSchedule, before we waste any flow 
files
+  if (context.getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing 
hostname"};
+  }
+  if (context.getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"};
+  }
+  if (const auto idle_connection_expiration = 
context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); 
idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 
0ms)
+    idle_connection_expiration_ = 
idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (const auto timeout = 
context.getProperty<core::TimePeriodValue>(Timeout); timeout && 
timeout->getMilliseconds() > 0ms)
+    timeout_duration_ = timeout->getMilliseconds();
+  else
+    timeout_duration_ = 15s;
+
+  if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
+
+  ssl_context_.reset();
+  if (const auto context_name = context.getProperty(SSLContextService); 
context_name && !IsNullOrEmpty(*context_name)) {
+    if (auto controller_service = context.getControllerService(*context_name)) 
{
+      if (const auto ssl_context_service = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*context_name)))
 {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not 
an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: 
" + *context_name);
+    }
+  }
+
+  readDynamicPropertyKeys(context);
+}
+
+void FetchModbusTcp::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  const auto flow_file = getFlowFile(session);
+  if (!flow_file) {
+    logger_->log_error("No flowfile to work on");
+    return;
+  }
+
+  removeExpiredConnections();
+
+  auto hostname = context.getProperty(Hostname, 
flow_file.get()).value_or(std::string{});
+  auto port = context.getProperty(Port, 
flow_file.get()).value_or(std::string{});
+
+  if (hostname.empty() || port.empty()) {
+    logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", 
flow_file->getUUIDStr(),
+        hostname.empty() ? "(empty)" : hostname.c_str(),
+        port.empty() ? "(empty)" : port.c_str());
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  auto connection_id = utils::net::ConnectionId(std::move(hostname), 
std::move(port));
+  std::shared_ptr<utils::net::ConnectionHandlerBase> handler;
+  if (!connections_ || !connections_->contains(connection_id)) {
+    if (ssl_context_)
+      handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_);
+    else
+      handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr);
+    if (connections_)
+      (*connections_)[connection_id] = handler;
+  } else {
+    handler = (*connections_)[connection_id];
+  }
+
+  gsl_Expects(handler);
+
+  processFlowFile(handler, context, session, flow_file);
+}
+
+void FetchModbusTcp::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void FetchModbusTcp::readDynamicPropertyKeys(const core::ProcessContext& 
context) {
+  dynamic_property_keys_.clear();
+  const std::vector<std::string> dynamic_prop_keys = 
context.getDynamicPropertyKeys();
+  for (const auto& key : dynamic_prop_keys) {
+    
dynamic_property_keys_.emplace_back(core::PropertyDefinitionBuilder<>::createProperty(key).withDescription("auto
 generated").supportsExpressionLanguage(true).build());
+  }
+}
+
+std::shared_ptr<core::FlowFile> 
FetchModbusTcp::getFlowFile(core::ProcessSession& session) const {
+  if (hasIncomingConnections()) {
+    return session.get();
+  }
+  return session.create();
+}
+
+std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> 
FetchModbusTcp::getAddressMap(core::ProcessContext& context, const 
core::FlowFile& flow_file) {
+  std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> 
address_map{};
+  const auto unit_id_str = context.getProperty(UnitIdentifier, 
&flow_file).value_or("0");
+  const uint8_t unit_id = 
utils::string::parse<uint8_t>(unit_id_str).value_or(1);
+  for (const auto& dynamic_property : dynamic_property_keys_) {
+    if (std::string dynamic_property_value{}; 
context.getDynamicProperty(dynamic_property, dynamic_property_value, 
&flow_file)) {
+      if (auto modbus_func = ReadModbusFunction::parse(++transaction_id_, 
unit_id, dynamic_property_value); modbus_func)
+        address_map.emplace(dynamic_property.getName(), 
std::move(modbus_func));

Review Comment:
   ```suggestion
         if (auto modbus_func = ReadModbusFunction::parse(++transaction_id_, 
unit_id, dynamic_property_value); modbus_func) {
           address_map.emplace(dynamic_property.getName(), 
std::move(modbus_func));
         }
   ```



##########
extensions/standard-processors/modbus/FetchModbusTcp.cpp:
##########
@@ -0,0 +1,259 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchModbusTcp.h"
+
+#include <utils/net/ConnectionHandler.h>
+
+#include <asio/read.hpp>
+#include <range/v3/view/drop.hpp>
+
+#include "core/Resource.h"
+
+#include "core/ProcessSession.h"
+#include "modbus/Error.h"
+#include "modbus/ReadModbusFunctions.h"
+#include "utils/net/AsioCoro.h"
+#include "utils/net/AsioSocketUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::modbus {
+
+
+void FetchModbusTcp::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  const auto record_set_writer_name = context.getProperty(RecordSetWriter);
+  record_set_writer_ = 
std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or("")));
+  if (!record_set_writer_)
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or 
missing RecordSetWriter"};
+
+    // if the required properties are missing or empty even before evaluating 
the EL expression, then we can throw in onSchedule, before we waste any flow 
files
+  if (context.getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing 
hostname"};
+  }
+  if (context.getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"};
+  }
+  if (const auto idle_connection_expiration = 
context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); 
idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 
0ms)
+    idle_connection_expiration_ = 
idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (const auto timeout = 
context.getProperty<core::TimePeriodValue>(Timeout); timeout && 
timeout->getMilliseconds() > 0ms)
+    timeout_duration_ = timeout->getMilliseconds();
+  else
+    timeout_duration_ = 15s;
+
+  if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
+
+  ssl_context_.reset();
+  if (const auto context_name = context.getProperty(SSLContextService); 
context_name && !IsNullOrEmpty(*context_name)) {
+    if (auto controller_service = context.getControllerService(*context_name)) 
{
+      if (const auto ssl_context_service = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*context_name)))
 {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not 
an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: 
" + *context_name);
+    }
+  }
+
+  readDynamicPropertyKeys(context);
+}
+
+void FetchModbusTcp::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  const auto flow_file = getFlowFile(session);
+  if (!flow_file) {
+    logger_->log_error("No flowfile to work on");
+    return;
+  }
+
+  removeExpiredConnections();
+
+  auto hostname = context.getProperty(Hostname, 
flow_file.get()).value_or(std::string{});
+  auto port = context.getProperty(Port, 
flow_file.get()).value_or(std::string{});
+
+  if (hostname.empty() || port.empty()) {
+    logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", 
flow_file->getUUIDStr(),
+        hostname.empty() ? "(empty)" : hostname.c_str(),
+        port.empty() ? "(empty)" : port.c_str());
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  auto connection_id = utils::net::ConnectionId(std::move(hostname), 
std::move(port));
+  std::shared_ptr<utils::net::ConnectionHandlerBase> handler;
+  if (!connections_ || !connections_->contains(connection_id)) {
+    if (ssl_context_)
+      handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_);
+    else
+      handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr);
+    if (connections_)
+      (*connections_)[connection_id] = handler;
+  } else {
+    handler = (*connections_)[connection_id];
+  }
+
+  gsl_Expects(handler);
+
+  processFlowFile(handler, context, session, flow_file);
+}
+
+void FetchModbusTcp::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void FetchModbusTcp::readDynamicPropertyKeys(const core::ProcessContext& 
context) {
+  dynamic_property_keys_.clear();
+  const std::vector<std::string> dynamic_prop_keys = 
context.getDynamicPropertyKeys();
+  for (const auto& key : dynamic_prop_keys) {
+    
dynamic_property_keys_.emplace_back(core::PropertyDefinitionBuilder<>::createProperty(key).withDescription("auto
 generated").supportsExpressionLanguage(true).build());
+  }
+}
+
+std::shared_ptr<core::FlowFile> 
FetchModbusTcp::getFlowFile(core::ProcessSession& session) const {
+  if (hasIncomingConnections()) {
+    return session.get();
+  }
+  return session.create();
+}
+
+std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> 
FetchModbusTcp::getAddressMap(core::ProcessContext& context, const 
core::FlowFile& flow_file) {
+  std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> 
address_map{};
+  const auto unit_id_str = context.getProperty(UnitIdentifier, 
&flow_file).value_or("0");
+  const uint8_t unit_id = 
utils::string::parse<uint8_t>(unit_id_str).value_or(1);
+  for (const auto& dynamic_property : dynamic_property_keys_) {
+    if (std::string dynamic_property_value{}; 
context.getDynamicProperty(dynamic_property, dynamic_property_value, 
&flow_file)) {
+      if (auto modbus_func = ReadModbusFunction::parse(++transaction_id_, 
unit_id, dynamic_property_value); modbus_func)
+        address_map.emplace(dynamic_property.getName(), 
std::move(modbus_func));
+    }
+  }
+  return address_map;
+}
+
+void FetchModbusTcp::removeExpiredConnections() {
+  if (connections_) {
+    std::erase_if(*connections_, [this](auto& item) -> bool {
+      const auto& connection_handler = item.second;
+      return (!connection_handler || (idle_connection_expiration_ && 
!connection_handler->hasBeenUsedIn(*idle_connection_expiration_)));
+    });
+  }
+}
+
+void FetchModbusTcp::processFlowFile(const 
std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler,
+    core::ProcessContext& context,
+    core::ProcessSession& session,
+    const std::shared_ptr<core::FlowFile>& flow_file) {
+  std::unordered_map<std::string, std::string> result_map{};
+  const auto address_map = getAddressMap(context, *flow_file);
+  if (address_map.empty()) {
+    logger_->log_warn("There are no registers to query");
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  if (auto result = readModbus(connection_handler, address_map); !result) {
+    connection_handler->reset();
+    logger_->log_error("{}", result.error().message());
+    session.transfer(flow_file, Failure);
+  } else {
+    core::RecordSet record_set;
+    record_set.push_back(std::move(*result));
+    record_set_writer_->write(record_set, flow_file, session);
+    session.transfer(flow_file, Success);
+  }
+}
+
+nonstd::expected<core::Record, std::error_code> FetchModbusTcp::readModbus(
+    const std::shared_ptr<utils::net::ConnectionHandlerBase>& 
connection_handler,
+    const std::unordered_map<std::string, 
std::unique_ptr<ReadModbusFunction>>& address_map) {
+  nonstd::expected<core::Record, std::error_code> result;
+  io_context_.restart();
+  asio::co_spawn(io_context_,
+    sendRequestsAndReadResponses(*connection_handler, address_map),
+    [&result](const std::exception_ptr& exception_ptr, auto res) {
+      if (exception_ptr) {
+        result = nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse);
+      } else {
+        result = std::move(res);
+      }
+    });
+  io_context_.run();
+  return result;
+}
+
+auto 
FetchModbusTcp::sendRequestsAndReadResponses(utils::net::ConnectionHandlerBase& 
connection_handler,
+    const std::unordered_map<std::string, 
std::unique_ptr<ReadModbusFunction>>& address_map) -> 
asio::awaitable<nonstd::expected<core::Record, std::error_code>> {
+  core::Record result;
+  for (const auto& [variable, read_modbus_fn] : address_map) {
+    auto response = co_await sendRequestAndReadResponse(connection_handler, 
*read_modbus_fn);
+    if (!response) {
+      co_return nonstd::make_unexpected(response.error());
+    }
+    result.emplace(variable, std::move(*response));
+  }
+  co_return result;
+}
+
+
+auto 
FetchModbusTcp::sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& 
connection_handler,
+    const ReadModbusFunction& read_modbus_function) -> 
asio::awaitable<nonstd::expected<core::RecordField, std::error_code>> {
+  std::string result;
+  if (auto connection_error = co_await 
connection_handler.setupUsableSocket(io_context_))  // NOLINT
+    co_return nonstd::make_unexpected(connection_error);
+
+  if (auto [write_error, bytes_written] = co_await 
connection_handler.write(asio::buffer(read_modbus_function.requestBytes())); 
write_error)
+    co_return nonstd::make_unexpected(write_error);

Review Comment:
   ```suggestion
     if (auto [write_error, bytes_written] = co_await 
connection_handler.write(asio::buffer(read_modbus_function.requestBytes())); 
write_error) {
       co_return nonstd::make_unexpected(write_error);
     }
   ```



##########
extensions/standard-processors/modbus/FetchModbusTcp.cpp:
##########
@@ -0,0 +1,259 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchModbusTcp.h"
+
+#include <utils/net/ConnectionHandler.h>
+
+#include <asio/read.hpp>
+#include <range/v3/view/drop.hpp>
+
+#include "core/Resource.h"
+
+#include "core/ProcessSession.h"
+#include "modbus/Error.h"
+#include "modbus/ReadModbusFunctions.h"
+#include "utils/net/AsioCoro.h"
+#include "utils/net/AsioSocketUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::modbus {
+
+
+void FetchModbusTcp::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  const auto record_set_writer_name = context.getProperty(RecordSetWriter);
+  record_set_writer_ = 
std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or("")));
+  if (!record_set_writer_)
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or 
missing RecordSetWriter"};
+
+    // if the required properties are missing or empty even before evaluating 
the EL expression, then we can throw in onSchedule, before we waste any flow 
files
+  if (context.getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing 
hostname"};
+  }
+  if (context.getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"};
+  }
+  if (const auto idle_connection_expiration = 
context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); 
idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 
0ms)
+    idle_connection_expiration_ = 
idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (const auto timeout = 
context.getProperty<core::TimePeriodValue>(Timeout); timeout && 
timeout->getMilliseconds() > 0ms)
+    timeout_duration_ = timeout->getMilliseconds();
+  else
+    timeout_duration_ = 15s;
+
+  if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
+
+  ssl_context_.reset();
+  if (const auto context_name = context.getProperty(SSLContextService); 
context_name && !IsNullOrEmpty(*context_name)) {
+    if (auto controller_service = context.getControllerService(*context_name)) 
{
+      if (const auto ssl_context_service = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*context_name)))
 {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not 
an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: 
" + *context_name);
+    }
+  }
+
+  readDynamicPropertyKeys(context);
+}
+
+void FetchModbusTcp::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  const auto flow_file = getFlowFile(session);
+  if (!flow_file) {
+    logger_->log_error("No flowfile to work on");
+    return;
+  }
+
+  removeExpiredConnections();
+
+  auto hostname = context.getProperty(Hostname, 
flow_file.get()).value_or(std::string{});
+  auto port = context.getProperty(Port, 
flow_file.get()).value_or(std::string{});
+
+  if (hostname.empty() || port.empty()) {
+    logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", 
flow_file->getUUIDStr(),
+        hostname.empty() ? "(empty)" : hostname.c_str(),
+        port.empty() ? "(empty)" : port.c_str());
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  auto connection_id = utils::net::ConnectionId(std::move(hostname), 
std::move(port));
+  std::shared_ptr<utils::net::ConnectionHandlerBase> handler;
+  if (!connections_ || !connections_->contains(connection_id)) {
+    if (ssl_context_)
+      handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_);
+    else
+      handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr);
+    if (connections_)
+      (*connections_)[connection_id] = handler;
+  } else {
+    handler = (*connections_)[connection_id];
+  }
+
+  gsl_Expects(handler);
+
+  processFlowFile(handler, context, session, flow_file);
+}
+
+void FetchModbusTcp::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void FetchModbusTcp::readDynamicPropertyKeys(const core::ProcessContext& 
context) {
+  dynamic_property_keys_.clear();
+  const std::vector<std::string> dynamic_prop_keys = 
context.getDynamicPropertyKeys();
+  for (const auto& key : dynamic_prop_keys) {
+    
dynamic_property_keys_.emplace_back(core::PropertyDefinitionBuilder<>::createProperty(key).withDescription("auto
 generated").supportsExpressionLanguage(true).build());
+  }
+}
+
+std::shared_ptr<core::FlowFile> 
FetchModbusTcp::getFlowFile(core::ProcessSession& session) const {
+  if (hasIncomingConnections()) {
+    return session.get();
+  }
+  return session.create();
+}
+
+std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> 
FetchModbusTcp::getAddressMap(core::ProcessContext& context, const 
core::FlowFile& flow_file) {
+  std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> 
address_map{};
+  const auto unit_id_str = context.getProperty(UnitIdentifier, 
&flow_file).value_or("0");
+  const uint8_t unit_id = 
utils::string::parse<uint8_t>(unit_id_str).value_or(1);
+  for (const auto& dynamic_property : dynamic_property_keys_) {
+    if (std::string dynamic_property_value{}; 
context.getDynamicProperty(dynamic_property, dynamic_property_value, 
&flow_file)) {
+      if (auto modbus_func = ReadModbusFunction::parse(++transaction_id_, 
unit_id, dynamic_property_value); modbus_func)
+        address_map.emplace(dynamic_property.getName(), 
std::move(modbus_func));
+    }
+  }
+  return address_map;
+}
+
+void FetchModbusTcp::removeExpiredConnections() {
+  if (connections_) {
+    std::erase_if(*connections_, [this](auto& item) -> bool {
+      const auto& connection_handler = item.second;
+      return (!connection_handler || (idle_connection_expiration_ && 
!connection_handler->hasBeenUsedIn(*idle_connection_expiration_)));
+    });
+  }
+}
+
+void FetchModbusTcp::processFlowFile(const 
std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler,
+    core::ProcessContext& context,
+    core::ProcessSession& session,
+    const std::shared_ptr<core::FlowFile>& flow_file) {
+  std::unordered_map<std::string, std::string> result_map{};
+  const auto address_map = getAddressMap(context, *flow_file);
+  if (address_map.empty()) {
+    logger_->log_warn("There are no registers to query");
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  if (auto result = readModbus(connection_handler, address_map); !result) {
+    connection_handler->reset();
+    logger_->log_error("{}", result.error().message());
+    session.transfer(flow_file, Failure);
+  } else {
+    core::RecordSet record_set;
+    record_set.push_back(std::move(*result));
+    record_set_writer_->write(record_set, flow_file, session);
+    session.transfer(flow_file, Success);
+  }
+}
+
+nonstd::expected<core::Record, std::error_code> FetchModbusTcp::readModbus(
+    const std::shared_ptr<utils::net::ConnectionHandlerBase>& 
connection_handler,
+    const std::unordered_map<std::string, 
std::unique_ptr<ReadModbusFunction>>& address_map) {
+  nonstd::expected<core::Record, std::error_code> result;
+  io_context_.restart();
+  asio::co_spawn(io_context_,
+    sendRequestsAndReadResponses(*connection_handler, address_map),
+    [&result](const std::exception_ptr& exception_ptr, auto res) {
+      if (exception_ptr) {
+        result = nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse);
+      } else {
+        result = std::move(res);
+      }
+    });
+  io_context_.run();
+  return result;
+}
+
+auto 
FetchModbusTcp::sendRequestsAndReadResponses(utils::net::ConnectionHandlerBase& 
connection_handler,
+    const std::unordered_map<std::string, 
std::unique_ptr<ReadModbusFunction>>& address_map) -> 
asio::awaitable<nonstd::expected<core::Record, std::error_code>> {
+  core::Record result;
+  for (const auto& [variable, read_modbus_fn] : address_map) {
+    auto response = co_await sendRequestAndReadResponse(connection_handler, 
*read_modbus_fn);
+    if (!response) {
+      co_return nonstd::make_unexpected(response.error());
+    }
+    result.emplace(variable, std::move(*response));
+  }
+  co_return result;
+}
+
+
+auto 
FetchModbusTcp::sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& 
connection_handler,
+    const ReadModbusFunction& read_modbus_function) -> 
asio::awaitable<nonstd::expected<core::RecordField, std::error_code>> {
+  std::string result;
+  if (auto connection_error = co_await 
connection_handler.setupUsableSocket(io_context_))  // NOLINT
+    co_return nonstd::make_unexpected(connection_error);
+
+  if (auto [write_error, bytes_written] = co_await 
connection_handler.write(asio::buffer(read_modbus_function.requestBytes())); 
write_error)
+    co_return nonstd::make_unexpected(write_error);
+
+  std::array<std::byte, 7> apu_buffer{};
+  asio::mutable_buffer response_apu(apu_buffer.data(), 7);
+  if (auto [read_error, bytes_read] = co_await 
connection_handler.read(response_apu); read_error)
+    co_return nonstd::make_unexpected(read_error);
+
+  const auto received_transaction_id = fromBytes<uint16_t>({apu_buffer[0], 
apu_buffer[1]});
+  const auto received_protocol = fromBytes<uint16_t>({apu_buffer[2], 
apu_buffer[3]});
+  const auto received_length = fromBytes<uint16_t>({apu_buffer[4], 
apu_buffer[5]});
+  const auto unit_id = static_cast<uint8_t>(apu_buffer[6]);
+
+  if (received_transaction_id != read_modbus_function.getTransactionId())
+    co_return 
nonstd::make_unexpected(ModbusExceptionCode::InvalidTransactionId);
+  if (received_protocol != 0)
+    co_return nonstd::make_unexpected(ModbusExceptionCode::IllegalProtocol);
+  if (unit_id != read_modbus_function.getUnitId())
+    co_return nonstd::make_unexpected(ModbusExceptionCode::InvalidSlaveId);
+  if (received_length + 6 > 260 || received_length <= 1)
+    co_return nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse);

Review Comment:
   ```suggestion
     if (received_transaction_id != read_modbus_function.getTransactionId()) {
       co_return 
nonstd::make_unexpected(ModbusExceptionCode::InvalidTransactionId);
     }
     if (received_protocol != 0) {
       co_return nonstd::make_unexpected(ModbusExceptionCode::IllegalProtocol);
     }
     if (unit_id != read_modbus_function.getUnitId()) {
       co_return nonstd::make_unexpected(ModbusExceptionCode::InvalidSlaveId);
     }
     if (received_length + 6 > 260 || received_length <= 1) {
       co_return nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse);
     }
   ```



##########
extensions/standard-processors/modbus/FetchModbusTcp.cpp:
##########
@@ -0,0 +1,259 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchModbusTcp.h"
+
+#include <utils/net/ConnectionHandler.h>
+
+#include <asio/read.hpp>
+#include <range/v3/view/drop.hpp>
+
+#include "core/Resource.h"
+
+#include "core/ProcessSession.h"
+#include "modbus/Error.h"
+#include "modbus/ReadModbusFunctions.h"
+#include "utils/net/AsioCoro.h"
+#include "utils/net/AsioSocketUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::modbus {
+
+
+void FetchModbusTcp::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  const auto record_set_writer_name = context.getProperty(RecordSetWriter);
+  record_set_writer_ = 
std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or("")));
+  if (!record_set_writer_)
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or 
missing RecordSetWriter"};
+
+    // if the required properties are missing or empty even before evaluating 
the EL expression, then we can throw in onSchedule, before we waste any flow 
files
+  if (context.getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing 
hostname"};
+  }
+  if (context.getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"};
+  }
+  if (const auto idle_connection_expiration = 
context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); 
idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 
0ms)
+    idle_connection_expiration_ = 
idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (const auto timeout = 
context.getProperty<core::TimePeriodValue>(Timeout); timeout && 
timeout->getMilliseconds() > 0ms)
+    timeout_duration_ = timeout->getMilliseconds();
+  else
+    timeout_duration_ = 15s;
+
+  if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
+
+  ssl_context_.reset();
+  if (const auto context_name = context.getProperty(SSLContextService); 
context_name && !IsNullOrEmpty(*context_name)) {
+    if (auto controller_service = context.getControllerService(*context_name)) 
{
+      if (const auto ssl_context_service = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*context_name)))
 {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not 
an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: 
" + *context_name);
+    }
+  }
+
+  readDynamicPropertyKeys(context);
+}
+
+void FetchModbusTcp::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  const auto flow_file = getFlowFile(session);
+  if (!flow_file) {
+    logger_->log_error("No flowfile to work on");
+    return;
+  }
+
+  removeExpiredConnections();
+
+  auto hostname = context.getProperty(Hostname, 
flow_file.get()).value_or(std::string{});
+  auto port = context.getProperty(Port, 
flow_file.get()).value_or(std::string{});
+
+  if (hostname.empty() || port.empty()) {
+    logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", 
flow_file->getUUIDStr(),
+        hostname.empty() ? "(empty)" : hostname.c_str(),
+        port.empty() ? "(empty)" : port.c_str());
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  auto connection_id = utils::net::ConnectionId(std::move(hostname), 
std::move(port));
+  std::shared_ptr<utils::net::ConnectionHandlerBase> handler;
+  if (!connections_ || !connections_->contains(connection_id)) {
+    if (ssl_context_)
+      handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_);
+    else
+      handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr);
+    if (connections_)
+      (*connections_)[connection_id] = handler;
+  } else {
+    handler = (*connections_)[connection_id];
+  }
+
+  gsl_Expects(handler);
+
+  processFlowFile(handler, context, session, flow_file);
+}
+
+void FetchModbusTcp::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void FetchModbusTcp::readDynamicPropertyKeys(const core::ProcessContext& 
context) {
+  dynamic_property_keys_.clear();
+  const std::vector<std::string> dynamic_prop_keys = 
context.getDynamicPropertyKeys();
+  for (const auto& key : dynamic_prop_keys) {
+    
dynamic_property_keys_.emplace_back(core::PropertyDefinitionBuilder<>::createProperty(key).withDescription("auto
 generated").supportsExpressionLanguage(true).build());
+  }
+}
+
+std::shared_ptr<core::FlowFile> 
FetchModbusTcp::getFlowFile(core::ProcessSession& session) const {
+  if (hasIncomingConnections()) {
+    return session.get();
+  }
+  return session.create();
+}
+
+std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> 
FetchModbusTcp::getAddressMap(core::ProcessContext& context, const 
core::FlowFile& flow_file) {
+  std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> 
address_map{};
+  const auto unit_id_str = context.getProperty(UnitIdentifier, 
&flow_file).value_or("0");
+  const uint8_t unit_id = 
utils::string::parse<uint8_t>(unit_id_str).value_or(1);
+  for (const auto& dynamic_property : dynamic_property_keys_) {
+    if (std::string dynamic_property_value{}; 
context.getDynamicProperty(dynamic_property, dynamic_property_value, 
&flow_file)) {
+      if (auto modbus_func = ReadModbusFunction::parse(++transaction_id_, 
unit_id, dynamic_property_value); modbus_func)
+        address_map.emplace(dynamic_property.getName(), 
std::move(modbus_func));
+    }
+  }
+  return address_map;
+}
+
+void FetchModbusTcp::removeExpiredConnections() {
+  if (connections_) {
+    std::erase_if(*connections_, [this](auto& item) -> bool {
+      const auto& connection_handler = item.second;
+      return (!connection_handler || (idle_connection_expiration_ && 
!connection_handler->hasBeenUsedIn(*idle_connection_expiration_)));
+    });
+  }
+}
+
+void FetchModbusTcp::processFlowFile(const 
std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler,
+    core::ProcessContext& context,
+    core::ProcessSession& session,
+    const std::shared_ptr<core::FlowFile>& flow_file) {
+  std::unordered_map<std::string, std::string> result_map{};
+  const auto address_map = getAddressMap(context, *flow_file);
+  if (address_map.empty()) {
+    logger_->log_warn("There are no registers to query");
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  if (auto result = readModbus(connection_handler, address_map); !result) {
+    connection_handler->reset();
+    logger_->log_error("{}", result.error().message());
+    session.transfer(flow_file, Failure);
+  } else {
+    core::RecordSet record_set;
+    record_set.push_back(std::move(*result));
+    record_set_writer_->write(record_set, flow_file, session);
+    session.transfer(flow_file, Success);
+  }
+}
+
+nonstd::expected<core::Record, std::error_code> FetchModbusTcp::readModbus(
+    const std::shared_ptr<utils::net::ConnectionHandlerBase>& 
connection_handler,
+    const std::unordered_map<std::string, 
std::unique_ptr<ReadModbusFunction>>& address_map) {
+  nonstd::expected<core::Record, std::error_code> result;
+  io_context_.restart();
+  asio::co_spawn(io_context_,
+    sendRequestsAndReadResponses(*connection_handler, address_map),
+    [&result](const std::exception_ptr& exception_ptr, auto res) {
+      if (exception_ptr) {
+        result = nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse);
+      } else {
+        result = std::move(res);
+      }
+    });
+  io_context_.run();
+  return result;
+}
+
+auto 
FetchModbusTcp::sendRequestsAndReadResponses(utils::net::ConnectionHandlerBase& 
connection_handler,
+    const std::unordered_map<std::string, 
std::unique_ptr<ReadModbusFunction>>& address_map) -> 
asio::awaitable<nonstd::expected<core::Record, std::error_code>> {
+  core::Record result;
+  for (const auto& [variable, read_modbus_fn] : address_map) {
+    auto response = co_await sendRequestAndReadResponse(connection_handler, 
*read_modbus_fn);
+    if (!response) {
+      co_return nonstd::make_unexpected(response.error());
+    }
+    result.emplace(variable, std::move(*response));
+  }
+  co_return result;
+}
+
+
+auto 
FetchModbusTcp::sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& 
connection_handler,
+    const ReadModbusFunction& read_modbus_function) -> 
asio::awaitable<nonstd::expected<core::RecordField, std::error_code>> {
+  std::string result;
+  if (auto connection_error = co_await 
connection_handler.setupUsableSocket(io_context_))  // NOLINT
+    co_return nonstd::make_unexpected(connection_error);
+
+  if (auto [write_error, bytes_written] = co_await 
connection_handler.write(asio::buffer(read_modbus_function.requestBytes())); 
write_error)
+    co_return nonstd::make_unexpected(write_error);
+
+  std::array<std::byte, 7> apu_buffer{};
+  asio::mutable_buffer response_apu(apu_buffer.data(), 7);
+  if (auto [read_error, bytes_read] = co_await 
connection_handler.read(response_apu); read_error)
+    co_return nonstd::make_unexpected(read_error);

Review Comment:
   ```suggestion
     if (auto [read_error, bytes_read] = co_await 
connection_handler.read(response_apu); read_error) {
       co_return nonstd::make_unexpected(read_error);
     }
   ```



##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -315,23 +141,48 @@ void PutTCP::removeExpiredConnections() {
   }
 }
 
-std::error_code 
PutTCP::sendFlowFileContent(std::shared_ptr<ConnectionHandlerBase>& 
connection_handler,
-    const std::shared_ptr<io::InputStream>& flow_file_content_stream) {
+std::error_code PutTCP::sendFlowFileContent(const 
std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler,
+                                            const 
std::shared_ptr<io::InputStream>& flow_file_content_stream) {

Review Comment:
   ```suggestion
   std::error_code PutTCP::sendFlowFileContent(const 
std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler,
       const std::shared_ptr<io::InputStream>& flow_file_content_stream) {
   ```



##########
extensions/standard-processors/modbus/FetchModbusTcp.cpp:
##########
@@ -0,0 +1,259 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchModbusTcp.h"
+
+#include <utils/net/ConnectionHandler.h>
+
+#include <asio/read.hpp>
+#include <range/v3/view/drop.hpp>
+
+#include "core/Resource.h"
+
+#include "core/ProcessSession.h"
+#include "modbus/Error.h"
+#include "modbus/ReadModbusFunctions.h"
+#include "utils/net/AsioCoro.h"
+#include "utils/net/AsioSocketUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::modbus {
+
+
+void FetchModbusTcp::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  const auto record_set_writer_name = context.getProperty(RecordSetWriter);
+  record_set_writer_ = 
std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or("")));
+  if (!record_set_writer_)
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or 
missing RecordSetWriter"};
+
+    // if the required properties are missing or empty even before evaluating 
the EL expression, then we can throw in onSchedule, before we waste any flow 
files
+  if (context.getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing 
hostname"};
+  }
+  if (context.getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"};
+  }
+  if (const auto idle_connection_expiration = 
context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); 
idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 
0ms)
+    idle_connection_expiration_ = 
idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (const auto timeout = 
context.getProperty<core::TimePeriodValue>(Timeout); timeout && 
timeout->getMilliseconds() > 0ms)
+    timeout_duration_ = timeout->getMilliseconds();
+  else
+    timeout_duration_ = 15s;
+
+  if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();

Review Comment:
   ```suggestion
     if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false)) {
       connections_.reset();
     } else {
       connections_.emplace();
     }
   ```



##########
extensions/standard-processors/modbus/FetchModbusTcp.cpp:
##########
@@ -0,0 +1,259 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchModbusTcp.h"
+
+#include <utils/net/ConnectionHandler.h>
+
+#include <asio/read.hpp>
+#include <range/v3/view/drop.hpp>
+
+#include "core/Resource.h"
+
+#include "core/ProcessSession.h"
+#include "modbus/Error.h"
+#include "modbus/ReadModbusFunctions.h"
+#include "utils/net/AsioCoro.h"
+#include "utils/net/AsioSocketUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::modbus {
+
+
+void FetchModbusTcp::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  const auto record_set_writer_name = context.getProperty(RecordSetWriter);
+  record_set_writer_ = 
std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or("")));
+  if (!record_set_writer_)
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or 
missing RecordSetWriter"};

Review Comment:
   The style guide recommends braces around all if and for blocks: 
https://google.github.io/styleguide/cppguide.html#Formatting_Looping_Branching
   
   If you want to change this in the context of this project, we can discuss 
this with the wider contributor community, but until then, we should stick with 
it.
   
   ```suggestion
     if (!record_set_writer_) {
       throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or 
missing RecordSetWriter"};
     }
   ```



##########
extensions/standard-processors/modbus/Error.h:
##########
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <system_error>
+#include "magic_enum.hpp"
+
+namespace org::apache::nifi::minifi::modbus {
+
+enum class ModbusExceptionCode : std::underlying_type_t<std::byte> {
+  IllegalFunction = 0x01,
+  IllegalDataAddress = 0x02,
+  IllegalDataValue = 0x03,
+  SlaveDeviceFailure = 0x04,
+  Acknowledge = 0x05,
+  SlaveDeviceBusy = 0x06,
+  NegativeAcknowledge = 0x07,
+  MemoryParityError = 0x08,
+  GatewayPathUnavailable = 0x0a,
+  GatewayTargetDeviceFailedToRespond = 0x0b,
+  InvalidResponse,
+  MessageTooLarge,
+  InvalidTransactionId,
+  IllegalProtocol,
+  InvalidSlaveId
+};

Review Comment:
   Are these defined somewhere? A specification reference or similar in a 
comment would be nice.



##########
extensions/standard-processors/modbus/FetchModbusTcp.h:
##########
@@ -0,0 +1,143 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "controllers/SSLContextService.h"
+#include "controllers/RecordSetWriter.h"
+#include "core/Processor.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "logging/LoggerFactory.h"
+#include "utils/net/AsioCoro.h"
+#include "utils/net/AsioSocketUtils.h"
+#include "utils/net/ConnectionHandlerBase.h"
+
+namespace org::apache::nifi::minifi::modbus {
+
+class ReadModbusFunction;
+
+class FetchModbusTcp final : public core::Processor {
+ public:
+  explicit FetchModbusTcp(const std::string_view name, const 
utils::Identifier& uuid = {})
+      : Processor(name, uuid) {
+  }
+
+  EXTENSIONAPI static constexpr auto Description = "Processor able to read 
data from industrial PLCs using Modbus TCP/IP";
+
+  EXTENSIONAPI static constexpr auto Hostname = 
core::PropertyDefinitionBuilder<>::createProperty("Hostname")
+      .withDescription("The ip address or hostname of the destination.")
+      .isRequired(true)
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto Port = 
core::PropertyDefinitionBuilder<>::createProperty("Port")
+      .withDescription("The port or service on the destination.")
+      .withDefaultValue("502")
+      .isRequired(true)
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto UnitIdentifier = 
core::PropertyDefinitionBuilder<>::createProperty("Unit identifier")
+      .withDescription("The port or service on the destination.")
+      .isRequired(true)
+      .withDefaultValue("0")
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto IdleConnectionExpiration = 
core::PropertyDefinitionBuilder<>::createProperty("Idle Connection Expiration")
+      .withDescription("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
+      .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
+      .withDefaultValue("15 seconds")
+      .isRequired(true)
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto ConnectionPerFlowFile = 
core::PropertyDefinitionBuilder<>::createProperty("Connection Per FlowFile")
+    .withDescription("Specifies whether to send each FlowFile's content on an 
individual connection.")
+    .withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE)
+    .withDefaultValue("false")
+    .isRequired(true)
+    .supportsExpressionLanguage(false)
+    .build();
+  EXTENSIONAPI static constexpr auto Timeout = 
core::PropertyDefinitionBuilder<>::createProperty("Timeout")
+      .withDescription("The timeout for connecting to and communicating with 
the destination.")
+      .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
+      .withDefaultValue("15 seconds")
+      .isRequired(true)
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto SSLContextService = 
core::PropertyDefinitionBuilder<>::createProperty("SSL Context Service")
+      .withDescription("The Controller Service to use in order to obtain an 
SSL Context. If this property is set, messages will be sent over a secure 
connection.")
+      .isRequired(false)
+      .withAllowedTypes<minifi::controllers::SSLContextService>()
+      .build();
+  EXTENSIONAPI static constexpr auto RecordSetWriter = 
core::PropertyDefinitionBuilder<>::createProperty("Record Set Writer")
+    .withDescription("Specifies the Controller Service to use for writing 
results to a FlowFile. ")
+    .isRequired(false)
+    .withAllowedTypes<core::RecordSetWriter>()
+    .build();
+
+  EXTENSIONAPI static constexpr auto Properties = 
std::array<core::PropertyReference, 8>{
+    Hostname,
+    Port,
+    UnitIdentifier,
+    IdleConnectionExpiration,
+    ConnectionPerFlowFile,
+    Timeout,
+    SSLContextService,
+    RecordSetWriter
+  };
+
+  EXTENSIONAPI static constexpr auto Success = 
core::RelationshipDefinition{"success", "Successfully processed"};
+  EXTENSIONAPI static constexpr auto Failure = 
core::RelationshipDefinition{"failure", "An error occurred processing"};
+  EXTENSIONAPI static constexpr auto Relationships = std::array{Success, 
Failure};
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true;
+  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+  EXTENSIONAPI static constexpr auto InputRequirement = 
core::annotation::Input::INPUT_ALLOWED;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+  void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& 
session_factory) override;
+  void onTrigger(core::ProcessContext& context, core::ProcessSession& session) 
override;
+  void initialize() override;
+
+ private:
+  void readDynamicPropertyKeys(const core::ProcessContext& context);
+  void processFlowFile(const 
std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler,
+    core::ProcessContext& context,
+    core::ProcessSession& session,
+    const std::shared_ptr<core::FlowFile>& flow_file);
+
+  auto readModbus(const std::shared_ptr<utils::net::ConnectionHandlerBase>& 
connection_handler,
+      const std::unordered_map<std::string, 
std::unique_ptr<ReadModbusFunction>>& address_map) -> 
nonstd::expected<core::Record, std::error_code>;
+  auto sendRequestsAndReadResponses(utils::net::ConnectionHandlerBase& 
connection_handler,
+      const std::unordered_map<std::string, 
std::unique_ptr<ReadModbusFunction>>& address_map) -> 
asio::awaitable<nonstd::expected<core::Record, std::error_code>>;
+  auto sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& 
connection_handler,
+      const ReadModbusFunction& read_modbus_function) -> 
asio::awaitable<nonstd::expected<core::RecordField, std::error_code>>;

Review Comment:
   Why trailing return types? In these cases, they don't depend on parameter 
identifiers, so I'd keep them in front for consistency.
   ```suggestion
     nonstd::expected<core::Record, std::error_code> readModbus(const 
std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler,
         const std::unordered_map<std::string, 
std::unique_ptr<ReadModbusFunction>>& address_map);
     asio::awaitable<nonstd::expected<core::Record, std::error_code>> 
sendRequestsAndReadResponses(utils::net::ConnectionHandlerBase& 
connection_handler,
         const std::unordered_map<std::string, 
std::unique_ptr<ReadModbusFunction>>& address_map);
     asio::awaitable<nonstd::expected<core::RecordField, std::error_code>> 
sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& 
connection_handler,
         const ReadModbusFunction& read_modbus_function);
   ```



##########
extensions/standard-processors/modbus/FetchModbusTcp.cpp:
##########
@@ -0,0 +1,259 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchModbusTcp.h"
+
+#include <utils/net/ConnectionHandler.h>
+
+#include <asio/read.hpp>
+#include <range/v3/view/drop.hpp>
+
+#include "core/Resource.h"
+
+#include "core/ProcessSession.h"
+#include "modbus/Error.h"
+#include "modbus/ReadModbusFunctions.h"
+#include "utils/net/AsioCoro.h"
+#include "utils/net/AsioSocketUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::modbus {
+
+
+void FetchModbusTcp::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  const auto record_set_writer_name = context.getProperty(RecordSetWriter);
+  record_set_writer_ = 
std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or("")));
+  if (!record_set_writer_)
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or 
missing RecordSetWriter"};
+
+    // if the required properties are missing or empty even before evaluating 
the EL expression, then we can throw in onSchedule, before we waste any flow 
files
+  if (context.getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing 
hostname"};
+  }
+  if (context.getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"};
+  }
+  if (const auto idle_connection_expiration = 
context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); 
idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 
0ms)
+    idle_connection_expiration_ = 
idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (const auto timeout = 
context.getProperty<core::TimePeriodValue>(Timeout); timeout && 
timeout->getMilliseconds() > 0ms)
+    timeout_duration_ = timeout->getMilliseconds();
+  else
+    timeout_duration_ = 15s;
+
+  if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
+
+  ssl_context_.reset();
+  if (const auto context_name = context.getProperty(SSLContextService); 
context_name && !IsNullOrEmpty(*context_name)) {
+    if (auto controller_service = context.getControllerService(*context_name)) 
{
+      if (const auto ssl_context_service = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*context_name)))
 {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not 
an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: 
" + *context_name);
+    }
+  }
+
+  readDynamicPropertyKeys(context);
+}
+
+void FetchModbusTcp::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  const auto flow_file = getFlowFile(session);
+  if (!flow_file) {
+    logger_->log_error("No flowfile to work on");
+    return;
+  }
+
+  removeExpiredConnections();
+
+  auto hostname = context.getProperty(Hostname, 
flow_file.get()).value_or(std::string{});
+  auto port = context.getProperty(Port, 
flow_file.get()).value_or(std::string{});
+
+  if (hostname.empty() || port.empty()) {
+    logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", 
flow_file->getUUIDStr(),
+        hostname.empty() ? "(empty)" : hostname.c_str(),
+        port.empty() ? "(empty)" : port.c_str());
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  auto connection_id = utils::net::ConnectionId(std::move(hostname), 
std::move(port));
+  std::shared_ptr<utils::net::ConnectionHandlerBase> handler;
+  if (!connections_ || !connections_->contains(connection_id)) {
+    if (ssl_context_)
+      handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_);
+    else
+      handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr);
+    if (connections_)
+      (*connections_)[connection_id] = handler;
+  } else {
+    handler = (*connections_)[connection_id];
+  }
+
+  gsl_Expects(handler);
+
+  processFlowFile(handler, context, session, flow_file);
+}
+
+void FetchModbusTcp::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void FetchModbusTcp::readDynamicPropertyKeys(const core::ProcessContext& 
context) {
+  dynamic_property_keys_.clear();
+  const std::vector<std::string> dynamic_prop_keys = 
context.getDynamicPropertyKeys();
+  for (const auto& key : dynamic_prop_keys) {
+    
dynamic_property_keys_.emplace_back(core::PropertyDefinitionBuilder<>::createProperty(key).withDescription("auto
 generated").supportsExpressionLanguage(true).build());
+  }
+}
+
+std::shared_ptr<core::FlowFile> 
FetchModbusTcp::getFlowFile(core::ProcessSession& session) const {
+  if (hasIncomingConnections()) {
+    return session.get();
+  }
+  return session.create();
+}
+
+std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> 
FetchModbusTcp::getAddressMap(core::ProcessContext& context, const 
core::FlowFile& flow_file) {
+  std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> 
address_map{};
+  const auto unit_id_str = context.getProperty(UnitIdentifier, 
&flow_file).value_or("0");
+  const uint8_t unit_id = 
utils::string::parse<uint8_t>(unit_id_str).value_or(1);
+  for (const auto& dynamic_property : dynamic_property_keys_) {
+    if (std::string dynamic_property_value{}; 
context.getDynamicProperty(dynamic_property, dynamic_property_value, 
&flow_file)) {
+      if (auto modbus_func = ReadModbusFunction::parse(++transaction_id_, 
unit_id, dynamic_property_value); modbus_func)
+        address_map.emplace(dynamic_property.getName(), 
std::move(modbus_func));
+    }
+  }
+  return address_map;
+}
+
+void FetchModbusTcp::removeExpiredConnections() {
+  if (connections_) {
+    std::erase_if(*connections_, [this](auto& item) -> bool {
+      const auto& connection_handler = item.second;
+      return (!connection_handler || (idle_connection_expiration_ && 
!connection_handler->hasBeenUsedIn(*idle_connection_expiration_)));
+    });
+  }
+}
+
+void FetchModbusTcp::processFlowFile(const 
std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler,
+    core::ProcessContext& context,
+    core::ProcessSession& session,
+    const std::shared_ptr<core::FlowFile>& flow_file) {
+  std::unordered_map<std::string, std::string> result_map{};
+  const auto address_map = getAddressMap(context, *flow_file);
+  if (address_map.empty()) {
+    logger_->log_warn("There are no registers to query");
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  if (auto result = readModbus(connection_handler, address_map); !result) {
+    connection_handler->reset();
+    logger_->log_error("{}", result.error().message());
+    session.transfer(flow_file, Failure);
+  } else {
+    core::RecordSet record_set;
+    record_set.push_back(std::move(*result));
+    record_set_writer_->write(record_set, flow_file, session);
+    session.transfer(flow_file, Success);
+  }
+}
+
+nonstd::expected<core::Record, std::error_code> FetchModbusTcp::readModbus(
+    const std::shared_ptr<utils::net::ConnectionHandlerBase>& 
connection_handler,
+    const std::unordered_map<std::string, 
std::unique_ptr<ReadModbusFunction>>& address_map) {
+  nonstd::expected<core::Record, std::error_code> result;
+  io_context_.restart();
+  asio::co_spawn(io_context_,
+    sendRequestsAndReadResponses(*connection_handler, address_map),
+    [&result](const std::exception_ptr& exception_ptr, auto res) {
+      if (exception_ptr) {
+        result = nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse);
+      } else {
+        result = std::move(res);
+      }
+    });
+  io_context_.run();
+  return result;
+}
+
+auto 
FetchModbusTcp::sendRequestsAndReadResponses(utils::net::ConnectionHandlerBase& 
connection_handler,
+    const std::unordered_map<std::string, 
std::unique_ptr<ReadModbusFunction>>& address_map) -> 
asio::awaitable<nonstd::expected<core::Record, std::error_code>> {
+  core::Record result;
+  for (const auto& [variable, read_modbus_fn] : address_map) {
+    auto response = co_await sendRequestAndReadResponse(connection_handler, 
*read_modbus_fn);
+    if (!response) {
+      co_return nonstd::make_unexpected(response.error());
+    }
+    result.emplace(variable, std::move(*response));
+  }
+  co_return result;
+}
+
+
+auto 
FetchModbusTcp::sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& 
connection_handler,
+    const ReadModbusFunction& read_modbus_function) -> 
asio::awaitable<nonstd::expected<core::RecordField, std::error_code>> {
+  std::string result;
+  if (auto connection_error = co_await 
connection_handler.setupUsableSocket(io_context_))  // NOLINT
+    co_return nonstd::make_unexpected(connection_error);

Review Comment:
   What's the linter issue on this line? Could you add details after the NOLINT 
comment?
   
   ```suggestion
     if (auto connection_error = co_await 
connection_handler.setupUsableSocket(io_context_)) {  // NOLINT
       co_return nonstd::make_unexpected(connection_error);
     }
   ```



##########
extensions/standard-processors/modbus/ByteConverters.h:
##########
@@ -0,0 +1,51 @@
+/**
+*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <array>
+#include <cstdint>
+#include <span>
+#include <string_view>
+#include <bit>
+
+namespace org::apache::nifi::minifi::modbus {
+
+template<typename T, std::endian to_endianness = std::endian::big>
+std::array<std::byte, std::max(sizeof(T), sizeof(uint16_t))> toBytes(T value) {

Review Comment:
   Can you make these helpers `constexpr`? I don't see why they couldn't be 
done at compile-time.



##########
libminifi/include/utils/StringUtils.h:
##########
@@ -432,6 +433,14 @@ struct ParseError {};
 nonstd::expected<std::optional<char>, ParseError> 
parseCharacter(std::string_view input);
 
 std::string replaceEscapedCharacters(std::string_view input);
+
+template<typename T>
+nonstd::expected<T, ParseError> parse(std::string_view input) {
+  T t{};
+  if (const auto result = std::from_chars(input.data(), input.data() + 
input.size(), t); result.ptr != input.data() + input.size())
+    return nonstd::make_unexpected(ParseError{});
+  return t;
+}

Review Comment:
   Better naming and docs would be appreciated. From what I can see, this only 
parses numbers.
   ```suggestion
   // no std::arithmetic yet
   template <typename T> concept arithmetic = integral<T> || floating_point<T>;
   
   template<arithmetic T>
   nonstd::expected<T, ParseError> parseNumber(std::string_view input) {
     T t{};
     if (const auto result = std::from_chars(input.data(), input.data() + 
input.size(), t); result.ptr != input.data() + input.size())
       return nonstd::make_unexpected(ParseError{});
     return t;
   }
   ```



##########
extensions/standard-processors/modbus/FetchModbusTcp.cpp:
##########
@@ -0,0 +1,259 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FetchModbusTcp.h"
+
+#include <utils/net/ConnectionHandler.h>
+
+#include <asio/read.hpp>
+#include <range/v3/view/drop.hpp>
+
+#include "core/Resource.h"
+
+#include "core/ProcessSession.h"
+#include "modbus/Error.h"
+#include "modbus/ReadModbusFunctions.h"
+#include "utils/net/AsioCoro.h"
+#include "utils/net/AsioSocketUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::modbus {
+
+
+void FetchModbusTcp::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  const auto record_set_writer_name = context.getProperty(RecordSetWriter);
+  record_set_writer_ = 
std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or("")));
+  if (!record_set_writer_)
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or 
missing RecordSetWriter"};
+
+    // if the required properties are missing or empty even before evaluating 
the EL expression, then we can throw in onSchedule, before we waste any flow 
files
+  if (context.getProperty(Hostname).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing 
hostname"};
+  }
+  if (context.getProperty(Port).value_or(std::string{}).empty()) {
+    throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"};
+  }
+  if (const auto idle_connection_expiration = 
context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); 
idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 
0ms)
+    idle_connection_expiration_ = 
idle_connection_expiration->getMilliseconds();
+  else
+    idle_connection_expiration_.reset();
+
+  if (const auto timeout = 
context.getProperty<core::TimePeriodValue>(Timeout); timeout && 
timeout->getMilliseconds() > 0ms)
+    timeout_duration_ = timeout->getMilliseconds();
+  else
+    timeout_duration_ = 15s;
+
+  if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false))
+    connections_.reset();
+  else
+    connections_.emplace();
+
+  ssl_context_.reset();
+  if (const auto context_name = context.getProperty(SSLContextService); 
context_name && !IsNullOrEmpty(*context_name)) {
+    if (auto controller_service = context.getControllerService(*context_name)) 
{
+      if (const auto ssl_context_service = 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*context_name)))
 {
+        ssl_context_ = utils::net::getSslContext(*ssl_context_service);
+      } else {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not 
an SSL Context Service");
+      }
+    } else {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: 
" + *context_name);
+    }
+  }
+
+  readDynamicPropertyKeys(context);
+}
+
+void FetchModbusTcp::onTrigger(core::ProcessContext& context, 
core::ProcessSession& session) {
+  const auto flow_file = getFlowFile(session);
+  if (!flow_file) {
+    logger_->log_error("No flowfile to work on");
+    return;
+  }
+
+  removeExpiredConnections();
+
+  auto hostname = context.getProperty(Hostname, 
flow_file.get()).value_or(std::string{});
+  auto port = context.getProperty(Port, 
flow_file.get()).value_or(std::string{});
+
+  if (hostname.empty() || port.empty()) {
+    logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", 
flow_file->getUUIDStr(),
+        hostname.empty() ? "(empty)" : hostname.c_str(),
+        port.empty() ? "(empty)" : port.c_str());
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  auto connection_id = utils::net::ConnectionId(std::move(hostname), 
std::move(port));
+  std::shared_ptr<utils::net::ConnectionHandlerBase> handler;
+  if (!connections_ || !connections_->contains(connection_id)) {
+    if (ssl_context_)
+      handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_);
+    else
+      handler = 
std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id,
 timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr);
+    if (connections_)
+      (*connections_)[connection_id] = handler;
+  } else {
+    handler = (*connections_)[connection_id];
+  }
+
+  gsl_Expects(handler);
+
+  processFlowFile(handler, context, session, flow_file);
+}
+
+void FetchModbusTcp::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void FetchModbusTcp::readDynamicPropertyKeys(const core::ProcessContext& 
context) {
+  dynamic_property_keys_.clear();
+  const std::vector<std::string> dynamic_prop_keys = 
context.getDynamicPropertyKeys();
+  for (const auto& key : dynamic_prop_keys) {
+    
dynamic_property_keys_.emplace_back(core::PropertyDefinitionBuilder<>::createProperty(key).withDescription("auto
 generated").supportsExpressionLanguage(true).build());
+  }
+}
+
+std::shared_ptr<core::FlowFile> 
FetchModbusTcp::getFlowFile(core::ProcessSession& session) const {
+  if (hasIncomingConnections()) {
+    return session.get();
+  }
+  return session.create();
+}
+
+std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> 
FetchModbusTcp::getAddressMap(core::ProcessContext& context, const 
core::FlowFile& flow_file) {
+  std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> 
address_map{};
+  const auto unit_id_str = context.getProperty(UnitIdentifier, 
&flow_file).value_or("0");
+  const uint8_t unit_id = 
utils::string::parse<uint8_t>(unit_id_str).value_or(1);
+  for (const auto& dynamic_property : dynamic_property_keys_) {
+    if (std::string dynamic_property_value{}; 
context.getDynamicProperty(dynamic_property, dynamic_property_value, 
&flow_file)) {
+      if (auto modbus_func = ReadModbusFunction::parse(++transaction_id_, 
unit_id, dynamic_property_value); modbus_func)
+        address_map.emplace(dynamic_property.getName(), 
std::move(modbus_func));
+    }
+  }
+  return address_map;
+}
+
+void FetchModbusTcp::removeExpiredConnections() {
+  if (connections_) {
+    std::erase_if(*connections_, [this](auto& item) -> bool {
+      const auto& connection_handler = item.second;
+      return (!connection_handler || (idle_connection_expiration_ && 
!connection_handler->hasBeenUsedIn(*idle_connection_expiration_)));
+    });
+  }
+}
+
+void FetchModbusTcp::processFlowFile(const 
std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler,
+    core::ProcessContext& context,
+    core::ProcessSession& session,
+    const std::shared_ptr<core::FlowFile>& flow_file) {
+  std::unordered_map<std::string, std::string> result_map{};
+  const auto address_map = getAddressMap(context, *flow_file);
+  if (address_map.empty()) {
+    logger_->log_warn("There are no registers to query");
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  if (auto result = readModbus(connection_handler, address_map); !result) {
+    connection_handler->reset();
+    logger_->log_error("{}", result.error().message());
+    session.transfer(flow_file, Failure);
+  } else {
+    core::RecordSet record_set;
+    record_set.push_back(std::move(*result));
+    record_set_writer_->write(record_set, flow_file, session);
+    session.transfer(flow_file, Success);
+  }
+}
+
+nonstd::expected<core::Record, std::error_code> FetchModbusTcp::readModbus(
+    const std::shared_ptr<utils::net::ConnectionHandlerBase>& 
connection_handler,
+    const std::unordered_map<std::string, 
std::unique_ptr<ReadModbusFunction>>& address_map) {
+  nonstd::expected<core::Record, std::error_code> result;
+  io_context_.restart();
+  asio::co_spawn(io_context_,
+    sendRequestsAndReadResponses(*connection_handler, address_map),
+    [&result](const std::exception_ptr& exception_ptr, auto res) {
+      if (exception_ptr) {
+        result = nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse);
+      } else {
+        result = std::move(res);
+      }
+    });
+  io_context_.run();
+  return result;
+}
+
+auto 
FetchModbusTcp::sendRequestsAndReadResponses(utils::net::ConnectionHandlerBase& 
connection_handler,
+    const std::unordered_map<std::string, 
std::unique_ptr<ReadModbusFunction>>& address_map) -> 
asio::awaitable<nonstd::expected<core::Record, std::error_code>> {
+  core::Record result;
+  for (const auto& [variable, read_modbus_fn] : address_map) {
+    auto response = co_await sendRequestAndReadResponse(connection_handler, 
*read_modbus_fn);
+    if (!response) {
+      co_return nonstd::make_unexpected(response.error());
+    }
+    result.emplace(variable, std::move(*response));
+  }
+  co_return result;
+}
+
+
+auto 
FetchModbusTcp::sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& 
connection_handler,
+    const ReadModbusFunction& read_modbus_function) -> 
asio::awaitable<nonstd::expected<core::RecordField, std::error_code>> {
+  std::string result;
+  if (auto connection_error = co_await 
connection_handler.setupUsableSocket(io_context_))  // NOLINT
+    co_return nonstd::make_unexpected(connection_error);
+
+  if (auto [write_error, bytes_written] = co_await 
connection_handler.write(asio::buffer(read_modbus_function.requestBytes())); 
write_error)
+    co_return nonstd::make_unexpected(write_error);
+
+  std::array<std::byte, 7> apu_buffer{};
+  asio::mutable_buffer response_apu(apu_buffer.data(), 7);
+  if (auto [read_error, bytes_read] = co_await 
connection_handler.read(response_apu); read_error)
+    co_return nonstd::make_unexpected(read_error);
+
+  const auto received_transaction_id = fromBytes<uint16_t>({apu_buffer[0], 
apu_buffer[1]});
+  const auto received_protocol = fromBytes<uint16_t>({apu_buffer[2], 
apu_buffer[3]});
+  const auto received_length = fromBytes<uint16_t>({apu_buffer[4], 
apu_buffer[5]});
+  const auto unit_id = static_cast<uint8_t>(apu_buffer[6]);
+
+  if (received_transaction_id != read_modbus_function.getTransactionId())
+    co_return 
nonstd::make_unexpected(ModbusExceptionCode::InvalidTransactionId);
+  if (received_protocol != 0)
+    co_return nonstd::make_unexpected(ModbusExceptionCode::IllegalProtocol);
+  if (unit_id != read_modbus_function.getUnitId())
+    co_return nonstd::make_unexpected(ModbusExceptionCode::InvalidSlaveId);
+  if (received_length + 6 > 260 || received_length <= 1)
+    co_return nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse);
+
+  std::array<std::byte, 260-7> pdu_buffer{};
+  asio::mutable_buffer response_pdu(pdu_buffer.data(), received_length-1);
+  auto [read_error, bytes_read] = co_await 
connection_handler.read(response_pdu);
+  if (read_error)
+    co_return nonstd::make_unexpected(read_error);

Review Comment:
   ```suggestion
     if (read_error) {
       co_return nonstd::make_unexpected(read_error);
     }
   ```



##########
extensions/standard-processors/processors/PutTCP.cpp:
##########
@@ -315,23 +141,48 @@ void PutTCP::removeExpiredConnections() {
   }
 }
 
-std::error_code 
PutTCP::sendFlowFileContent(std::shared_ptr<ConnectionHandlerBase>& 
connection_handler,
-    const std::shared_ptr<io::InputStream>& flow_file_content_stream) {
+std::error_code PutTCP::sendFlowFileContent(const 
std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler,
+                                            const 
std::shared_ptr<io::InputStream>& flow_file_content_stream) {
   std::error_code operation_error;
   io_context_.restart();
   asio::co_spawn(io_context_,
-      connection_handler->sendStreamWithDelimiter(flow_file_content_stream, 
delimiter_, io_context_),
-      [&operation_error](const std::exception_ptr&, std::error_code 
error_code) {
-        operation_error = error_code;
-      });
+    sendStreamWithDelimiter(*connection_handler, flow_file_content_stream, 
delimiter_),
+    [&operation_error](const std::exception_ptr&, const std::error_code 
error_code) {
+      operation_error = error_code;
+    });
   io_context_.run();
   return operation_error;
 }
 
-void PutTCP::processFlowFile(std::shared_ptr<ConnectionHandlerBase>& 
connection_handler,
+asio::awaitable<std::error_code> 
PutTCP::sendStreamWithDelimiter(utils::net::ConnectionHandlerBase& 
connection_handler,
+    const std::shared_ptr<io::InputStream>& stream_to_send, const 
std::vector<std::byte>& delimiter) {
+  if (auto connection_error = co_await 
connection_handler.setupUsableSocket(io_context_))  // NOLINT
+    co_return connection_error;

Review Comment:
   ```suggestion
     if (auto connection_error = co_await 
connection_handler.setupUsableSocket(io_context_)) {  // NOLINT
       co_return connection_error;
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to