This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit ef701211b124692dbe1413e960d2422dd580daf5
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Mon Jul 21 13:41:09 2025 +0200

    MINIFICPP-2591 Add new value option to FetchOPCProcessor lazy mode
    
    Signed-off-by: Ferenc Gerlits <[email protected]>
    Closes #1994
---
 PROCESSORS.md                                   |  30 +++----
 extensions/opc/include/fetchopc.h               |  46 ++++++++---
 extensions/opc/include/putopc.h                 |   2 +-
 extensions/opc/src/fetchopc.cpp                 | 101 +++++++++++++++++++-----
 extensions/opc/tests/FetchOpcProcessorTests.cpp |  94 ++++++++++++++++++++++
 extensions/opc/tests/OpcUaTestServer.h          |  30 ++++++-
 6 files changed, 256 insertions(+), 47 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 7bb2e948a..bec6d178f 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -920,21 +920,21 @@ Fetches OPC-UA node
 
 In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
 
-| Name                            | Default Value | Allowable Values        | 
Description                                                                     
                                                                                
                                                                                
                                                                   |
-|---------------------------------|---------------|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| **OPC server endpoint**         |               |                         | 
Specifies the address, port and relative path of an OPC endpoint                
                                                                                
                                                                                
                                                                   |
-| Application URI                 |               |                         | 
Application URI of the client in the format 'urn:unconfigured:application'. 
Mandatory, if using Secure Channel and must match the URI included in the 
certificate's Subject Alternative Names.                                        
                                                                             |
-| Username                        |               |                         | 
Username to log in with.                                                        
                                                                                
                                                                                
                                                                   |
-| Password                        |               |                         | 
Password to log in with.<br/>**Sensitive Property: true**                       
                                                                                
                                                                                
                                                                   |
-| Certificate path                |               |                         | 
Path to the DER-encoded cert file                                               
                                                                                
                                                                                
                                                                   |
-| Key path                        |               |                         | 
Path to the DER-encoded key file                                                
                                                                                
                                                                                
                                                                   |
-| Trusted server certificate path |               |                         | 
Comma separated list of paths to the DER-encoded trusted server certificates    
                                                                                
                                                                                
                                                                   |
-| Path reference types            |               |                         | 
Specify the reference types between nodes in the path if Path Node ID type is 
used. If not provided, all reference types are assumed to be Organizes. The 
format is 'referenceType1/referenceType2/.../referenceTypeN' and the supported 
reference types are Organizes, HasComponent, HasProperty, and HasSubtype. |
-| **Node ID type**                |               | Path<br/>Int<br/>String | 
Specifies the type of the provided node ID                                      
                                                                                
                                                                                
                                                                   |
-| **Node ID**                     |               |                         | 
Specifies the ID of the root node to traverse. In case of a Path Node ID Type, 
the path should be provided in the format of 'path/to/node'.                    
                                                                                
                                                                    |
-| **Namespace index**             | 0             |                         | 
The index of the namespace.                                                     
                                                                                
                                                                                
                                                                   |
-| **Max depth**                   | 0             |                         | 
Specifiec the max depth of browsing. 0 means unlimited.                         
                                                                                
                                                                                
                                                                   |
-| **Lazy mode**                   | Off           | On<br/>Off              | 
Only creates flowfiles from nodes with new timestamp from the server.           
                                                                                
                                                                                
                                                                   |
+| Name                            | Default Value | Allowable Values         | 
Description                                                                     
                                                                                
                                                                                
                                                                   |
+|---------------------------------|---------------|--------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **OPC server endpoint**         |               |                          | 
Specifies the address, port and relative path of an OPC endpoint                
                                                                                
                                                                                
                                                                   |
+| Application URI                 |               |                          | 
Application URI of the client in the format 'urn:unconfigured:application'. 
Mandatory, if using Secure Channel and must match the URI included in the 
certificate's Subject Alternative Names.                                        
                                                                             |
