This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit ef701211b124692dbe1413e960d2422dd580daf5 Author: Gabor Gyimesi <[email protected]> AuthorDate: Mon Jul 21 13:41:09 2025 +0200 MINIFICPP-2591 Add new value option to FetchOPCProcessor lazy mode Signed-off-by: Ferenc Gerlits <[email protected]> Closes #1994 --- PROCESSORS.md | 30 +++---- extensions/opc/include/fetchopc.h | 46 ++++++++--- extensions/opc/include/putopc.h | 2 +- extensions/opc/src/fetchopc.cpp | 101 +++++++++++++++++++----- extensions/opc/tests/FetchOpcProcessorTests.cpp | 94 ++++++++++++++++++++++ extensions/opc/tests/OpcUaTestServer.h | 30 ++++++- 6 files changed, 256 insertions(+), 47 deletions(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index 7bb2e948a..bec6d178f 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -920,21 +920,21 @@ Fetches OPC-UA node In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. -| Name | Default Value | Allowable Values | Description | -|---------------------------------|---------------|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| **OPC server endpoint** | | | Specifies the address, port and relative path of an OPC endpoint | -| Application URI | | | Application URI of the client in the format 'urn:unconfigured:application'. Mandatory, if using Secure Channel and must match the URI included in the certificate's Subject Alternative Names. | -| Username | | | Username to log in with. | -| Password | | | Password to log in with.<br/>**Sensitive Property: true** | -| Certificate path | | | Path to the DER-encoded cert file | -| Key path | | | Path to the DER-encoded key file | -| Trusted server certificate path | | | Comma separated list of paths to the DER-encoded trusted server certificates | -| Path reference types | | | Specify the reference types between nodes in the path if Path Node ID type is used. If not provided, all reference types are assumed to be Organizes. The format is 'referenceType1/referenceType2/.../referenceTypeN' and the supported reference types are Organizes, HasComponent, HasProperty, and HasSubtype. | -| **Node ID type** | | Path<br/>Int<br/>String | Specifies the type of the provided node ID | -| **Node ID** | | | Specifies the ID of the root node to traverse. In case of a Path Node ID Type, the path should be provided in the format of 'path/to/node'. | -| **Namespace index** | 0 | | The index of the namespace. | -| **Max depth** | 0 | | Specifiec the max depth of browsing. 0 means unlimited. | -| **Lazy mode** | Off | On<br/>Off | Only creates flowfiles from nodes with new timestamp from the server. | +| Name | Default Value | Allowable Values | Description | +|---------------------------------|---------------|--------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **OPC server endpoint** | | | Specifies the address, port and relative path of an OPC endpoint | +| Application URI | | | Application URI of the client in the format 'urn:unconfigured:application'. Mandatory, if using Secure Channel and must match the URI included in the certificate's Subject Alternative Names. | +| Username | | | Username to log in with. | +| Password | | | Password to log in with.<br/>**Sensitive Property: true** | +| Certificate path | | | Path to the DER-encoded cert file | +| Key path | | | Path to the DER-encoded key file | +| Trusted server certificate path | | | Comma separated list of paths to the DER-encoded trusted server certificates | +| Path reference types | | | Specify the reference types between nodes in the path if Path Node ID type is used. If not provided, all reference types are assumed to be Organizes. The format is 'referenceType1/referenceType2/.../referenceTypeN' and the supported reference types are Organizes, HasComponent, HasProperty, and HasSubtype. | +| **Node ID type** | | Path<br/>Int<br/>String | Specifies the type of the provided node ID | +| **Node ID** | | | Specifies the ID of the root node to traverse. In case of a Path Node ID Type, the path should be provided in the format of 'path/to/node'. | +| **Namespace index** | 0 | | The index of the namespace. | +| **Max depth** | 0 | | Specifiec the max depth of browsing. 0 means unlimited. | +| **Lazy mode** | Off | On<br/>New Value<br/>Off | Only creates flowfiles from nodes with new timestamp from the server. If set to 'New Value', it will only create flowfiles if the value of the node data has changed since the last fetch, the timestamp is ignored. | ### Relationships diff --git a/extensions/opc/include/fetchopc.h b/extensions/opc/include/fetchopc.h index 012935df4..ada1522f8 100644 --- a/extensions/opc/include/fetchopc.h +++ b/extensions/opc/include/fetchopc.h @@ -36,15 +36,39 @@ #include "utils/ArrayUtils.h" #include "utils/Id.h" #include "utils/gsl.h" +#include "minifi-cpp/core/StateManager.h" namespace org::apache::nifi::minifi::processors { enum class LazyModeOptions { On, + NewValue, Off }; -class FetchOPCProcessor : public BaseOPCProcessor { +} // namespace org::apache::nifi::minifi::processors + +namespace magic_enum::customize { + +using LazyModeOptions = org::apache::nifi::minifi::processors::LazyModeOptions; + +template <> +constexpr customize_t enum_name<LazyModeOptions>(LazyModeOptions value) noexcept { + switch (value) { + case LazyModeOptions::On: + return "On"; + case LazyModeOptions::NewValue: + return "New Value"; + case LazyModeOptions::Off: + return "Off"; + } + return invalid_tag; +} +} // namespace magic_enum::customize + +namespace org::apache::nifi::minifi::processors { + +class FetchOPCProcessor final : public BaseOPCProcessor { public: explicit FetchOPCProcessor(std::string_view name, const utils::Identifier& uuid = {}) : BaseOPCProcessor(name, uuid) { @@ -75,7 +99,8 @@ class FetchOPCProcessor : public BaseOPCProcessor { .isRequired(true) .build(); EXTENSIONAPI static constexpr auto Lazy = core::PropertyDefinitionBuilder<magic_enum::enum_count<LazyModeOptions>()>::createProperty("Lazy mode") - .withDescription("Only creates flowfiles from nodes with new timestamp from the server.") + .withDescription("Only creates flowfiles from nodes with new timestamp from the server. If set to 'New Value', it will only create flowfiles " + "if the value of the node data has changed since the last fetch, the timestamp is ignored.") .isRequired(true) .withAllowedValues(magic_enum::enum_names<LazyModeOptions>()) .withDefaultValue(magic_enum::enum_name(LazyModeOptions::Off)) @@ -116,19 +141,20 @@ class FetchOPCProcessor : public BaseOPCProcessor { void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; void initialize() override; - protected: + private: bool nodeFoundCallBack(const UA_ReferenceDescription *ref, const std::string& path, core::ProcessContext& context, core::ProcessSession& session, - size_t& nodes_found, size_t& variables_found); - - void OPCData2FlowFile(const opc::NodeData& opc_node, core::ProcessContext& context, core::ProcessSession& session); + size_t& nodes_found, size_t& variables_found, std::unordered_map<std::string, std::string>& state_map); + void OPCData2FlowFile(const opc::NodeData& opc_node, core::ProcessContext& context, core::ProcessSession& session) const; + void writeFlowFileUsingLazyModeWithTimestamp(const opc::NodeData& nodedata, core::ProcessContext& context, core::ProcessSession& session, size_t& variables_found, + std::unordered_map<std::string, std::string>& state_map) const; + void writeFlowFileUsingLazyModeWithNewValue(const opc::NodeData& nodedata, core::ProcessContext& context, core::ProcessSession& session, size_t& variables_found, + std::unordered_map<std::string, std::string>& state_map) const; uint64_t max_depth_ = 0; - bool lazy_mode_ = false; - - private: + LazyModeOptions lazy_mode_ = LazyModeOptions::Off; std::vector<UA_NodeId> translated_node_ids_; // Only used when user provides path, path->nodeid translation is only done once - std::unordered_map<std::string, std::string> node_timestamp_; // Key = Full path, Value = Timestamp + core::StateManager* state_manager_ = nullptr; }; } // namespace org::apache::nifi::minifi::processors diff --git a/extensions/opc/include/putopc.h b/extensions/opc/include/putopc.h index 92f4f3256..cf29c6e47 100644 --- a/extensions/opc/include/putopc.h +++ b/extensions/opc/include/putopc.h @@ -38,7 +38,7 @@ namespace org::apache::nifi::minifi::processors { -class PutOPCProcessor : public BaseOPCProcessor { +class PutOPCProcessor final : public BaseOPCProcessor { public: EXTENSIONAPI static constexpr const char* Description = "Creates/updates OPC nodes"; diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp index 1d917082b..7ac232ac6 100644 --- a/extensions/opc/src/fetchopc.cpp +++ b/extensions/opc/src/fetchopc.cpp @@ -40,6 +40,11 @@ void FetchOPCProcessor::initialize() { void FetchOPCProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& factory) { logger_->log_trace("FetchOPCProcessor::onSchedule"); + state_manager_ = context.getStateManager(); + if (state_manager_ == nullptr) { + throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager"); + } + translated_node_ids_.clear(); // Path might has changed during restart BaseOPCProcessor::onSchedule(context, factory); @@ -51,7 +56,7 @@ void FetchOPCProcessor::onSchedule(core::ProcessContext& context, core::ProcessS namespace_idx_ = gsl::narrow<int32_t>(utils::parseI64Property(context, NameSpaceIndex)); - lazy_mode_ = utils::parseEnumProperty<LazyModeOptions>(context, Lazy) == LazyModeOptions::On; + lazy_mode_ = utils::parseEnumProperty<LazyModeOptions>(context, Lazy); if (id_type_ == opc::OPCNodeIDType::Path) { readPathReferenceTypes(context, node_id_); @@ -69,8 +74,12 @@ void FetchOPCProcessor::onTrigger(core::ProcessContext& context, core::ProcessSe size_t nodes_found = 0; size_t variables_found = 0; - auto found_cb = [this, &context, &session, &nodes_found, &variables_found](const UA_ReferenceDescription* ref, const std::string& path) { - return nodeFoundCallBack(ref, path, context, session, nodes_found, variables_found); }; + std::unordered_map<std::string, std::string> state_map; + state_manager_->get(state_map); + + auto found_cb = [this, &context, &session, &nodes_found, &variables_found, &state_map](const UA_ReferenceDescription* ref, const std::string& path) { + return nodeFoundCallBack(ref, path, context, session, nodes_found, variables_found, state_map); + }; if (id_type_ != opc::OPCNodeIDType::Path) { UA_NodeId my_id; @@ -107,29 +116,85 @@ void FetchOPCProcessor::onTrigger(core::ProcessContext& context, core::ProcessSe logger_->log_warn("Found no variables when traversing the specified node. No flowfiles are generated. Yielding..."); yield(); } + + state_manager_->set(state_map); +} + +void FetchOPCProcessor::writeFlowFileUsingLazyModeWithTimestamp(const opc::NodeData& nodedata, core::ProcessContext& context, core::ProcessSession& session, size_t& variables_found, + std::unordered_map<std::string, std::string>& state_map) const { + auto writeDataToFlowFile = [this, &nodedata, &context, &session, &variables_found]() { + OPCData2FlowFile(nodedata, context, session); + ++variables_found; + }; + + auto full_path_it = nodedata.attributes.find("Full path"); + if (full_path_it == nodedata.attributes.end()) { + logger_->log_error("Node data does not contain 'Full path' attribute, cannot read state for node"); + writeDataToFlowFile(); + return; + } + + auto source_timestamp_it = nodedata.attributes.find("Sourcetimestamp"); + if (source_timestamp_it == nodedata.attributes.end()) { + logger_->log_error("Node data does not contain 'Sourcetimestamp' attribute, cannot read state for node"); + writeDataToFlowFile(); + return; + } + + auto new_state_value = source_timestamp_it->second; + auto nodeid = full_path_it->second + "_timestamp"; + auto cur_state_value = state_map[nodeid]; + if (cur_state_value.empty() || cur_state_value != new_state_value) { + logger_->log_debug("Node {} has new source timestamp", full_path_it->second); + state_map[nodeid] = new_state_value; + writeDataToFlowFile(); + return; + } + + logger_->log_debug("Node {} has no new source timestamp, skipping", full_path_it->second); +} + +void FetchOPCProcessor::writeFlowFileUsingLazyModeWithNewValue(const opc::NodeData& nodedata, core::ProcessContext& context, core::ProcessSession& session, size_t& variables_found, + std::unordered_map<std::string, std::string>& state_map) const { + auto writeDataToFlowFile = [this, &nodedata, &context, &session, &variables_found]() { + OPCData2FlowFile(nodedata, context, session); + ++variables_found; + }; + + auto full_path_it = nodedata.attributes.find("Full path"); + if (full_path_it == nodedata.attributes.end()) { + logger_->log_error("Node data does not contain 'Full path' attribute, cannot read state for node"); + writeDataToFlowFile(); + return; + } + + auto new_state_value = opc::nodeValue2String(nodedata); + auto nodeid = full_path_it->second + "_new_value"; + auto cur_state_value = state_map[nodeid]; + if (cur_state_value.empty() || cur_state_value != new_state_value) { + logger_->log_debug("Node {} has new value", full_path_it->second); + state_map[nodeid] = new_state_value; + writeDataToFlowFile(); + return; + } + + logger_->log_debug("Node {} has no new value, skipping", full_path_it->second); } bool FetchOPCProcessor::nodeFoundCallBack(const UA_ReferenceDescription *ref, const std::string& path, - core::ProcessContext& context, core::ProcessSession& session, size_t& nodes_found, size_t& variables_found) { + core::ProcessContext& context, core::ProcessSession& session, size_t& nodes_found, size_t& variables_found, + std::unordered_map<std::string, std::string>& state_map) { ++nodes_found; if (ref->nodeClass != UA_NODECLASS_VARIABLE) { return true; } try { opc::NodeData nodedata = connection_->getNodeData(ref, path); - bool write = true; - if (lazy_mode_) { - write = false; - std::string nodeid = nodedata.attributes["Full path"]; - std::string cur_timestamp = node_timestamp_[nodeid]; - std::string new_timestamp = nodedata.attributes["Sourcetimestamp"]; - if (cur_timestamp != new_timestamp) { - node_timestamp_[nodeid] = new_timestamp; - logger_->log_debug("Node {} has new source timestamp {}", nodeid, new_timestamp); - write = true; - } - } - if (write) { + if (lazy_mode_ == LazyModeOptions::On) { + writeFlowFileUsingLazyModeWithTimestamp(nodedata, context, session, variables_found, state_map); + } else if (lazy_mode_ == LazyModeOptions::NewValue) { + writeFlowFileUsingLazyModeWithNewValue(nodedata, context, session, variables_found, state_map); + } else { OPCData2FlowFile(nodedata, context, session); ++variables_found; } @@ -140,7 +205,7 @@ bool FetchOPCProcessor::nodeFoundCallBack(const UA_ReferenceDescription *ref, co return true; } -void FetchOPCProcessor::OPCData2FlowFile(const opc::NodeData& opc_node, core::ProcessContext&, core::ProcessSession& session) { +void FetchOPCProcessor::OPCData2FlowFile(const opc::NodeData& opc_node, core::ProcessContext&, core::ProcessSession& session) const { auto flow_file = session.create(); if (flow_file == nullptr) { logger_->log_error("Failed to create flowfile!"); diff --git a/extensions/opc/tests/FetchOpcProcessorTests.cpp b/extensions/opc/tests/FetchOpcProcessorTests.cpp index a2b77d6c9..247c3dad1 100644 --- a/extensions/opc/tests/FetchOpcProcessorTests.cpp +++ b/extensions/opc/tests/FetchOpcProcessorTests.cpp @@ -179,4 +179,98 @@ TEST_CASE("Test trusted certs path must be valid", "[fetchopcprocessor]") { REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: Failed to load trusted server certs from path: /invalid/trusted"); } +TEST_CASE("Test no fetch result using lazy mode when no timestamps are changed", "[fetchopcprocessor]") { + OpcUaTestServer server(4841); + server.start(); + SingleProcessorTestController controller{std::make_unique<processors::FetchOPCProcessor>("FetchOPCProcessor")}; + LogTestController::getInstance().setDebug<processors::FetchOPCProcessor>(); + auto fetch_opc_processor = controller.getProcessor(); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/")); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeIDType.name, "Path")); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeID.name, "Simulator/Default/Device1")); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NameSpaceIndex.name, std::to_string(server.getNamespaceIndex()))); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::Lazy.name, "On")); + + auto results = controller.trigger(); + REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty()); + REQUIRE(results.at(processors::FetchOPCProcessor::Success).size() == 4); + + results = controller.trigger(); + REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty()); + REQUIRE(results.at(processors::FetchOPCProcessor::Success).empty()); + REQUIRE(LogTestController::getInstance().contains("Node Simulator/Default/Device1/INT3 has no new source timestamp, skipping")); +} + +TEST_CASE("Test fetch for nodes with changed timestamps with lazy mode", "[fetchopcprocessor]") { + OpcUaTestServer server(4841); + server.start(); + SingleProcessorTestController controller{std::make_unique<processors::FetchOPCProcessor>("FetchOPCProcessor")}; + LogTestController::getInstance().setDebug<processors::FetchOPCProcessor>(); + auto fetch_opc_processor = controller.getProcessor(); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/")); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeIDType.name, "Path")); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeID.name, "Simulator/Default/Device1")); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NameSpaceIndex.name, std::to_string(server.getNamespaceIndex()))); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::Lazy.name, "On")); + + auto results = controller.trigger(); + REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty()); + REQUIRE(results.at(processors::FetchOPCProcessor::Success).size() == 4); + + server.updateNodeTimestamp("Simulator/Default/Device1/INT3"); + results = controller.trigger(); + REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty()); + REQUIRE(results.at(processors::FetchOPCProcessor::Success).size() == 1); + auto flow_file = results.at(processors::FetchOPCProcessor::Success)[0]; + CHECK(flow_file->getAttribute("Browsename") == "INT3"); +} + +TEST_CASE("Test no fetch result using lazy new value mode when no values are changed", "[fetchopcprocessor]") { + OpcUaTestServer server(4841); + server.start(); + SingleProcessorTestController controller{std::make_unique<processors::FetchOPCProcessor>("FetchOPCProcessor")}; + LogTestController::getInstance().setDebug<processors::FetchOPCProcessor>(); + auto fetch_opc_processor = controller.getProcessor(); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/")); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeIDType.name, "Path")); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeID.name, "Simulator/Default/Device1")); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NameSpaceIndex.name, std::to_string(server.getNamespaceIndex()))); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::Lazy.name, "New Value")); + + auto results = controller.trigger(); + REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty()); + REQUIRE(results.at(processors::FetchOPCProcessor::Success).size() == 4); + + server.updateNodeTimestamp("Simulator/Default/Device1/INT3"); + results = controller.trigger(); + REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty()); + REQUIRE(results.at(processors::FetchOPCProcessor::Success).empty()); + REQUIRE(LogTestController::getInstance().contains("Node Simulator/Default/Device1/INT3 has no new value, skipping")); +} + +TEST_CASE("Test fetching new values using lazy new value mode", "[fetchopcprocessor]") { + OpcUaTestServer server(4841); + server.start(); + SingleProcessorTestController controller{std::make_unique<processors::FetchOPCProcessor>("FetchOPCProcessor")}; + LogTestController::getInstance().setDebug<processors::FetchOPCProcessor>(); + auto fetch_opc_processor = controller.getProcessor(); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint.name, "opc.tcp://127.0.0.1:4841/")); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeIDType.name, "Path")); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeID.name, "Simulator/Default/Device1")); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NameSpaceIndex.name, std::to_string(server.getNamespaceIndex()))); + REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::Lazy.name, "New Value")); + + auto results = controller.trigger(); + REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty()); + REQUIRE(results.at(processors::FetchOPCProcessor::Success).size() == 4); + + server.updateNodeTimestamp("Simulator/Default/Device1/INT3"); + server.updateNodeValue("Simulator/Default/Device1/INT2", 42); + results = controller.trigger(); + REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty()); + REQUIRE(results.at(processors::FetchOPCProcessor::Success).size() == 1); + auto flow_file = results.at(processors::FetchOPCProcessor::Success)[0]; + CHECK(flow_file->getAttribute("Browsename") == "INT2"); +} + } // namespace org::apache::nifi::minifi::test diff --git a/extensions/opc/tests/OpcUaTestServer.h b/extensions/opc/tests/OpcUaTestServer.h index 71e7cd6ba..5e955cdde 100644 --- a/extensions/opc/tests/OpcUaTestServer.h +++ b/extensions/opc/tests/OpcUaTestServer.h @@ -63,10 +63,14 @@ class OpcUaTestServer { UA_NodeId default_node = addObject("Default", simulator_node); UA_NodeId device1_node = addObject("Device1", default_node); - addIntVariable("INT1", device1_node, 1); - addIntVariable("INT2", device1_node, 2); + auto int1_node = addIntVariable("INT1", device1_node, 1); + node_ids_["Simulator/Default/Device1/INT1"] = int1_node; + auto int2_node = addIntVariable("INT2", device1_node, 2); + node_ids_["Simulator/Default/Device1/INT2"] = int2_node; auto int3_node = addIntVariable("INT3", device1_node, 3); - addIntVariable("INT4", int3_node, 4); + node_ids_["Simulator/Default/Device1/INT3"] = int3_node; + auto int4_node = addIntVariable("INT4", int3_node, 4); + node_ids_["Simulator/Default/Device1/INT4"] = int4_node; } void start() { @@ -108,6 +112,25 @@ class OpcUaTestServer { return server_logs_; } + void updateNodeTimestamp(const std::string& full_path) { + UA_Int32 new_value = full_path[full_path.size() - 1] - '0'; + updateNodeValue(full_path, new_value); + } + + void updateNodeValue(const std::string& full_path, int32_t new_value) { + std::lock_guard<std::mutex> lock(mutex_); + + UA_Variant variant; + UA_Variant_init(&variant); + UA_Variant_setScalar(&variant, &new_value, &UA_TYPES[UA_TYPES_INT32]); + + UA_StatusCode status = UA_Server_writeValue(server_, node_ids_[full_path], variant); + + if (status != UA_STATUSCODE_GOOD) { + throw std::runtime_error("Failed to write value to node"); + } + } + private: UA_NodeId addObject(const char *name, UA_NodeId parent) { UA_NodeId object_id; @@ -171,6 +194,7 @@ class OpcUaTestServer { std::thread server_thread_; mutable std::mutex server_logs_mutex_; std::vector<std::string> server_logs_; + std::unordered_map<std::string, UA_NodeId> node_ids_; }; } // namespace org::apache::nifi::minifi::test
