szaszm commented on code in PR #1779: URL: https://github.com/apache/nifi-minifi-cpp/pull/1779#discussion_r1627807868
########## extensions/standard-processors/modbus/FetchModbusTcp.cpp: ########## @@ -0,0 +1,259 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FetchModbusTcp.h" + +#include <utils/net/ConnectionHandler.h> + +#include <asio/read.hpp> +#include <range/v3/view/drop.hpp> + +#include "core/Resource.h" + +#include "core/ProcessSession.h" +#include "modbus/Error.h" +#include "modbus/ReadModbusFunctions.h" +#include "utils/net/AsioCoro.h" +#include "utils/net/AsioSocketUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::modbus { + + +void FetchModbusTcp::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + const auto record_set_writer_name = context.getProperty(RecordSetWriter); + record_set_writer_ = std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or(""))); + if (!record_set_writer_) + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or missing RecordSetWriter"}; + + // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files + if (context.getProperty(Hostname).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing hostname"}; + } + if (context.getProperty(Port).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"}; + } + if (const auto idle_connection_expiration = context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms) + idle_connection_expiration_ = idle_connection_expiration->getMilliseconds(); + else + idle_connection_expiration_.reset(); + + if (const auto timeout = context.getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms) + timeout_duration_ = timeout->getMilliseconds(); + else + timeout_duration_ = 15s; + + if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false)) + connections_.reset(); + else + connections_.emplace(); + + ssl_context_.reset(); + if (const auto context_name = context.getProperty(SSLContextService); context_name && !IsNullOrEmpty(*context_name)) { + if (auto controller_service = context.getControllerService(*context_name)) { + if (const auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*context_name))) { + ssl_context_ = utils::net::getSslContext(*ssl_context_service); + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service"); + } + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name); + } + } + + readDynamicPropertyKeys(context); +} + +void FetchModbusTcp::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + const auto flow_file = getFlowFile(session); + if (!flow_file) { + logger_->log_error("No flowfile to work on"); + return; + } + + removeExpiredConnections(); + + auto hostname = context.getProperty(Hostname, flow_file.get()).value_or(std::string{}); + auto port = context.getProperty(Port, flow_file.get()).value_or(std::string{}); + + if (hostname.empty() || port.empty()) { + logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", flow_file->getUUIDStr(), + hostname.empty() ? "(empty)" : hostname.c_str(), + port.empty() ? "(empty)" : port.c_str()); + session.transfer(flow_file, Failure); + return; + } + + auto connection_id = utils::net::ConnectionId(std::move(hostname), std::move(port)); + std::shared_ptr<utils::net::ConnectionHandlerBase> handler; + if (!connections_ || !connections_->contains(connection_id)) { + if (ssl_context_) + handler = std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_); + else + handler = std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr); + if (connections_) + (*connections_)[connection_id] = handler; Review Comment: ```suggestion if (ssl_context_) { handler = std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_); } else { handler = std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr); } if (connections_) { (*connections_)[connection_id] = handler; } ``` ########## extensions/standard-processors/modbus/FetchModbusTcp.cpp: ########## @@ -0,0 +1,259 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FetchModbusTcp.h" + +#include <utils/net/ConnectionHandler.h> + +#include <asio/read.hpp> +#include <range/v3/view/drop.hpp> + +#include "core/Resource.h" + +#include "core/ProcessSession.h" +#include "modbus/Error.h" +#include "modbus/ReadModbusFunctions.h" +#include "utils/net/AsioCoro.h" +#include "utils/net/AsioSocketUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::modbus { + + +void FetchModbusTcp::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + const auto record_set_writer_name = context.getProperty(RecordSetWriter); + record_set_writer_ = std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or(""))); + if (!record_set_writer_) + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or missing RecordSetWriter"}; + + // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files + if (context.getProperty(Hostname).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing hostname"}; + } + if (context.getProperty(Port).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"}; + } + if (const auto idle_connection_expiration = context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms) + idle_connection_expiration_ = idle_connection_expiration->getMilliseconds(); + else + idle_connection_expiration_.reset(); Review Comment: ```suggestion if (const auto idle_connection_expiration = context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms) { idle_connection_expiration_ = idle_connection_expiration->getMilliseconds(); } else { idle_connection_expiration_.reset(); } ``` ########## extensions/standard-processors/modbus/FetchModbusTcp.cpp: ########## @@ -0,0 +1,259 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FetchModbusTcp.h" + +#include <utils/net/ConnectionHandler.h> + +#include <asio/read.hpp> +#include <range/v3/view/drop.hpp> + +#include "core/Resource.h" + +#include "core/ProcessSession.h" +#include "modbus/Error.h" +#include "modbus/ReadModbusFunctions.h" +#include "utils/net/AsioCoro.h" +#include "utils/net/AsioSocketUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::modbus { + + +void FetchModbusTcp::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + const auto record_set_writer_name = context.getProperty(RecordSetWriter); + record_set_writer_ = std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or(""))); + if (!record_set_writer_) + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or missing RecordSetWriter"}; + + // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files + if (context.getProperty(Hostname).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing hostname"}; + } + if (context.getProperty(Port).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"}; + } + if (const auto idle_connection_expiration = context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms) + idle_connection_expiration_ = idle_connection_expiration->getMilliseconds(); + else + idle_connection_expiration_.reset(); + + if (const auto timeout = context.getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms) + timeout_duration_ = timeout->getMilliseconds(); + else + timeout_duration_ = 15s; Review Comment: ```suggestion if (const auto timeout = context.getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms) { timeout_duration_ = timeout->getMilliseconds(); } else { timeout_duration_ = 15s; } ``` ########## extensions/standard-processors/modbus/FetchModbusTcp.cpp: ########## @@ -0,0 +1,259 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FetchModbusTcp.h" + +#include <utils/net/ConnectionHandler.h> + +#include <asio/read.hpp> +#include <range/v3/view/drop.hpp> + +#include "core/Resource.h" + +#include "core/ProcessSession.h" +#include "modbus/Error.h" +#include "modbus/ReadModbusFunctions.h" +#include "utils/net/AsioCoro.h" +#include "utils/net/AsioSocketUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::modbus { + + +void FetchModbusTcp::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + const auto record_set_writer_name = context.getProperty(RecordSetWriter); + record_set_writer_ = std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or(""))); + if (!record_set_writer_) + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or missing RecordSetWriter"}; + + // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files + if (context.getProperty(Hostname).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing hostname"}; + } + if (context.getProperty(Port).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"}; + } + if (const auto idle_connection_expiration = context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms) + idle_connection_expiration_ = idle_connection_expiration->getMilliseconds(); + else + idle_connection_expiration_.reset(); + + if (const auto timeout = context.getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms) + timeout_duration_ = timeout->getMilliseconds(); + else + timeout_duration_ = 15s; + + if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false)) + connections_.reset(); + else + connections_.emplace(); + + ssl_context_.reset(); + if (const auto context_name = context.getProperty(SSLContextService); context_name && !IsNullOrEmpty(*context_name)) { + if (auto controller_service = context.getControllerService(*context_name)) { + if (const auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*context_name))) { + ssl_context_ = utils::net::getSslContext(*ssl_context_service); + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service"); + } + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name); + } + } + + readDynamicPropertyKeys(context); +} + +void FetchModbusTcp::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + const auto flow_file = getFlowFile(session); + if (!flow_file) { + logger_->log_error("No flowfile to work on"); + return; + } + + removeExpiredConnections(); + + auto hostname = context.getProperty(Hostname, flow_file.get()).value_or(std::string{}); + auto port = context.getProperty(Port, flow_file.get()).value_or(std::string{}); + + if (hostname.empty() || port.empty()) { + logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", flow_file->getUUIDStr(), + hostname.empty() ? "(empty)" : hostname.c_str(), + port.empty() ? "(empty)" : port.c_str()); + session.transfer(flow_file, Failure); + return; + } + + auto connection_id = utils::net::ConnectionId(std::move(hostname), std::move(port)); + std::shared_ptr<utils::net::ConnectionHandlerBase> handler; + if (!connections_ || !connections_->contains(connection_id)) { + if (ssl_context_) + handler = std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_); + else + handler = std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr); + if (connections_) + (*connections_)[connection_id] = handler; + } else { + handler = (*connections_)[connection_id]; + } + + gsl_Expects(handler); + + processFlowFile(handler, context, session, flow_file); +} + +void FetchModbusTcp::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void FetchModbusTcp::readDynamicPropertyKeys(const core::ProcessContext& context) { + dynamic_property_keys_.clear(); + const std::vector<std::string> dynamic_prop_keys = context.getDynamicPropertyKeys(); + for (const auto& key : dynamic_prop_keys) { + dynamic_property_keys_.emplace_back(core::PropertyDefinitionBuilder<>::createProperty(key).withDescription("auto generated").supportsExpressionLanguage(true).build()); + } +} + +std::shared_ptr<core::FlowFile> FetchModbusTcp::getFlowFile(core::ProcessSession& session) const { + if (hasIncomingConnections()) { + return session.get(); + } + return session.create(); +} + +std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> FetchModbusTcp::getAddressMap(core::ProcessContext& context, const core::FlowFile& flow_file) { + std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> address_map{}; + const auto unit_id_str = context.getProperty(UnitIdentifier, &flow_file).value_or("0"); + const uint8_t unit_id = utils::string::parse<uint8_t>(unit_id_str).value_or(1); + for (const auto& dynamic_property : dynamic_property_keys_) { + if (std::string dynamic_property_value{}; context.getDynamicProperty(dynamic_property, dynamic_property_value, &flow_file)) { + if (auto modbus_func = ReadModbusFunction::parse(++transaction_id_, unit_id, dynamic_property_value); modbus_func) + address_map.emplace(dynamic_property.getName(), std::move(modbus_func)); Review Comment: ```suggestion if (auto modbus_func = ReadModbusFunction::parse(++transaction_id_, unit_id, dynamic_property_value); modbus_func) { address_map.emplace(dynamic_property.getName(), std::move(modbus_func)); } ``` ########## extensions/standard-processors/modbus/FetchModbusTcp.cpp: ########## @@ -0,0 +1,259 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FetchModbusTcp.h" + +#include <utils/net/ConnectionHandler.h> + +#include <asio/read.hpp> +#include <range/v3/view/drop.hpp> + +#include "core/Resource.h" + +#include "core/ProcessSession.h" +#include "modbus/Error.h" +#include "modbus/ReadModbusFunctions.h" +#include "utils/net/AsioCoro.h" +#include "utils/net/AsioSocketUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::modbus { + + +void FetchModbusTcp::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + const auto record_set_writer_name = context.getProperty(RecordSetWriter); + record_set_writer_ = std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or(""))); + if (!record_set_writer_) + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or missing RecordSetWriter"}; + + // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files + if (context.getProperty(Hostname).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing hostname"}; + } + if (context.getProperty(Port).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"}; + } + if (const auto idle_connection_expiration = context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms) + idle_connection_expiration_ = idle_connection_expiration->getMilliseconds(); + else + idle_connection_expiration_.reset(); + + if (const auto timeout = context.getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms) + timeout_duration_ = timeout->getMilliseconds(); + else + timeout_duration_ = 15s; + + if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false)) + connections_.reset(); + else + connections_.emplace(); + + ssl_context_.reset(); + if (const auto context_name = context.getProperty(SSLContextService); context_name && !IsNullOrEmpty(*context_name)) { + if (auto controller_service = context.getControllerService(*context_name)) { + if (const auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*context_name))) { + ssl_context_ = utils::net::getSslContext(*ssl_context_service); + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service"); + } + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name); + } + } + + readDynamicPropertyKeys(context); +} + +void FetchModbusTcp::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + const auto flow_file = getFlowFile(session); + if (!flow_file) { + logger_->log_error("No flowfile to work on"); + return; + } + + removeExpiredConnections(); + + auto hostname = context.getProperty(Hostname, flow_file.get()).value_or(std::string{}); + auto port = context.getProperty(Port, flow_file.get()).value_or(std::string{}); + + if (hostname.empty() || port.empty()) { + logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", flow_file->getUUIDStr(), + hostname.empty() ? "(empty)" : hostname.c_str(), + port.empty() ? "(empty)" : port.c_str()); + session.transfer(flow_file, Failure); + return; + } + + auto connection_id = utils::net::ConnectionId(std::move(hostname), std::move(port)); + std::shared_ptr<utils::net::ConnectionHandlerBase> handler; + if (!connections_ || !connections_->contains(connection_id)) { + if (ssl_context_) + handler = std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_); + else + handler = std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr); + if (connections_) + (*connections_)[connection_id] = handler; + } else { + handler = (*connections_)[connection_id]; + } + + gsl_Expects(handler); + + processFlowFile(handler, context, session, flow_file); +} + +void FetchModbusTcp::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void FetchModbusTcp::readDynamicPropertyKeys(const core::ProcessContext& context) { + dynamic_property_keys_.clear(); + const std::vector<std::string> dynamic_prop_keys = context.getDynamicPropertyKeys(); + for (const auto& key : dynamic_prop_keys) { + dynamic_property_keys_.emplace_back(core::PropertyDefinitionBuilder<>::createProperty(key).withDescription("auto generated").supportsExpressionLanguage(true).build()); + } +} + +std::shared_ptr<core::FlowFile> FetchModbusTcp::getFlowFile(core::ProcessSession& session) const { + if (hasIncomingConnections()) { + return session.get(); + } + return session.create(); +} + +std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> FetchModbusTcp::getAddressMap(core::ProcessContext& context, const core::FlowFile& flow_file) { + std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> address_map{}; + const auto unit_id_str = context.getProperty(UnitIdentifier, &flow_file).value_or("0"); + const uint8_t unit_id = utils::string::parse<uint8_t>(unit_id_str).value_or(1); + for (const auto& dynamic_property : dynamic_property_keys_) { + if (std::string dynamic_property_value{}; context.getDynamicProperty(dynamic_property, dynamic_property_value, &flow_file)) { + if (auto modbus_func = ReadModbusFunction::parse(++transaction_id_, unit_id, dynamic_property_value); modbus_func) + address_map.emplace(dynamic_property.getName(), std::move(modbus_func)); + } + } + return address_map; +} + +void FetchModbusTcp::removeExpiredConnections() { + if (connections_) { + std::erase_if(*connections_, [this](auto& item) -> bool { + const auto& connection_handler = item.second; + return (!connection_handler || (idle_connection_expiration_ && !connection_handler->hasBeenUsedIn(*idle_connection_expiration_))); + }); + } +} + +void FetchModbusTcp::processFlowFile(const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, + core::ProcessContext& context, + core::ProcessSession& session, + const std::shared_ptr<core::FlowFile>& flow_file) { + std::unordered_map<std::string, std::string> result_map{}; + const auto address_map = getAddressMap(context, *flow_file); + if (address_map.empty()) { + logger_->log_warn("There are no registers to query"); + session.transfer(flow_file, Failure); + return; + } + + if (auto result = readModbus(connection_handler, address_map); !result) { + connection_handler->reset(); + logger_->log_error("{}", result.error().message()); + session.transfer(flow_file, Failure); + } else { + core::RecordSet record_set; + record_set.push_back(std::move(*result)); + record_set_writer_->write(record_set, flow_file, session); + session.transfer(flow_file, Success); + } +} + +nonstd::expected<core::Record, std::error_code> FetchModbusTcp::readModbus( + const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, + const std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>>& address_map) { + nonstd::expected<core::Record, std::error_code> result; + io_context_.restart(); + asio::co_spawn(io_context_, + sendRequestsAndReadResponses(*connection_handler, address_map), + [&result](const std::exception_ptr& exception_ptr, auto res) { + if (exception_ptr) { + result = nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse); + } else { + result = std::move(res); + } + }); + io_context_.run(); + return result; +} + +auto FetchModbusTcp::sendRequestsAndReadResponses(utils::net::ConnectionHandlerBase& connection_handler, + const std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>>& address_map) -> asio::awaitable<nonstd::expected<core::Record, std::error_code>> { + core::Record result; + for (const auto& [variable, read_modbus_fn] : address_map) { + auto response = co_await sendRequestAndReadResponse(connection_handler, *read_modbus_fn); + if (!response) { + co_return nonstd::make_unexpected(response.error()); + } + result.emplace(variable, std::move(*response)); + } + co_return result; +} + + +auto FetchModbusTcp::sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& connection_handler, + const ReadModbusFunction& read_modbus_function) -> asio::awaitable<nonstd::expected<core::RecordField, std::error_code>> { + std::string result; + if (auto connection_error = co_await connection_handler.setupUsableSocket(io_context_)) // NOLINT + co_return nonstd::make_unexpected(connection_error); + + if (auto [write_error, bytes_written] = co_await connection_handler.write(asio::buffer(read_modbus_function.requestBytes())); write_error) + co_return nonstd::make_unexpected(write_error); Review Comment: ```suggestion if (auto [write_error, bytes_written] = co_await connection_handler.write(asio::buffer(read_modbus_function.requestBytes())); write_error) { co_return nonstd::make_unexpected(write_error); } ``` ########## extensions/standard-processors/modbus/FetchModbusTcp.cpp: ########## @@ -0,0 +1,259 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FetchModbusTcp.h" + +#include <utils/net/ConnectionHandler.h> + +#include <asio/read.hpp> +#include <range/v3/view/drop.hpp> + +#include "core/Resource.h" + +#include "core/ProcessSession.h" +#include "modbus/Error.h" +#include "modbus/ReadModbusFunctions.h" +#include "utils/net/AsioCoro.h" +#include "utils/net/AsioSocketUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::modbus { + + +void FetchModbusTcp::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + const auto record_set_writer_name = context.getProperty(RecordSetWriter); + record_set_writer_ = std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or(""))); + if (!record_set_writer_) + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or missing RecordSetWriter"}; + + // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files + if (context.getProperty(Hostname).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing hostname"}; + } + if (context.getProperty(Port).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"}; + } + if (const auto idle_connection_expiration = context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms) + idle_connection_expiration_ = idle_connection_expiration->getMilliseconds(); + else + idle_connection_expiration_.reset(); + + if (const auto timeout = context.getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms) + timeout_duration_ = timeout->getMilliseconds(); + else + timeout_duration_ = 15s; + + if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false)) + connections_.reset(); + else + connections_.emplace(); + + ssl_context_.reset(); + if (const auto context_name = context.getProperty(SSLContextService); context_name && !IsNullOrEmpty(*context_name)) { + if (auto controller_service = context.getControllerService(*context_name)) { + if (const auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*context_name))) { + ssl_context_ = utils::net::getSslContext(*ssl_context_service); + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service"); + } + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name); + } + } + + readDynamicPropertyKeys(context); +} + +void FetchModbusTcp::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + const auto flow_file = getFlowFile(session); + if (!flow_file) { + logger_->log_error("No flowfile to work on"); + return; + } + + removeExpiredConnections(); + + auto hostname = context.getProperty(Hostname, flow_file.get()).value_or(std::string{}); + auto port = context.getProperty(Port, flow_file.get()).value_or(std::string{}); + + if (hostname.empty() || port.empty()) { + logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", flow_file->getUUIDStr(), + hostname.empty() ? "(empty)" : hostname.c_str(), + port.empty() ? "(empty)" : port.c_str()); + session.transfer(flow_file, Failure); + return; + } + + auto connection_id = utils::net::ConnectionId(std::move(hostname), std::move(port)); + std::shared_ptr<utils::net::ConnectionHandlerBase> handler; + if (!connections_ || !connections_->contains(connection_id)) { + if (ssl_context_) + handler = std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_); + else + handler = std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr); + if (connections_) + (*connections_)[connection_id] = handler; + } else { + handler = (*connections_)[connection_id]; + } + + gsl_Expects(handler); + + processFlowFile(handler, context, session, flow_file); +} + +void FetchModbusTcp::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void FetchModbusTcp::readDynamicPropertyKeys(const core::ProcessContext& context) { + dynamic_property_keys_.clear(); + const std::vector<std::string> dynamic_prop_keys = context.getDynamicPropertyKeys(); + for (const auto& key : dynamic_prop_keys) { + dynamic_property_keys_.emplace_back(core::PropertyDefinitionBuilder<>::createProperty(key).withDescription("auto generated").supportsExpressionLanguage(true).build()); + } +} + +std::shared_ptr<core::FlowFile> FetchModbusTcp::getFlowFile(core::ProcessSession& session) const { + if (hasIncomingConnections()) { + return session.get(); + } + return session.create(); +} + +std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> FetchModbusTcp::getAddressMap(core::ProcessContext& context, const core::FlowFile& flow_file) { + std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> address_map{}; + const auto unit_id_str = context.getProperty(UnitIdentifier, &flow_file).value_or("0"); + const uint8_t unit_id = utils::string::parse<uint8_t>(unit_id_str).value_or(1); + for (const auto& dynamic_property : dynamic_property_keys_) { + if (std::string dynamic_property_value{}; context.getDynamicProperty(dynamic_property, dynamic_property_value, &flow_file)) { + if (auto modbus_func = ReadModbusFunction::parse(++transaction_id_, unit_id, dynamic_property_value); modbus_func) + address_map.emplace(dynamic_property.getName(), std::move(modbus_func)); + } + } + return address_map; +} + +void FetchModbusTcp::removeExpiredConnections() { + if (connections_) { + std::erase_if(*connections_, [this](auto& item) -> bool { + const auto& connection_handler = item.second; + return (!connection_handler || (idle_connection_expiration_ && !connection_handler->hasBeenUsedIn(*idle_connection_expiration_))); + }); + } +} + +void FetchModbusTcp::processFlowFile(const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, + core::ProcessContext& context, + core::ProcessSession& session, + const std::shared_ptr<core::FlowFile>& flow_file) { + std::unordered_map<std::string, std::string> result_map{}; + const auto address_map = getAddressMap(context, *flow_file); + if (address_map.empty()) { + logger_->log_warn("There are no registers to query"); + session.transfer(flow_file, Failure); + return; + } + + if (auto result = readModbus(connection_handler, address_map); !result) { + connection_handler->reset(); + logger_->log_error("{}", result.error().message()); + session.transfer(flow_file, Failure); + } else { + core::RecordSet record_set; + record_set.push_back(std::move(*result)); + record_set_writer_->write(record_set, flow_file, session); + session.transfer(flow_file, Success); + } +} + +nonstd::expected<core::Record, std::error_code> FetchModbusTcp::readModbus( + const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, + const std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>>& address_map) { + nonstd::expected<core::Record, std::error_code> result; + io_context_.restart(); + asio::co_spawn(io_context_, + sendRequestsAndReadResponses(*connection_handler, address_map), + [&result](const std::exception_ptr& exception_ptr, auto res) { + if (exception_ptr) { + result = nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse); + } else { + result = std::move(res); + } + }); + io_context_.run(); + return result; +} + +auto FetchModbusTcp::sendRequestsAndReadResponses(utils::net::ConnectionHandlerBase& connection_handler, + const std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>>& address_map) -> asio::awaitable<nonstd::expected<core::Record, std::error_code>> { + core::Record result; + for (const auto& [variable, read_modbus_fn] : address_map) { + auto response = co_await sendRequestAndReadResponse(connection_handler, *read_modbus_fn); + if (!response) { + co_return nonstd::make_unexpected(response.error()); + } + result.emplace(variable, std::move(*response)); + } + co_return result; +} + + +auto FetchModbusTcp::sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& connection_handler, + const ReadModbusFunction& read_modbus_function) -> asio::awaitable<nonstd::expected<core::RecordField, std::error_code>> { + std::string result; + if (auto connection_error = co_await connection_handler.setupUsableSocket(io_context_)) // NOLINT + co_return nonstd::make_unexpected(connection_error); + + if (auto [write_error, bytes_written] = co_await connection_handler.write(asio::buffer(read_modbus_function.requestBytes())); write_error) + co_return nonstd::make_unexpected(write_error); + + std::array<std::byte, 7> apu_buffer{}; + asio::mutable_buffer response_apu(apu_buffer.data(), 7); + if (auto [read_error, bytes_read] = co_await connection_handler.read(response_apu); read_error) + co_return nonstd::make_unexpected(read_error); + + const auto received_transaction_id = fromBytes<uint16_t>({apu_buffer[0], apu_buffer[1]}); + const auto received_protocol = fromBytes<uint16_t>({apu_buffer[2], apu_buffer[3]}); + const auto received_length = fromBytes<uint16_t>({apu_buffer[4], apu_buffer[5]}); + const auto unit_id = static_cast<uint8_t>(apu_buffer[6]); + + if (received_transaction_id != read_modbus_function.getTransactionId()) + co_return nonstd::make_unexpected(ModbusExceptionCode::InvalidTransactionId); + if (received_protocol != 0) + co_return nonstd::make_unexpected(ModbusExceptionCode::IllegalProtocol); + if (unit_id != read_modbus_function.getUnitId()) + co_return nonstd::make_unexpected(ModbusExceptionCode::InvalidSlaveId); + if (received_length + 6 > 260 || received_length <= 1) + co_return nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse); Review Comment: ```suggestion if (received_transaction_id != read_modbus_function.getTransactionId()) { co_return nonstd::make_unexpected(ModbusExceptionCode::InvalidTransactionId); } if (received_protocol != 0) { co_return nonstd::make_unexpected(ModbusExceptionCode::IllegalProtocol); } if (unit_id != read_modbus_function.getUnitId()) { co_return nonstd::make_unexpected(ModbusExceptionCode::InvalidSlaveId); } if (received_length + 6 > 260 || received_length <= 1) { co_return nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse); } ``` ########## extensions/standard-processors/modbus/FetchModbusTcp.cpp: ########## @@ -0,0 +1,259 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FetchModbusTcp.h" + +#include <utils/net/ConnectionHandler.h> + +#include <asio/read.hpp> +#include <range/v3/view/drop.hpp> + +#include "core/Resource.h" + +#include "core/ProcessSession.h" +#include "modbus/Error.h" +#include "modbus/ReadModbusFunctions.h" +#include "utils/net/AsioCoro.h" +#include "utils/net/AsioSocketUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::modbus { + + +void FetchModbusTcp::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + const auto record_set_writer_name = context.getProperty(RecordSetWriter); + record_set_writer_ = std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or(""))); + if (!record_set_writer_) + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or missing RecordSetWriter"}; + + // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files + if (context.getProperty(Hostname).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing hostname"}; + } + if (context.getProperty(Port).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"}; + } + if (const auto idle_connection_expiration = context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms) + idle_connection_expiration_ = idle_connection_expiration->getMilliseconds(); + else + idle_connection_expiration_.reset(); + + if (const auto timeout = context.getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms) + timeout_duration_ = timeout->getMilliseconds(); + else + timeout_duration_ = 15s; + + if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false)) + connections_.reset(); + else + connections_.emplace(); + + ssl_context_.reset(); + if (const auto context_name = context.getProperty(SSLContextService); context_name && !IsNullOrEmpty(*context_name)) { + if (auto controller_service = context.getControllerService(*context_name)) { + if (const auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*context_name))) { + ssl_context_ = utils::net::getSslContext(*ssl_context_service); + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service"); + } + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name); + } + } + + readDynamicPropertyKeys(context); +} + +void FetchModbusTcp::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + const auto flow_file = getFlowFile(session); + if (!flow_file) { + logger_->log_error("No flowfile to work on"); + return; + } + + removeExpiredConnections(); + + auto hostname = context.getProperty(Hostname, flow_file.get()).value_or(std::string{}); + auto port = context.getProperty(Port, flow_file.get()).value_or(std::string{}); + + if (hostname.empty() || port.empty()) { + logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", flow_file->getUUIDStr(), + hostname.empty() ? "(empty)" : hostname.c_str(), + port.empty() ? "(empty)" : port.c_str()); + session.transfer(flow_file, Failure); + return; + } + + auto connection_id = utils::net::ConnectionId(std::move(hostname), std::move(port)); + std::shared_ptr<utils::net::ConnectionHandlerBase> handler; + if (!connections_ || !connections_->contains(connection_id)) { + if (ssl_context_) + handler = std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_); + else + handler = std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr); + if (connections_) + (*connections_)[connection_id] = handler; + } else { + handler = (*connections_)[connection_id]; + } + + gsl_Expects(handler); + + processFlowFile(handler, context, session, flow_file); +} + +void FetchModbusTcp::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void FetchModbusTcp::readDynamicPropertyKeys(const core::ProcessContext& context) { + dynamic_property_keys_.clear(); + const std::vector<std::string> dynamic_prop_keys = context.getDynamicPropertyKeys(); + for (const auto& key : dynamic_prop_keys) { + dynamic_property_keys_.emplace_back(core::PropertyDefinitionBuilder<>::createProperty(key).withDescription("auto generated").supportsExpressionLanguage(true).build()); + } +} + +std::shared_ptr<core::FlowFile> FetchModbusTcp::getFlowFile(core::ProcessSession& session) const { + if (hasIncomingConnections()) { + return session.get(); + } + return session.create(); +} + +std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> FetchModbusTcp::getAddressMap(core::ProcessContext& context, const core::FlowFile& flow_file) { + std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> address_map{}; + const auto unit_id_str = context.getProperty(UnitIdentifier, &flow_file).value_or("0"); + const uint8_t unit_id = utils::string::parse<uint8_t>(unit_id_str).value_or(1); + for (const auto& dynamic_property : dynamic_property_keys_) { + if (std::string dynamic_property_value{}; context.getDynamicProperty(dynamic_property, dynamic_property_value, &flow_file)) { + if (auto modbus_func = ReadModbusFunction::parse(++transaction_id_, unit_id, dynamic_property_value); modbus_func) + address_map.emplace(dynamic_property.getName(), std::move(modbus_func)); + } + } + return address_map; +} + +void FetchModbusTcp::removeExpiredConnections() { + if (connections_) { + std::erase_if(*connections_, [this](auto& item) -> bool { + const auto& connection_handler = item.second; + return (!connection_handler || (idle_connection_expiration_ && !connection_handler->hasBeenUsedIn(*idle_connection_expiration_))); + }); + } +} + +void FetchModbusTcp::processFlowFile(const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, + core::ProcessContext& context, + core::ProcessSession& session, + const std::shared_ptr<core::FlowFile>& flow_file) { + std::unordered_map<std::string, std::string> result_map{}; + const auto address_map = getAddressMap(context, *flow_file); + if (address_map.empty()) { + logger_->log_warn("There are no registers to query"); + session.transfer(flow_file, Failure); + return; + } + + if (auto result = readModbus(connection_handler, address_map); !result) { + connection_handler->reset(); + logger_->log_error("{}", result.error().message()); + session.transfer(flow_file, Failure); + } else { + core::RecordSet record_set; + record_set.push_back(std::move(*result)); + record_set_writer_->write(record_set, flow_file, session); + session.transfer(flow_file, Success); + } +} + +nonstd::expected<core::Record, std::error_code> FetchModbusTcp::readModbus( + const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, + const std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>>& address_map) { + nonstd::expected<core::Record, std::error_code> result; + io_context_.restart(); + asio::co_spawn(io_context_, + sendRequestsAndReadResponses(*connection_handler, address_map), + [&result](const std::exception_ptr& exception_ptr, auto res) { + if (exception_ptr) { + result = nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse); + } else { + result = std::move(res); + } + }); + io_context_.run(); + return result; +} + +auto FetchModbusTcp::sendRequestsAndReadResponses(utils::net::ConnectionHandlerBase& connection_handler, + const std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>>& address_map) -> asio::awaitable<nonstd::expected<core::Record, std::error_code>> { + core::Record result; + for (const auto& [variable, read_modbus_fn] : address_map) { + auto response = co_await sendRequestAndReadResponse(connection_handler, *read_modbus_fn); + if (!response) { + co_return nonstd::make_unexpected(response.error()); + } + result.emplace(variable, std::move(*response)); + } + co_return result; +} + + +auto FetchModbusTcp::sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& connection_handler, + const ReadModbusFunction& read_modbus_function) -> asio::awaitable<nonstd::expected<core::RecordField, std::error_code>> { + std::string result; + if (auto connection_error = co_await connection_handler.setupUsableSocket(io_context_)) // NOLINT + co_return nonstd::make_unexpected(connection_error); + + if (auto [write_error, bytes_written] = co_await connection_handler.write(asio::buffer(read_modbus_function.requestBytes())); write_error) + co_return nonstd::make_unexpected(write_error); + + std::array<std::byte, 7> apu_buffer{}; + asio::mutable_buffer response_apu(apu_buffer.data(), 7); + if (auto [read_error, bytes_read] = co_await connection_handler.read(response_apu); read_error) + co_return nonstd::make_unexpected(read_error); Review Comment: ```suggestion if (auto [read_error, bytes_read] = co_await connection_handler.read(response_apu); read_error) { co_return nonstd::make_unexpected(read_error); } ``` ########## extensions/standard-processors/processors/PutTCP.cpp: ########## @@ -315,23 +141,48 @@ void PutTCP::removeExpiredConnections() { } } -std::error_code PutTCP::sendFlowFileContent(std::shared_ptr<ConnectionHandlerBase>& connection_handler, - const std::shared_ptr<io::InputStream>& flow_file_content_stream) { +std::error_code PutTCP::sendFlowFileContent(const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, + const std::shared_ptr<io::InputStream>& flow_file_content_stream) { Review Comment: ```suggestion std::error_code PutTCP::sendFlowFileContent(const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, const std::shared_ptr<io::InputStream>& flow_file_content_stream) { ``` ########## extensions/standard-processors/modbus/FetchModbusTcp.cpp: ########## @@ -0,0 +1,259 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FetchModbusTcp.h" + +#include <utils/net/ConnectionHandler.h> + +#include <asio/read.hpp> +#include <range/v3/view/drop.hpp> + +#include "core/Resource.h" + +#include "core/ProcessSession.h" +#include "modbus/Error.h" +#include "modbus/ReadModbusFunctions.h" +#include "utils/net/AsioCoro.h" +#include "utils/net/AsioSocketUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::modbus { + + +void FetchModbusTcp::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + const auto record_set_writer_name = context.getProperty(RecordSetWriter); + record_set_writer_ = std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or(""))); + if (!record_set_writer_) + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or missing RecordSetWriter"}; + + // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files + if (context.getProperty(Hostname).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing hostname"}; + } + if (context.getProperty(Port).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"}; + } + if (const auto idle_connection_expiration = context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms) + idle_connection_expiration_ = idle_connection_expiration->getMilliseconds(); + else + idle_connection_expiration_.reset(); + + if (const auto timeout = context.getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms) + timeout_duration_ = timeout->getMilliseconds(); + else + timeout_duration_ = 15s; + + if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false)) + connections_.reset(); + else + connections_.emplace(); Review Comment: ```suggestion if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false)) { connections_.reset(); } else { connections_.emplace(); } ``` ########## extensions/standard-processors/modbus/FetchModbusTcp.cpp: ########## @@ -0,0 +1,259 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FetchModbusTcp.h" + +#include <utils/net/ConnectionHandler.h> + +#include <asio/read.hpp> +#include <range/v3/view/drop.hpp> + +#include "core/Resource.h" + +#include "core/ProcessSession.h" +#include "modbus/Error.h" +#include "modbus/ReadModbusFunctions.h" +#include "utils/net/AsioCoro.h" +#include "utils/net/AsioSocketUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::modbus { + + +void FetchModbusTcp::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + const auto record_set_writer_name = context.getProperty(RecordSetWriter); + record_set_writer_ = std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or(""))); + if (!record_set_writer_) + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or missing RecordSetWriter"}; Review Comment: The style guide recommends braces around all if and for blocks: https://google.github.io/styleguide/cppguide.html#Formatting_Looping_Branching If you want to change this in the context of this project, we can discuss this with the wider contributor community, but until then, we should stick with it. ```suggestion if (!record_set_writer_) { throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or missing RecordSetWriter"}; } ``` ########## extensions/standard-processors/modbus/Error.h: ########## @@ -0,0 +1,69 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> +#include <system_error> +#include "magic_enum.hpp" + +namespace org::apache::nifi::minifi::modbus { + +enum class ModbusExceptionCode : std::underlying_type_t<std::byte> { + IllegalFunction = 0x01, + IllegalDataAddress = 0x02, + IllegalDataValue = 0x03, + SlaveDeviceFailure = 0x04, + Acknowledge = 0x05, + SlaveDeviceBusy = 0x06, + NegativeAcknowledge = 0x07, + MemoryParityError = 0x08, + GatewayPathUnavailable = 0x0a, + GatewayTargetDeviceFailedToRespond = 0x0b, + InvalidResponse, + MessageTooLarge, + InvalidTransactionId, + IllegalProtocol, + InvalidSlaveId +}; Review Comment: Are these defined somewhere? A specification reference or similar in a comment would be nice. ########## extensions/standard-processors/modbus/FetchModbusTcp.h: ########## @@ -0,0 +1,143 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "controllers/SSLContextService.h" +#include "controllers/RecordSetWriter.h" +#include "core/Processor.h" +#include "core/PropertyDefinitionBuilder.h" +#include "logging/LoggerFactory.h" +#include "utils/net/AsioCoro.h" +#include "utils/net/AsioSocketUtils.h" +#include "utils/net/ConnectionHandlerBase.h" + +namespace org::apache::nifi::minifi::modbus { + +class ReadModbusFunction; + +class FetchModbusTcp final : public core::Processor { + public: + explicit FetchModbusTcp(const std::string_view name, const utils::Identifier& uuid = {}) + : Processor(name, uuid) { + } + + EXTENSIONAPI static constexpr auto Description = "Processor able to read data from industrial PLCs using Modbus TCP/IP"; + + EXTENSIONAPI static constexpr auto Hostname = core::PropertyDefinitionBuilder<>::createProperty("Hostname") + .withDescription("The ip address or hostname of the destination.") + .isRequired(true) + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto Port = core::PropertyDefinitionBuilder<>::createProperty("Port") + .withDescription("The port or service on the destination.") + .withDefaultValue("502") + .isRequired(true) + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto UnitIdentifier = core::PropertyDefinitionBuilder<>::createProperty("Unit identifier") + .withDescription("The port or service on the destination.") + .isRequired(true) + .withDefaultValue("0") + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto IdleConnectionExpiration = core::PropertyDefinitionBuilder<>::createProperty("Idle Connection Expiration") + .withDescription("The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.") + .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) + .withDefaultValue("15 seconds") + .isRequired(true) + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto ConnectionPerFlowFile = core::PropertyDefinitionBuilder<>::createProperty("Connection Per FlowFile") + .withDescription("Specifies whether to send each FlowFile's content on an individual connection.") + .withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE) + .withDefaultValue("false") + .isRequired(true) + .supportsExpressionLanguage(false) + .build(); + EXTENSIONAPI static constexpr auto Timeout = core::PropertyDefinitionBuilder<>::createProperty("Timeout") + .withDescription("The timeout for connecting to and communicating with the destination.") + .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) + .withDefaultValue("15 seconds") + .isRequired(true) + .supportsExpressionLanguage(true) + .build(); + EXTENSIONAPI static constexpr auto SSLContextService = core::PropertyDefinitionBuilder<>::createProperty("SSL Context Service") + .withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be sent over a secure connection.") + .isRequired(false) + .withAllowedTypes<minifi::controllers::SSLContextService>() + .build(); + EXTENSIONAPI static constexpr auto RecordSetWriter = core::PropertyDefinitionBuilder<>::createProperty("Record Set Writer") + .withDescription("Specifies the Controller Service to use for writing results to a FlowFile. ") + .isRequired(false) + .withAllowedTypes<core::RecordSetWriter>() + .build(); + + EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 8>{ + Hostname, + Port, + UnitIdentifier, + IdleConnectionExpiration, + ConnectionPerFlowFile, + Timeout, + SSLContextService, + RecordSetWriter + }; + + EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Successfully processed"}; + EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "An error occurred processing"}; + EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure}; + + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true; + EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; + EXTENSIONAPI static constexpr auto InputRequirement = core::annotation::Input::INPUT_ALLOWED; + EXTENSIONAPI static constexpr bool IsSingleThreaded = true; + + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS + + void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; + void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; + void initialize() override; + + private: + void readDynamicPropertyKeys(const core::ProcessContext& context); + void processFlowFile(const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, + core::ProcessContext& context, + core::ProcessSession& session, + const std::shared_ptr<core::FlowFile>& flow_file); + + auto readModbus(const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, + const std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>>& address_map) -> nonstd::expected<core::Record, std::error_code>; + auto sendRequestsAndReadResponses(utils::net::ConnectionHandlerBase& connection_handler, + const std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>>& address_map) -> asio::awaitable<nonstd::expected<core::Record, std::error_code>>; + auto sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& connection_handler, + const ReadModbusFunction& read_modbus_function) -> asio::awaitable<nonstd::expected<core::RecordField, std::error_code>>; Review Comment: Why trailing return types? In these cases, they don't depend on parameter identifiers, so I'd keep them in front for consistency. ```suggestion nonstd::expected<core::Record, std::error_code> readModbus(const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, const std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>>& address_map); asio::awaitable<nonstd::expected<core::Record, std::error_code>> sendRequestsAndReadResponses(utils::net::ConnectionHandlerBase& connection_handler, const std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>>& address_map); asio::awaitable<nonstd::expected<core::RecordField, std::error_code>> sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& connection_handler, const ReadModbusFunction& read_modbus_function); ``` ########## extensions/standard-processors/modbus/FetchModbusTcp.cpp: ########## @@ -0,0 +1,259 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FetchModbusTcp.h" + +#include <utils/net/ConnectionHandler.h> + +#include <asio/read.hpp> +#include <range/v3/view/drop.hpp> + +#include "core/Resource.h" + +#include "core/ProcessSession.h" +#include "modbus/Error.h" +#include "modbus/ReadModbusFunctions.h" +#include "utils/net/AsioCoro.h" +#include "utils/net/AsioSocketUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::modbus { + + +void FetchModbusTcp::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + const auto record_set_writer_name = context.getProperty(RecordSetWriter); + record_set_writer_ = std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or(""))); + if (!record_set_writer_) + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or missing RecordSetWriter"}; + + // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files + if (context.getProperty(Hostname).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing hostname"}; + } + if (context.getProperty(Port).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"}; + } + if (const auto idle_connection_expiration = context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms) + idle_connection_expiration_ = idle_connection_expiration->getMilliseconds(); + else + idle_connection_expiration_.reset(); + + if (const auto timeout = context.getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms) + timeout_duration_ = timeout->getMilliseconds(); + else + timeout_duration_ = 15s; + + if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false)) + connections_.reset(); + else + connections_.emplace(); + + ssl_context_.reset(); + if (const auto context_name = context.getProperty(SSLContextService); context_name && !IsNullOrEmpty(*context_name)) { + if (auto controller_service = context.getControllerService(*context_name)) { + if (const auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*context_name))) { + ssl_context_ = utils::net::getSslContext(*ssl_context_service); + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service"); + } + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name); + } + } + + readDynamicPropertyKeys(context); +} + +void FetchModbusTcp::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + const auto flow_file = getFlowFile(session); + if (!flow_file) { + logger_->log_error("No flowfile to work on"); + return; + } + + removeExpiredConnections(); + + auto hostname = context.getProperty(Hostname, flow_file.get()).value_or(std::string{}); + auto port = context.getProperty(Port, flow_file.get()).value_or(std::string{}); + + if (hostname.empty() || port.empty()) { + logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", flow_file->getUUIDStr(), + hostname.empty() ? "(empty)" : hostname.c_str(), + port.empty() ? "(empty)" : port.c_str()); + session.transfer(flow_file, Failure); + return; + } + + auto connection_id = utils::net::ConnectionId(std::move(hostname), std::move(port)); + std::shared_ptr<utils::net::ConnectionHandlerBase> handler; + if (!connections_ || !connections_->contains(connection_id)) { + if (ssl_context_) + handler = std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_); + else + handler = std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr); + if (connections_) + (*connections_)[connection_id] = handler; + } else { + handler = (*connections_)[connection_id]; + } + + gsl_Expects(handler); + + processFlowFile(handler, context, session, flow_file); +} + +void FetchModbusTcp::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void FetchModbusTcp::readDynamicPropertyKeys(const core::ProcessContext& context) { + dynamic_property_keys_.clear(); + const std::vector<std::string> dynamic_prop_keys = context.getDynamicPropertyKeys(); + for (const auto& key : dynamic_prop_keys) { + dynamic_property_keys_.emplace_back(core::PropertyDefinitionBuilder<>::createProperty(key).withDescription("auto generated").supportsExpressionLanguage(true).build()); + } +} + +std::shared_ptr<core::FlowFile> FetchModbusTcp::getFlowFile(core::ProcessSession& session) const { + if (hasIncomingConnections()) { + return session.get(); + } + return session.create(); +} + +std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> FetchModbusTcp::getAddressMap(core::ProcessContext& context, const core::FlowFile& flow_file) { + std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> address_map{}; + const auto unit_id_str = context.getProperty(UnitIdentifier, &flow_file).value_or("0"); + const uint8_t unit_id = utils::string::parse<uint8_t>(unit_id_str).value_or(1); + for (const auto& dynamic_property : dynamic_property_keys_) { + if (std::string dynamic_property_value{}; context.getDynamicProperty(dynamic_property, dynamic_property_value, &flow_file)) { + if (auto modbus_func = ReadModbusFunction::parse(++transaction_id_, unit_id, dynamic_property_value); modbus_func) + address_map.emplace(dynamic_property.getName(), std::move(modbus_func)); + } + } + return address_map; +} + +void FetchModbusTcp::removeExpiredConnections() { + if (connections_) { + std::erase_if(*connections_, [this](auto& item) -> bool { + const auto& connection_handler = item.second; + return (!connection_handler || (idle_connection_expiration_ && !connection_handler->hasBeenUsedIn(*idle_connection_expiration_))); + }); + } +} + +void FetchModbusTcp::processFlowFile(const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, + core::ProcessContext& context, + core::ProcessSession& session, + const std::shared_ptr<core::FlowFile>& flow_file) { + std::unordered_map<std::string, std::string> result_map{}; + const auto address_map = getAddressMap(context, *flow_file); + if (address_map.empty()) { + logger_->log_warn("There are no registers to query"); + session.transfer(flow_file, Failure); + return; + } + + if (auto result = readModbus(connection_handler, address_map); !result) { + connection_handler->reset(); + logger_->log_error("{}", result.error().message()); + session.transfer(flow_file, Failure); + } else { + core::RecordSet record_set; + record_set.push_back(std::move(*result)); + record_set_writer_->write(record_set, flow_file, session); + session.transfer(flow_file, Success); + } +} + +nonstd::expected<core::Record, std::error_code> FetchModbusTcp::readModbus( + const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, + const std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>>& address_map) { + nonstd::expected<core::Record, std::error_code> result; + io_context_.restart(); + asio::co_spawn(io_context_, + sendRequestsAndReadResponses(*connection_handler, address_map), + [&result](const std::exception_ptr& exception_ptr, auto res) { + if (exception_ptr) { + result = nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse); + } else { + result = std::move(res); + } + }); + io_context_.run(); + return result; +} + +auto FetchModbusTcp::sendRequestsAndReadResponses(utils::net::ConnectionHandlerBase& connection_handler, + const std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>>& address_map) -> asio::awaitable<nonstd::expected<core::Record, std::error_code>> { + core::Record result; + for (const auto& [variable, read_modbus_fn] : address_map) { + auto response = co_await sendRequestAndReadResponse(connection_handler, *read_modbus_fn); + if (!response) { + co_return nonstd::make_unexpected(response.error()); + } + result.emplace(variable, std::move(*response)); + } + co_return result; +} + + +auto FetchModbusTcp::sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& connection_handler, + const ReadModbusFunction& read_modbus_function) -> asio::awaitable<nonstd::expected<core::RecordField, std::error_code>> { + std::string result; + if (auto connection_error = co_await connection_handler.setupUsableSocket(io_context_)) // NOLINT + co_return nonstd::make_unexpected(connection_error); Review Comment: What's the linter issue on this line? Could you add details after the NOLINT comment? ```suggestion if (auto connection_error = co_await connection_handler.setupUsableSocket(io_context_)) { // NOLINT co_return nonstd::make_unexpected(connection_error); } ``` ########## extensions/standard-processors/modbus/ByteConverters.h: ########## @@ -0,0 +1,51 @@ +/** +* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <array> +#include <cstdint> +#include <span> +#include <string_view> +#include <bit> + +namespace org::apache::nifi::minifi::modbus { + +template<typename T, std::endian to_endianness = std::endian::big> +std::array<std::byte, std::max(sizeof(T), sizeof(uint16_t))> toBytes(T value) { Review Comment: Can you make these helpers `constexpr`? I don't see why they couldn't be done at compile-time. ########## libminifi/include/utils/StringUtils.h: ########## @@ -432,6 +433,14 @@ struct ParseError {}; nonstd::expected<std::optional<char>, ParseError> parseCharacter(std::string_view input); std::string replaceEscapedCharacters(std::string_view input); + +template<typename T> +nonstd::expected<T, ParseError> parse(std::string_view input) { + T t{}; + if (const auto result = std::from_chars(input.data(), input.data() + input.size(), t); result.ptr != input.data() + input.size()) + return nonstd::make_unexpected(ParseError{}); + return t; +} Review Comment: Better naming and docs would be appreciated. From what I can see, this only parses numbers. ```suggestion // no std::arithmetic yet template <typename T> concept arithmetic = integral<T> || floating_point<T>; template<arithmetic T> nonstd::expected<T, ParseError> parseNumber(std::string_view input) { T t{}; if (const auto result = std::from_chars(input.data(), input.data() + input.size(), t); result.ptr != input.data() + input.size()) return nonstd::make_unexpected(ParseError{}); return t; } ``` ########## extensions/standard-processors/modbus/FetchModbusTcp.cpp: ########## @@ -0,0 +1,259 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FetchModbusTcp.h" + +#include <utils/net/ConnectionHandler.h> + +#include <asio/read.hpp> +#include <range/v3/view/drop.hpp> + +#include "core/Resource.h" + +#include "core/ProcessSession.h" +#include "modbus/Error.h" +#include "modbus/ReadModbusFunctions.h" +#include "utils/net/AsioCoro.h" +#include "utils/net/AsioSocketUtils.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::modbus { + + +void FetchModbusTcp::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + const auto record_set_writer_name = context.getProperty(RecordSetWriter); + record_set_writer_ = std::dynamic_pointer_cast<core::RecordSetWriter>(context.getControllerService(record_set_writer_name.value_or(""))); + if (!record_set_writer_) + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Invalid or missing RecordSetWriter"}; + + // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files + if (context.getProperty(Hostname).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing hostname"}; + } + if (context.getProperty(Port).value_or(std::string{}).empty()) { + throw Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "missing port"}; + } + if (const auto idle_connection_expiration = context.getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms) + idle_connection_expiration_ = idle_connection_expiration->getMilliseconds(); + else + idle_connection_expiration_.reset(); + + if (const auto timeout = context.getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms) + timeout_duration_ = timeout->getMilliseconds(); + else + timeout_duration_ = 15s; + + if (context.getProperty<bool>(ConnectionPerFlowFile).value_or(false)) + connections_.reset(); + else + connections_.emplace(); + + ssl_context_.reset(); + if (const auto context_name = context.getProperty(SSLContextService); context_name && !IsNullOrEmpty(*context_name)) { + if (auto controller_service = context.getControllerService(*context_name)) { + if (const auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*context_name))) { + ssl_context_ = utils::net::getSslContext(*ssl_context_service); + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, *context_name + " is not an SSL Context Service"); + } + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + *context_name); + } + } + + readDynamicPropertyKeys(context); +} + +void FetchModbusTcp::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + const auto flow_file = getFlowFile(session); + if (!flow_file) { + logger_->log_error("No flowfile to work on"); + return; + } + + removeExpiredConnections(); + + auto hostname = context.getProperty(Hostname, flow_file.get()).value_or(std::string{}); + auto port = context.getProperty(Port, flow_file.get()).value_or(std::string{}); + + if (hostname.empty() || port.empty()) { + logger_->log_error("[{}] invalid target endpoint: hostname: {}, port: {}", flow_file->getUUIDStr(), + hostname.empty() ? "(empty)" : hostname.c_str(), + port.empty() ? "(empty)" : port.c_str()); + session.transfer(flow_file, Failure); + return; + } + + auto connection_id = utils::net::ConnectionId(std::move(hostname), std::move(port)); + std::shared_ptr<utils::net::ConnectionHandlerBase> handler; + if (!connections_ || !connections_->contains(connection_id)) { + if (ssl_context_) + handler = std::make_shared<utils::net::ConnectionHandler<utils::net::SslSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_); + else + handler = std::make_shared<utils::net::ConnectionHandler<utils::net::TcpSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr); + if (connections_) + (*connections_)[connection_id] = handler; + } else { + handler = (*connections_)[connection_id]; + } + + gsl_Expects(handler); + + processFlowFile(handler, context, session, flow_file); +} + +void FetchModbusTcp::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void FetchModbusTcp::readDynamicPropertyKeys(const core::ProcessContext& context) { + dynamic_property_keys_.clear(); + const std::vector<std::string> dynamic_prop_keys = context.getDynamicPropertyKeys(); + for (const auto& key : dynamic_prop_keys) { + dynamic_property_keys_.emplace_back(core::PropertyDefinitionBuilder<>::createProperty(key).withDescription("auto generated").supportsExpressionLanguage(true).build()); + } +} + +std::shared_ptr<core::FlowFile> FetchModbusTcp::getFlowFile(core::ProcessSession& session) const { + if (hasIncomingConnections()) { + return session.get(); + } + return session.create(); +} + +std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> FetchModbusTcp::getAddressMap(core::ProcessContext& context, const core::FlowFile& flow_file) { + std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>> address_map{}; + const auto unit_id_str = context.getProperty(UnitIdentifier, &flow_file).value_or("0"); + const uint8_t unit_id = utils::string::parse<uint8_t>(unit_id_str).value_or(1); + for (const auto& dynamic_property : dynamic_property_keys_) { + if (std::string dynamic_property_value{}; context.getDynamicProperty(dynamic_property, dynamic_property_value, &flow_file)) { + if (auto modbus_func = ReadModbusFunction::parse(++transaction_id_, unit_id, dynamic_property_value); modbus_func) + address_map.emplace(dynamic_property.getName(), std::move(modbus_func)); + } + } + return address_map; +} + +void FetchModbusTcp::removeExpiredConnections() { + if (connections_) { + std::erase_if(*connections_, [this](auto& item) -> bool { + const auto& connection_handler = item.second; + return (!connection_handler || (idle_connection_expiration_ && !connection_handler->hasBeenUsedIn(*idle_connection_expiration_))); + }); + } +} + +void FetchModbusTcp::processFlowFile(const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, + core::ProcessContext& context, + core::ProcessSession& session, + const std::shared_ptr<core::FlowFile>& flow_file) { + std::unordered_map<std::string, std::string> result_map{}; + const auto address_map = getAddressMap(context, *flow_file); + if (address_map.empty()) { + logger_->log_warn("There are no registers to query"); + session.transfer(flow_file, Failure); + return; + } + + if (auto result = readModbus(connection_handler, address_map); !result) { + connection_handler->reset(); + logger_->log_error("{}", result.error().message()); + session.transfer(flow_file, Failure); + } else { + core::RecordSet record_set; + record_set.push_back(std::move(*result)); + record_set_writer_->write(record_set, flow_file, session); + session.transfer(flow_file, Success); + } +} + +nonstd::expected<core::Record, std::error_code> FetchModbusTcp::readModbus( + const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, + const std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>>& address_map) { + nonstd::expected<core::Record, std::error_code> result; + io_context_.restart(); + asio::co_spawn(io_context_, + sendRequestsAndReadResponses(*connection_handler, address_map), + [&result](const std::exception_ptr& exception_ptr, auto res) { + if (exception_ptr) { + result = nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse); + } else { + result = std::move(res); + } + }); + io_context_.run(); + return result; +} + +auto FetchModbusTcp::sendRequestsAndReadResponses(utils::net::ConnectionHandlerBase& connection_handler, + const std::unordered_map<std::string, std::unique_ptr<ReadModbusFunction>>& address_map) -> asio::awaitable<nonstd::expected<core::Record, std::error_code>> { + core::Record result; + for (const auto& [variable, read_modbus_fn] : address_map) { + auto response = co_await sendRequestAndReadResponse(connection_handler, *read_modbus_fn); + if (!response) { + co_return nonstd::make_unexpected(response.error()); + } + result.emplace(variable, std::move(*response)); + } + co_return result; +} + + +auto FetchModbusTcp::sendRequestAndReadResponse(utils::net::ConnectionHandlerBase& connection_handler, + const ReadModbusFunction& read_modbus_function) -> asio::awaitable<nonstd::expected<core::RecordField, std::error_code>> { + std::string result; + if (auto connection_error = co_await connection_handler.setupUsableSocket(io_context_)) // NOLINT + co_return nonstd::make_unexpected(connection_error); + + if (auto [write_error, bytes_written] = co_await connection_handler.write(asio::buffer(read_modbus_function.requestBytes())); write_error) + co_return nonstd::make_unexpected(write_error); + + std::array<std::byte, 7> apu_buffer{}; + asio::mutable_buffer response_apu(apu_buffer.data(), 7); + if (auto [read_error, bytes_read] = co_await connection_handler.read(response_apu); read_error) + co_return nonstd::make_unexpected(read_error); + + const auto received_transaction_id = fromBytes<uint16_t>({apu_buffer[0], apu_buffer[1]}); + const auto received_protocol = fromBytes<uint16_t>({apu_buffer[2], apu_buffer[3]}); + const auto received_length = fromBytes<uint16_t>({apu_buffer[4], apu_buffer[5]}); + const auto unit_id = static_cast<uint8_t>(apu_buffer[6]); + + if (received_transaction_id != read_modbus_function.getTransactionId()) + co_return nonstd::make_unexpected(ModbusExceptionCode::InvalidTransactionId); + if (received_protocol != 0) + co_return nonstd::make_unexpected(ModbusExceptionCode::IllegalProtocol); + if (unit_id != read_modbus_function.getUnitId()) + co_return nonstd::make_unexpected(ModbusExceptionCode::InvalidSlaveId); + if (received_length + 6 > 260 || received_length <= 1) + co_return nonstd::make_unexpected(ModbusExceptionCode::InvalidResponse); + + std::array<std::byte, 260-7> pdu_buffer{}; + asio::mutable_buffer response_pdu(pdu_buffer.data(), received_length-1); + auto [read_error, bytes_read] = co_await connection_handler.read(response_pdu); + if (read_error) + co_return nonstd::make_unexpected(read_error); Review Comment: ```suggestion if (read_error) { co_return nonstd::make_unexpected(read_error); } ``` ########## extensions/standard-processors/processors/PutTCP.cpp: ########## @@ -315,23 +141,48 @@ void PutTCP::removeExpiredConnections() { } } -std::error_code PutTCP::sendFlowFileContent(std::shared_ptr<ConnectionHandlerBase>& connection_handler, - const std::shared_ptr<io::InputStream>& flow_file_content_stream) { +std::error_code PutTCP::sendFlowFileContent(const std::shared_ptr<utils::net::ConnectionHandlerBase>& connection_handler, + const std::shared_ptr<io::InputStream>& flow_file_content_stream) { std::error_code operation_error; io_context_.restart(); asio::co_spawn(io_context_, - connection_handler->sendStreamWithDelimiter(flow_file_content_stream, delimiter_, io_context_), - [&operation_error](const std::exception_ptr&, std::error_code error_code) { - operation_error = error_code; - }); + sendStreamWithDelimiter(*connection_handler, flow_file_content_stream, delimiter_), + [&operation_error](const std::exception_ptr&, const std::error_code error_code) { + operation_error = error_code; + }); io_context_.run(); return operation_error; } -void PutTCP::processFlowFile(std::shared_ptr<ConnectionHandlerBase>& connection_handler, +asio::awaitable<std::error_code> PutTCP::sendStreamWithDelimiter(utils::net::ConnectionHandlerBase& connection_handler, + const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter) { + if (auto connection_error = co_await connection_handler.setupUsableSocket(io_context_)) // NOLINT + co_return connection_error; Review Comment: ```suggestion if (auto connection_error = co_await connection_handler.setupUsableSocket(io_context_)) { // NOLINT co_return connection_error; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