+| Username                        |               |                          | 
Username to log in with.                                                        
                                                                                
                                                                                
                                                                   |
+| Password                        |               |                          | 
Password to log in with.<br/>**Sensitive Property: true**                       
                                                                                
                                                                                
                                                                   |
+| Certificate path                |               |                          | 
Path to the DER-encoded cert file                                               
                                                                                
                                                                                
                                                                   |
+| Key path                        |               |                          | 
Path to the DER-encoded key file                                                
                                                                                
                                                                                
                                                                   |
+| Trusted server certificate path |               |                          | 
Comma separated list of paths to the DER-encoded trusted server certificates    
                                                                                
                                                                                
                                                                   |
+| Path reference types            |               |                          | 
Specify the reference types between nodes in the path if Path Node ID type is 
used. If not provided, all reference types are assumed to be Organizes. The 
format is 'referenceType1/referenceType2/.../referenceTypeN' and the supported 
reference types are Organizes, HasComponent, HasProperty, and HasSubtype. |
+| **Node ID type**                |               | Path<br/>Int<br/>String  | 
Specifies the type of the provided node ID                                      
                                                                                
                                                                                
                                                                   |
+| **Node ID**                     |               |                          | 
Specifies the ID of the root node to traverse. In case of a Path Node ID Type, 
the path should be provided in the format of 'path/to/node'.                    
                                                                                
                                                                    |
+| **Namespace index**             | 0             |                          | 
The index of the namespace.                                                     
                                                                                
                                                                                
                                                                   |
+| **Max depth**                   | 0             |                          | 
Specifiec the max depth of browsing. 0 means unlimited.                         
                                                                                
                                                                                
                                                                   |
+| **Lazy mode**                   | Off           | On<br/>New Value<br/>Off | 
Only creates flowfiles from nodes with new timestamp from the server. If set to 
'New Value', it will only create flowfiles if the value of the node data has 
changed since the last fetch, the timestamp is ignored.                         
                                                                      |
 
 ### Relationships
 
diff --git a/extensions/opc/include/fetchopc.h 
b/extensions/opc/include/fetchopc.h
index 012935df4..ada1522f8 100644
--- a/extensions/opc/include/fetchopc.h
+++ b/extensions/opc/include/fetchopc.h
@@ -36,15 +36,39 @@
 #include "utils/ArrayUtils.h"
 #include "utils/Id.h"
 #include "utils/gsl.h"
+#include "minifi-cpp/core/StateManager.h"
 
 namespace org::apache::nifi::minifi::processors {
 
 enum class LazyModeOptions {
   On,
+  NewValue,
   Off
 };
 
-class FetchOPCProcessor : public BaseOPCProcessor {
+}  // namespace org::apache::nifi::minifi::processors
+
+namespace magic_enum::customize {
+
+using LazyModeOptions = org::apache::nifi::minifi::processors::LazyModeOptions;
+
+template <>
+constexpr customize_t enum_name<LazyModeOptions>(LazyModeOptions value) 
noexcept {
+  switch (value) {
+    case LazyModeOptions::On:
+      return "On";
+    case LazyModeOptions::NewValue:
+      return "New Value";
+    case LazyModeOptions::Off:
+      return "Off";
+  }
+  return invalid_tag;
+}
+}  // namespace magic_enum::customize
+
+namespace org::apache::nifi::minifi::processors {
+
+class FetchOPCProcessor final : public BaseOPCProcessor {
  public:
   explicit FetchOPCProcessor(std::string_view name, const utils::Identifier& 
uuid = {})
       : BaseOPCProcessor(name, uuid) {
@@ -75,7 +99,8 @@ class FetchOPCProcessor : public BaseOPCProcessor {
       .isRequired(true)
       .build();
   EXTENSIONAPI static constexpr auto Lazy = 
core::PropertyDefinitionBuilder<magic_enum::enum_count<LazyModeOptions>()>::createProperty("Lazy
 mode")
-      .withDescription("Only creates flowfiles from nodes with new timestamp 
from the server.")
+      .withDescription("Only creates flowfiles from nodes with new timestamp 
from the server. If set to 'New Value', it will only create flowfiles "
+                       "if the value of the node data has changed since the 
last fetch, the timestamp is ignored.")
       .isRequired(true)
       .withAllowedValues(magic_enum::enum_names<LazyModeOptions>())
       .withDefaultValue(magic_enum::enum_name(LazyModeOptions::Off))
@@ -116,19 +141,20 @@ class FetchOPCProcessor : public BaseOPCProcessor {
   void onTrigger(core::ProcessContext& context, core::ProcessSession& session) 
override;
   void initialize() override;
 
- protected:
+ private:
   bool nodeFoundCallBack(const UA_ReferenceDescription *ref, const 
std::string& path,
                          core::ProcessContext& context, core::ProcessSession& 
session,
-                         size_t& nodes_found, size_t& variables_found);
-
-  void OPCData2FlowFile(const opc::NodeData& opc_node, core::ProcessContext& 
context, core::ProcessSession& session);
+                         size_t& nodes_found, size_t& variables_found, 
std::unordered_map<std::string, std::string>& state_map);
+  void OPCData2FlowFile(const opc::NodeData& opc_node, core::ProcessContext& 
context, core::ProcessSession& session) const;
+  void writeFlowFileUsingLazyModeWithTimestamp(const opc::NodeData& nodedata, 
core::ProcessContext& context, core::ProcessSession& session, size_t& 
variables_found,
+    std::unordered_map<std::string, std::string>& state_map) const;
+  void writeFlowFileUsingLazyModeWithNewValue(const opc::NodeData& nodedata, 
core::ProcessContext& context, core::ProcessSession& session, size_t& 
variables_found,
+    std::unordered_map<std::string, std::string>& state_map) const;
 
   uint64_t max_depth_ = 0;
-  bool lazy_mode_ = false;
-
- private:
+  LazyModeOptions lazy_mode_ = LazyModeOptions::Off;
   std::vector<UA_NodeId> translated_node_ids_;  // Only used when user 
provides path, path->nodeid translation is only done once
-  std::unordered_map<std::string, std::string> node_timestamp_;  // Key = Full 
path, Value = Timestamp
+  core::StateManager* state_manager_ = nullptr;
 };
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/opc/include/putopc.h b/extensions/opc/include/putopc.h
index 92f4f3256..cf29c6e47 100644
--- a/extensions/opc/include/putopc.h
+++ b/extensions/opc/include/putopc.h
@@ -38,7 +38,7 @@
 
 namespace org::apache::nifi::minifi::processors {
 
-class PutOPCProcessor : public BaseOPCProcessor {
+class PutOPCProcessor final : public BaseOPCProcessor {
  public:
   EXTENSIONAPI static constexpr const char* Description = "Creates/updates OPC 
nodes";
 
diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp
index 1d917082b..7ac232ac6 100644
--- a/extensions/opc/src/fetchopc.cpp
+++ b/extensions/opc/src/fetchopc.cpp
@@ -40,6 +40,11 @@ void FetchOPCProcessor::initialize() {
 void FetchOPCProcessor::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory& factory) {
   logger_->log_trace("FetchOPCProcessor::onSchedule");
 
+  state_manager_ = context.getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+
   translated_node_ids_.clear();  // Path might has changed during restart
 
   BaseOPCProcessor::onSchedule(context, factory);
@@ -51,7 +56,7 @@ void FetchOPCProcessor::onSchedule(core::ProcessContext& 
context, core::ProcessS
 
   namespace_idx_ = gsl::narrow<int32_t>(utils::parseI64Property(context, 
NameSpaceIndex));
 
-  lazy_mode_ = utils::parseEnumProperty<LazyModeOptions>(context, Lazy) == 
LazyModeOptions::On;
+  lazy_mode_ = utils::parseEnumProperty<LazyModeOptions>(context, Lazy);
 
   if (id_type_ == opc::OPCNodeIDType::Path) {
     readPathReferenceTypes(context, node_id_);
@@ -69,8 +74,12 @@ void FetchOPCProcessor::onTrigger(core::ProcessContext& 
context, core::ProcessSe
   size_t nodes_found = 0;
   size_t variables_found = 0;
 
-  auto found_cb = [this, &context, &session, &nodes_found, 
&variables_found](const UA_ReferenceDescription* ref, const std::string& path) {
-    return nodeFoundCallBack(ref, path, context, session, nodes_found, 
variables_found); };
+  std::unordered_map<std::string, std::string> state_map;
+  state_manager_->get(state_map);
+
+  auto found_cb = [this, &context, &session, &nodes_found, &variables_found, 
&state_map](const UA_ReferenceDescription* ref, const std::string& path) {
+    return nodeFoundCallBack(ref, path, context, session, nodes_found, 
variables_found, state_map);
+  };
 
   if (id_type_ != opc::OPCNodeIDType::Path) {
     UA_NodeId my_id;
@@ -107,29 +116,85 @@ void FetchOPCProcessor::onTrigger(core::ProcessContext& 
context, core::ProcessSe
     logger_->log_warn("Found no variables when traversing the specified node. 
No flowfiles are generated. Yielding...");
     yield();
   }
+
+  state_manager_->set(state_map);
+}
+
+void FetchOPCProcessor::writeFlowFileUsingLazyModeWithTimestamp(const 
opc::NodeData& nodedata, core::ProcessContext& context, core::ProcessSession& 
session, size_t& variables_found,
+    std::unordered_map<std::string, std::string>& state_map) const {
+  auto writeDataToFlowFile = [this, &nodedata, &context, &session, 
&variables_found]() {
+    OPCData2FlowFile(nodedata, context, session);
+    ++variables_found;
+  };
+
+  auto full_path_it = nodedata.attributes.find("Full path");
+  if (full_path_it == nodedata.attributes.end()) {
+    logger_->log_error("Node data does not contain 'Full path' attribute, 
cannot read state for node");
+    writeDataToFlowFile();
+    return;
+  }
+
+  auto source_timestamp_it = nodedata.attributes.find("Sourcetimestamp");
+  if (source_timestamp_it == nodedata.attributes.end()) {
+    logger_->log_error("Node data does not contain 'Sourcetimestamp' 
attribute, cannot read state for node");
+    writeDataToFlowFile();
+    return;
+  }
+
+  auto new_state_value = source_timestamp_it->second;
+  auto nodeid = full_path_it->second + "_timestamp";
+  auto cur_state_value = state_map[nodeid];
+  if (cur_state_value.empty() || cur_state_value != new_state_value) {
+    logger_->log_debug("Node {} has new source timestamp", 
full_path_it->second);
+    state_map[nodeid] = new_state_value;
+    writeDataToFlowFile();
+    return;
+  }
+
+  logger_->log_debug("Node {} has no new source timestamp, skipping", 
full_path_it->second);
+}
+
+void FetchOPCProcessor::writeFlowFileUsingLazyModeWithNewValue(const 
opc::NodeData& nodedata, core::ProcessContext& context, core::ProcessSession& 
session, size_t& variables_found,
+    std::unordered_map<std::string, std::string>& state_map) const {
+  auto writeDataToFlowFile = [this, &nodedata, &context, &session, 
&variables_found]() {
+    OPCData2FlowFile(nodedata, context, session);
+    ++variables_found;
+  };
+
+  auto full_path_it = nodedata.attributes.find("Full path");
+  if (full_path_it == nodedata.attributes.end()) {
+    logger_->log_error("Node data does not contain 'Full path' attribute, 
cannot read state for node");
+    writeDataToFlowFile();
+    return;
+  }
+
+  auto new_state_value = opc::nodeValue2String(nodedata);
+  auto nodeid = full_path_it->second + "_new_value";
+  auto cur_state_value = state_map[nodeid];
+  if (cur_state_value.empty() || cur_state_value != new_state_value) {
+    logger_->log_debug("Node {} has new value", full_path_it->second);
+    state_map[nodeid] = new_state_value;
+    writeDataToFlowFile();
+    return;
+  }
+
+  logger_->log_debug("Node {} has no new value, skipping", 
full_path_it->second);
 }
 
 bool FetchOPCProcessor::nodeFoundCallBack(const UA_ReferenceDescription *ref, 
const std::string& path,
-    core::ProcessContext& context, core::ProcessSession& session, size_t& 
nodes_found, size_t& variables_found) {
+    core::ProcessContext& context, core::ProcessSession& session, size_t& 
nodes_found, size_t& variables_found,
+    std::unordered_map<std::string, std::string>& state_map) {
   ++nodes_found;
   if (ref->nodeClass != UA_NODECLASS_VARIABLE) {
     return true;
   }
   try {
     opc::NodeData nodedata = connection_->getNodeData(ref, path);
-    bool write = true;
-    if (lazy_mode_) {
-      write = false;
-      std::string nodeid = nodedata.attributes["Full path"];
-      std::string cur_timestamp = node_timestamp_[nodeid];
-      std::string new_timestamp = nodedata.attributes["Sourcetimestamp"];
-      if (cur_timestamp != new_timestamp) {
-        node_timestamp_[nodeid] = new_timestamp;
-        logger_->log_debug("Node {} has new source timestamp {}", nodeid, 
new_timestamp);
-        write = true;
-      }
-    }
-    if (write) {
+    if (lazy_mode_ == LazyModeOptions::On) {
+      writeFlowFileUsingLazyModeWithTimestamp(nodedata, context, session, 
variables_found, state_map);
+    } else if (lazy_mode_ == LazyModeOptions::NewValue) {
+      writeFlowFileUsingLazyModeWithNewValue(nodedata, context, session, 
variables_found, state_map);
+    } else {
       OPCData2FlowFile(nodedata, context, session);
       ++variables_found;
     }
@@ -140,7 +205,7 @@ bool FetchOPCProcessor::nodeFoundCallBack(const 
UA_ReferenceDescription *ref, co
   return true;
 }
 
-void FetchOPCProcessor::OPCData2FlowFile(const opc::NodeData& opc_node, 
core::ProcessContext&, core::ProcessSession& session) {
+void FetchOPCProcessor::OPCData2FlowFile(const opc::NodeData& opc_node, 
core::ProcessContext&, core::ProcessSession& session) const {
   auto flow_file = session.create();
   if (flow_file == nullptr) {
     logger_->log_error("Failed to create flowfile!");
diff --git a/extensions/opc/tests/FetchOpcProcessorTests.cpp 
b/extensions/opc/tests/FetchOpcProcessorTests.cpp
index a2b77d6c9..247c3dad1 100644
--- a/extensions/opc/tests/FetchOpcProcessorTests.cpp
+++ b/extensions/opc/tests/FetchOpcProcessorTests.cpp
@@ -179,4 +179,98 @@ TEST_CASE("Test trusted certs path must be valid", 
"[fetchopcprocessor]") {
   REQUIRE_THROWS_WITH(controller.trigger("42"), "Process Schedule Operation: 
Failed to load trusted server certs from path: /invalid/trusted");
 }
 
+TEST_CASE("Test no fetch result using lazy mode when no timestamps are 
changed", "[fetchopcprocessor]") {
+  OpcUaTestServer server(4841);
+  server.start();
+  SingleProcessorTestController 
controller{std::make_unique<processors::FetchOPCProcessor>("FetchOPCProcessor")};
+  LogTestController::getInstance().setDebug<processors::FetchOPCProcessor>();
+  auto fetch_opc_processor = controller.getProcessor();
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint.name,
 "opc.tcp://127.0.0.1:4841/"));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeIDType.name,
 "Path"));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeID.name,
 "Simulator/Default/Device1"));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NameSpaceIndex.name,
 std::to_string(server.getNamespaceIndex())));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::Lazy.name,
 "On"));
+
+  auto results = controller.trigger();
+  REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty());
+  REQUIRE(results.at(processors::FetchOPCProcessor::Success).size() == 4);
+
+  results = controller.trigger();
+  REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty());
+  REQUIRE(results.at(processors::FetchOPCProcessor::Success).empty());
+  REQUIRE(LogTestController::getInstance().contains("Node 
Simulator/Default/Device1/INT3 has no new source timestamp, skipping"));
+}
+
+TEST_CASE("Test fetch for nodes with changed timestamps with lazy mode", 
"[fetchopcprocessor]") {
+  OpcUaTestServer server(4841);
+  server.start();
+  SingleProcessorTestController 
controller{std::make_unique<processors::FetchOPCProcessor>("FetchOPCProcessor")};
+  LogTestController::getInstance().setDebug<processors::FetchOPCProcessor>();
+  auto fetch_opc_processor = controller.getProcessor();
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint.name,
 "opc.tcp://127.0.0.1:4841/"));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeIDType.name,
 "Path"));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeID.name,
 "Simulator/Default/Device1"));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NameSpaceIndex.name,
 std::to_string(server.getNamespaceIndex())));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::Lazy.name,
 "On"));
+
+  auto results = controller.trigger();
+  REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty());
+  REQUIRE(results.at(processors::FetchOPCProcessor::Success).size() == 4);
+
+  server.updateNodeTimestamp("Simulator/Default/Device1/INT3");
+  results = controller.trigger();
+  REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty());
+  REQUIRE(results.at(processors::FetchOPCProcessor::Success).size() == 1);
+  auto flow_file = results.at(processors::FetchOPCProcessor::Success)[0];
+  CHECK(flow_file->getAttribute("Browsename") == "INT3");
+}
+
+TEST_CASE("Test no fetch result using lazy new value mode when no values are 
changed", "[fetchopcprocessor]") {
+  OpcUaTestServer server(4841);
+  server.start();
+  SingleProcessorTestController 
controller{std::make_unique<processors::FetchOPCProcessor>("FetchOPCProcessor")};
+  LogTestController::getInstance().setDebug<processors::FetchOPCProcessor>();
+  auto fetch_opc_processor = controller.getProcessor();
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint.name,
 "opc.tcp://127.0.0.1:4841/"));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeIDType.name,
 "Path"));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeID.name,
 "Simulator/Default/Device1"));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NameSpaceIndex.name,
 std::to_string(server.getNamespaceIndex())));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::Lazy.name,
 "New Value"));
+
+  auto results = controller.trigger();
+  REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty());
+  REQUIRE(results.at(processors::FetchOPCProcessor::Success).size() == 4);
+
+  server.updateNodeTimestamp("Simulator/Default/Device1/INT3");
+  results = controller.trigger();
+  REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty());
+  REQUIRE(results.at(processors::FetchOPCProcessor::Success).empty());
+  REQUIRE(LogTestController::getInstance().contains("Node 
Simulator/Default/Device1/INT3 has no new value, skipping"));
+}
+
+TEST_CASE("Test fetching new values using lazy new value mode", 
"[fetchopcprocessor]") {
+  OpcUaTestServer server(4841);
+  server.start();
+  SingleProcessorTestController 
controller{std::make_unique<processors::FetchOPCProcessor>("FetchOPCProcessor")};
+  LogTestController::getInstance().setDebug<processors::FetchOPCProcessor>();
+  auto fetch_opc_processor = controller.getProcessor();
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::OPCServerEndPoint.name,
 "opc.tcp://127.0.0.1:4841/"));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeIDType.name,
 "Path"));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NodeID.name,
 "Simulator/Default/Device1"));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::NameSpaceIndex.name,
 std::to_string(server.getNamespaceIndex())));
+  
REQUIRE(fetch_opc_processor->setProperty(processors::FetchOPCProcessor::Lazy.name,
 "New Value"));
+
+  auto results = controller.trigger();
+  REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty());
+  REQUIRE(results.at(processors::FetchOPCProcessor::Success).size() == 4);
+
+  server.updateNodeTimestamp("Simulator/Default/Device1/INT3");
+  server.updateNodeValue("Simulator/Default/Device1/INT2", 42);
+  results = controller.trigger();
+  REQUIRE(results.at(processors::FetchOPCProcessor::Failure).empty());
+  REQUIRE(results.at(processors::FetchOPCProcessor::Success).size() == 1);
+  auto flow_file = results.at(processors::FetchOPCProcessor::Success)[0];
+  CHECK(flow_file->getAttribute("Browsename") == "INT2");
+}
+
 }  // namespace org::apache::nifi::minifi::test
diff --git a/extensions/opc/tests/OpcUaTestServer.h 
b/extensions/opc/tests/OpcUaTestServer.h
index 71e7cd6ba..5e955cdde 100644
--- a/extensions/opc/tests/OpcUaTestServer.h
+++ b/extensions/opc/tests/OpcUaTestServer.h
@@ -63,10 +63,14 @@ class OpcUaTestServer {
     UA_NodeId default_node = addObject("Default", simulator_node);
     UA_NodeId device1_node = addObject("Device1", default_node);
 
-    addIntVariable("INT1", device1_node, 1);
-    addIntVariable("INT2", device1_node, 2);
+    auto int1_node = addIntVariable("INT1", device1_node, 1);
+    node_ids_["Simulator/Default/Device1/INT1"] = int1_node;
+    auto int2_node = addIntVariable("INT2", device1_node, 2);
+    node_ids_["Simulator/Default/Device1/INT2"] = int2_node;
     auto int3_node = addIntVariable("INT3", device1_node, 3);
-    addIntVariable("INT4", int3_node, 4);
+    node_ids_["Simulator/Default/Device1/INT3"] = int3_node;
+    auto int4_node = addIntVariable("INT4", int3_node, 4);
+    node_ids_["Simulator/Default/Device1/INT4"] = int4_node;
   }
 
   void start() {
@@ -108,6 +112,25 @@ class OpcUaTestServer {
     return server_logs_;
   }
 
+  void updateNodeTimestamp(const std::string& full_path) {
+    UA_Int32 new_value = full_path[full_path.size() - 1] - '0';
+    updateNodeValue(full_path, new_value);
+  }
+
+  void updateNodeValue(const std::string& full_path, int32_t new_value) {
+    std::lock_guard<std::mutex> lock(mutex_);
+
+    UA_Variant variant;
+    UA_Variant_init(&variant);
+    UA_Variant_setScalar(&variant, &new_value, &UA_TYPES[UA_TYPES_INT32]);
+
+    UA_StatusCode status = UA_Server_writeValue(server_, node_ids_[full_path], 
variant);
+
+    if (status != UA_STATUSCODE_GOOD) {
+      throw std::runtime_error("Failed to write value to node");
+    }
+  }
+
  private:
   UA_NodeId addObject(const char *name, UA_NodeId parent) {
     UA_NodeId object_id;
@@ -171,6 +194,7 @@ class OpcUaTestServer {
   std::thread server_thread_;
   mutable std::mutex server_logs_mutex_;
   std::vector<std::string> server_logs_;
+  std::unordered_map<std::string, UA_NodeId> node_ids_;
 };
 
 }  // namespace org::apache::nifi::minifi::test

Reply via email to