This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new f0a96c9 MINIFICPP-819 - OPC Unified Architecture Support
f0a96c9 is described below
commit f0a96c99afd16a6686466d5edea3d59fda67d1fe
Author: Arpad Boda <[email protected]>
AuthorDate: Wed Jul 10 11:43:44 2019 +0200
MINIFICPP-819 - OPC Unified Architecture Support
Signed-off-by: Arpad Boda <[email protected]>
Approved by bakaid on GH
This closes #635
---
CMakeLists.txt | 12 +
bootstrap.sh | 18 +-
bstrp_functions.sh | 2 +
cmake/BundledMbedTLS.cmake | 104 +++++++
cmake/BundledOpen62541.cmake | 83 +++++
cmake/mbedtls/dummy/FindMbedTLS.cmake | 25 ++
extensions/opc/CMakeLists.txt | 59 ++++
extensions/opc/include/fetchopc.h | 109 +++++++
extensions/opc/include/opc.h | 146 +++++++++
extensions/opc/include/opcbase.h | 85 +++++
extensions/opc/include/putopc.h | 111 +++++++
extensions/opc/src/fetchopc.cpp | 235 ++++++++++++++
extensions/opc/src/opc.cpp | 567 ++++++++++++++++++++++++++++++++++
extensions/opc/src/opcbase.cpp | 162 ++++++++++
extensions/opc/src/putopc.cpp | 466 ++++++++++++++++++++++++++++
libminifi/include/core/Property.h | 5 +
rheldistro.sh | 2 +-
suse.sh | 2 +-
thirdparty/open62541/open62541.patch | 42 +++
19 files changed, 2225 insertions(+), 10 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ec02734..4d26dd0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -40,6 +40,7 @@ option(ENABLE_OPS "Enable Operations/zlib Tools" ON)
option(USE_SYSTEM_UUID "Instructs the build system to search for and use a
UUID library available in the host system" OFF)
option(ENABLE_JNI "Instructs the build system to enable the JNI extension" OFF)
option(ENABLE_OPENCV "Instructs the build system to enable the OpenCV
extension" OFF)
+option(ENABLE_OPC "Instructs the build system to enable the OPC extension" OFF)
option(BUILD_SHARED_LIBS "Build yaml cpp shared lib" OFF)
cmake_dependent_option(USE_SYSTEM_ZLIB "Instructs the build system to search
for and use a zlib library available in the host system" ON "NOT STATIC_BUILD"
OFF)
@@ -560,6 +561,17 @@ if (ENABLE_BUSTACHE)
createExtension(BUSTACHE-EXTENSIONS "BUSTACHE EXTENSIONS" "This enables
bustache functionality including ApplyTemplate." "extensions/bustache"
"${TEST_DIR}/bustache-tests" "TRUE" "thirdparty/bustache")
endif()
+## OPC Extensions: set up the bundled mbedTLS and open62541 builds, then register the OPC-UA extension
+if (ENABLE_OPC)
+ include(BundledMbedTLS)
+ use_bundled_mbedtls(${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR})
+
+ include(BundledOpen62541)
+ use_bundled_open62541(${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR})
+
+ createExtension(OPC-EXTENSIONS "OPC EXTENSIONS" "This enables OPC-UA
support" "extensions/opc" )
+endif()
+
## SFTP extensions
option(ENABLE_SFTP "Enables SFTP support." OFF)
if ((ENABLE_ALL OR ENABLE_SFTP) AND NOT DISABLE_CURL)
diff --git a/bootstrap.sh b/bootstrap.sh
index 8d8fc77..0224490 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -290,23 +290,25 @@ add_disabled_option OPENCV_ENABLED ${FALSE}
"ENABLE_OPENCV"
add_disabled_option SFTP_ENABLED ${FALSE} "ENABLE_SFTP"
add_dependency SFTP_ENABLED "libssh2"
-TESTS_DISABLED=${FALSE}
-
add_disabled_option SQLITE_ENABLED ${FALSE} "ENABLE_SQLITE"
-USE_SHARED_LIBS=${TRUE}
-
-## name, default, values
-add_multi_option BUILD_PROFILE "RelWithDebInfo" "RelWithDebInfo" "Debug"
"MinSizeRel" "Release"
-
# Since the following extensions have limitations on
-
add_disabled_option BUSTACHE_ENABLED ${FALSE} "ENABLE_BUSTACHE" "2.6" ${TRUE}
add_dependency BUSTACHE_ENABLED "boost"
+
## currently need to limit on certain platforms
add_disabled_option TENSORFLOW_ENABLED ${FALSE} "ENABLE_TENSORFLOW" "2.6"
${TRUE}
add_dependency TENSORFLOW_ENABLED "tensorflow"
+add_disabled_option OPC_ENABLED ${FALSE} "ENABLE_OPC"
+add_dependency OPC_ENABLED "mbedtls"
+
+USE_SHARED_LIBS=${TRUE}
+TESTS_DISABLED=${FALSE}
+
+## name, default, values
+add_multi_option BUILD_PROFILE "RelWithDebInfo" "RelWithDebInfo" "Debug"
"MinSizeRel" "Release"
+
if [ "$GUIDED_INSTALL" == "${TRUE}" ]; then
EnableAllFeatures
ALL_FEATURES_ENABLED=${TRUE}
diff --git a/bstrp_functions.sh b/bstrp_functions.sh
index 4f34fc9..74f079a 100755
--- a/bstrp_functions.sh
+++ b/bstrp_functions.sh
@@ -325,6 +325,7 @@ show_supported_features() {
echo "S. SFTP Support ................$(print_feature_status SFTP_ENABLED)"
echo "V. AWS Support .................$(print_feature_status AWS_ENABLED)"
echo "T. OpenCV Support ..............$(print_feature_status OPENCV_ENABLED)"
+ echo "U. OPC-UA Support...............$(print_feature_status OPC_ENABLED)"
echo "****************************************"
echo " Build Options."
echo "****************************************"
@@ -370,6 +371,7 @@ read_feature_options(){
o) ToggleFeature COAP_ENABLED ;;
s) ToggleFeature SFTP_ENABLED ;;
t) ToggleFeature OPENCV_ENABLED ;;
+ u) ToggleFeature OPC_ENABLED ;;
1) ToggleFeature TESTS_DISABLED ;;
2) EnableAllFeatures ;;
3) ToggleFeature JNI_ENABLED;;
diff --git a/cmake/BundledMbedTLS.cmake b/cmake/BundledMbedTLS.cmake
new file mode 100644
index 0000000..9407aed
--- /dev/null
+++ b/cmake/BundledMbedTLS.cmake
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+function(use_bundled_mbedtls SOURCE_DIR BINARY_DIR)
+ message("Using bundled MbedTLS")
+
+ # Define byproducts: the three static libraries (mbedtls, mbedx509, mbedcrypto); no prefix and ".lib" on Windows, "lib" prefix and ".a" elsewhere
+ if (WIN32)
+ set(BYPRODUCT_PREFIX "" CACHE STRING "" FORCE)
+ set(BYPRODUCT_SUFFIX ".lib" CACHE STRING "" FORCE)
+ else()
+ set(BYPRODUCT_PREFIX "lib" CACHE STRING "" FORCE)
+ set(BYPRODUCT_SUFFIX ".a" CACHE STRING "" FORCE)
+ endif()
+
+ set(BYPRODUCTS
+ "lib/${BYPRODUCT_PREFIX}mbedtls${BYPRODUCT_SUFFIX}"
+ "lib/${BYPRODUCT_PREFIX}mbedx509${BYPRODUCT_SUFFIX}"
+ "lib/${BYPRODUCT_PREFIX}mbedcrypto${BYPRODUCT_SUFFIX}"
+ )
+
+ set(MBEDTLS_BIN_DIR "${BINARY_DIR}/thirdparty/mbedtls-install" CACHE
STRING "" FORCE)
+
+ FOREACH(BYPRODUCT ${BYPRODUCTS})
+ LIST(APPEND MBEDTLS_LIBRARIES_LIST "${MBEDTLS_BIN_DIR}/${BYPRODUCT}")
+ ENDFOREACH(BYPRODUCT)
+
+ # Set build options: install into MBEDTLS_BIN_DIR and turn off the mbedTLS programs and tests
+ set(MBEDTLS_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
+ "-DCMAKE_INSTALL_PREFIX=${MBEDTLS_BIN_DIR}"
+ -DENABLE_PROGRAMS=OFF
+ -DENABLE_TESTING=OFF
+ )
+
+ # Build project: fetch the hash-pinned mbedTLS 2.16.3 archive and build it as the external project mbedtls-external
+ ExternalProject_Add(
+ mbedtls-external
+ URL "https://tls.mbed.org/download/mbedtls-2.16.3-apache.tgz"
+ URL_HASH
"SHA256=ec1bee6d82090ed6ea2690784ea4b294ab576a65d428da9fe8750f932d2da661"
+ SOURCE_DIR "${BINARY_DIR}/thirdparty/mbedtls-src"
+ CMAKE_ARGS ${MBEDTLS_CMAKE_ARGS}
+ BUILD_BYPRODUCTS ${MBEDTLS_LIBRARIES_LIST}
+ EXCLUDE_FROM_ALL TRUE
+ )
+
+ # Set variables: force the MBEDTLS_* result variables into the cache so they point at the bundled install
+ set(MBEDTLS_FOUND "YES" CACHE STRING "" FORCE)
+ set(MBEDTLS_INCLUDE_DIRS "${MBEDTLS_BIN_DIR}/include" CACHE STRING ""
FORCE)
+ set(MBEDTLS_LIBRARIES ${MBEDTLS_LIBRARIES_LIST} CACHE STRING "" FORCE)
+ set(MBEDTLS_LIBRARY
"${MBEDTLS_BIN_DIR}/lib/${BYPRODUCT_PREFIX}mbedtls${BYPRODUCT_SUFFIX}" CACHE
STRING "" FORCE)
+ set(MBEDX509_LIBRARY
"${MBEDTLS_BIN_DIR}/lib/${BYPRODUCT_PREFIX}mbedx509${BYPRODUCT_SUFFIX}" CACHE
STRING "" FORCE)
+ set(MBEDCRYPTO_LIBRARY
"${MBEDTLS_BIN_DIR}/lib/${BYPRODUCT_PREFIX}mbedcrypto${BYPRODUCT_SUFFIX}" CACHE
STRING "" FORCE)
+
+ # Set exported variables for FindPackage.cmake; the library list is exported with "%" in place of ";" as its separator
+ set(EXPORTED_MBEDTLS_INCLUDE_DIRS "${MBEDTLS_INCLUDE_DIRS}" CACHE STRING
"" FORCE)
+ string(REPLACE ";" "%" MBEDTLS_LIBRARIES_EXPORT "${MBEDTLS_LIBRARIES}")
+ set(EXPORTED_MBEDTLS_LIBRARIES "${MBEDTLS_LIBRARIES_EXPORT}" CACHE STRING
"" FORCE)
+ set(EXPORTED_MBEDTLS_LIBRARY "${MBEDTLS_LIBRARY}" CACHE STRING "" FORCE)
+ set(EXPORTED_MBEDX509_LIBRARY "${MBEDX509_LIBRARY}" CACHE STRING "" FORCE)
+ set(EXPORTED_MBEDCRYPTO_LIBRARY "${MBEDCRYPTO_LIBRARY}" CACHE STRING ""
FORCE)
+
+ # Create imported targets (the include directory is created at configure time, before the external build has run); mbedtls links mbedx509 and mbedcrypto, mbedx509 links mbedcrypto
+ file(MAKE_DIRECTORY ${MBEDTLS_INCLUDE_DIRS})
+
+ add_library(mbedTLS::mbedcrypto STATIC IMPORTED)
+ set_target_properties(mbedTLS::mbedcrypto PROPERTIES
+ INTERFACE_INCLUDE_DIRECTORIES "${MBEDTLS_INCLUDE_DIRS}")
+ set_target_properties(mbedTLS::mbedcrypto PROPERTIES
+ IMPORTED_LINK_INTERFACE_LANGUAGES "C"
+ IMPORTED_LOCATION "${MBEDCRYPTO_LIBRARY}")
+ add_dependencies(mbedTLS::mbedcrypto mbedtls-external)
+
+ add_library(mbedTLS::mbedx509 STATIC IMPORTED)
+ set_target_properties(mbedTLS::mbedx509 PROPERTIES
+ INTERFACE_INCLUDE_DIRECTORIES "${MBEDTLS_INCLUDE_DIRS}")
+ set_target_properties(mbedTLS::mbedx509 PROPERTIES
+ IMPORTED_LINK_INTERFACE_LANGUAGES "C"
+ IMPORTED_LOCATION "${MBEDX509_LIBRARY}")
+ add_dependencies(mbedTLS::mbedx509 mbedtls-external)
+ set_property(TARGET mbedTLS::mbedx509 APPEND PROPERTY
INTERFACE_LINK_LIBRARIES mbedTLS::mbedcrypto)
+
+ add_library(mbedTLS::mbedtls STATIC IMPORTED)
+ set_target_properties(mbedTLS::mbedtls PROPERTIES
+ INTERFACE_INCLUDE_DIRECTORIES "${MBEDTLS_INCLUDE_DIRS}")
+ set_target_properties(mbedTLS::mbedtls PROPERTIES
+ IMPORTED_LINK_INTERFACE_LANGUAGES "C"
+ IMPORTED_LOCATION "${MBEDTLS_LIBRARY}")
+ add_dependencies(mbedTLS::mbedtls mbedtls-external)
+ set_property(TARGET mbedTLS::mbedtls APPEND PROPERTY
INTERFACE_LINK_LIBRARIES mbedTLS::mbedx509 mbedTLS::mbedcrypto)
+endfunction(use_bundled_mbedtls)
\ No newline at end of file
diff --git a/cmake/BundledOpen62541.cmake b/cmake/BundledOpen62541.cmake
new file mode 100644
index 0000000..b4fb2de
--- /dev/null
+++ b/cmake/BundledOpen62541.cmake
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+function(use_bundled_open62541 SOURCE_DIR BINARY_DIR)
+    # Builds open62541 v1.0 (encryption enabled) as an external project and
+    # exposes it as the imported target open62541::open62541.
+    #
+    #   SOURCE_DIR - source root holding thirdparty/open62541/open62541.patch
+    #                and the cmake/mbedtls/dummy find module
+    #   BINARY_DIR - build root; sources go to thirdparty/open62541-src and the
+    #                install goes to thirdparty/open62541-install beneath it
+    #
+    # The EXPORTED_MBEDTLS_* / EXPORTED_MBED*_LIBRARY variables and the
+    # mbedTLS::mbedtls target are consumed below, so the bundled mbedTLS has to
+    # be set up before this function is called.
+
+    # Find patch executable
+    find_package(Patch REQUIRED)
+
+    # Define patch step
+    set(PC "${Patch_EXECUTABLE}" -p1 -i "${SOURCE_DIR}/thirdparty/open62541/open62541.patch")
+
+    # Define byproducts: the static library lands in lib64/ when the platform
+    # uses lib64 search paths (never on Windows or macOS), otherwise in lib/
+    get_property(LIB64 GLOBAL PROPERTY FIND_LIBRARY_USE_LIB64_PATHS)
+
+    if ("${LIB64}" STREQUAL "TRUE" AND (NOT WIN32 AND NOT APPLE))
+        set(LIBSUFFIX 64)
+    endif()
+
+    if (WIN32)
+        set(BYPRODUCT "lib/open62541.lib")
+    else()
+        set(BYPRODUCT "lib${LIBSUFFIX}/libopen62541.a")
+    endif()
+
+    # Set build options
+    set(OPEN62541_BYPRODUCT_DIR "${BINARY_DIR}/thirdparty/open62541-install")
+
+    # CMAKE_MODULE_PATH is derived from the SOURCE_DIR argument (like the patch
+    # path above) rather than CMAKE_SOURCE_DIR, so the dummy FindMbedTLS module
+    # is still found when this project is built as part of a larger tree.
+    set(OPEN62541_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
+            "-DCMAKE_INSTALL_PREFIX=${OPEN62541_BYPRODUCT_DIR}"
+            -DOPEN62541_VERSION=v1.0
+            -DUA_ENABLE_ENCRYPTION=ON
+            "-DCMAKE_MODULE_PATH=${SOURCE_DIR}/cmake/mbedtls/dummy"
+            "-DEXPORTED_MBEDTLS_INCLUDE_DIRS=${EXPORTED_MBEDTLS_INCLUDE_DIRS}"
+            "-DEXPORTED_MBEDTLS_LIBRARIES=${EXPORTED_MBEDTLS_LIBRARIES}"
+            "-DEXPORTED_MBEDTLS_LIBRARY=${EXPORTED_MBEDTLS_LIBRARY}"
+            "-DEXPORTED_MBEDX509_LIBRARY=${EXPORTED_MBEDX509_LIBRARY}"
+            "-DEXPORTED_MBEDCRYPTO_LIBRARY=${EXPORTED_MBEDCRYPTO_LIBRARY}")
+
+    # Build project
+    ExternalProject_Add(
+            open62541-external
+            URL "https://github.com/open62541/open62541/archive/v1.0.tar.gz"
+            URL_HASH "SHA256=9be66efefe2cdb07a7638aad91c301b5c6163f99c66995bc41cce31ec0ea207e"
+            SOURCE_DIR "${BINARY_DIR}/thirdparty/open62541-src"
+            PATCH_COMMAND ${PC}
+            LIST_SEPARATOR % # This is needed for passing semicolon-separated lists
+            CMAKE_ARGS ${OPEN62541_CMAKE_ARGS}
+            BUILD_BYPRODUCTS "${OPEN62541_BYPRODUCT_DIR}/${BYPRODUCT}"
+            EXCLUDE_FROM_ALL TRUE
+    )
+
+    # Set dependencies
+    add_dependencies(open62541-external mbedTLS::mbedtls)
+
+    # Set variables
+    set(OPEN62541_FOUND "YES" CACHE STRING "" FORCE)
+    set(OPEN62541_INCLUDE_DIR "${OPEN62541_BYPRODUCT_DIR}/include" CACHE STRING "" FORCE)
+    set(OPEN62541_LIBRARY "${OPEN62541_BYPRODUCT_DIR}/${BYPRODUCT}" CACHE STRING "" FORCE)
+
+    # Create imported targets (the include directory is created at configure
+    # time, before the external build has produced it)
+    add_library(open62541::open62541 STATIC IMPORTED)
+    set_target_properties(open62541::open62541 PROPERTIES IMPORTED_LOCATION "${OPEN62541_LIBRARY}")
+    add_dependencies(open62541::open62541 open62541-external)
+    file(MAKE_DIRECTORY ${OPEN62541_INCLUDE_DIR})
+    set_property(TARGET open62541::open62541 APPEND PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${OPEN62541_INCLUDE_DIR})
+    set_property(TARGET open62541::open62541 APPEND PROPERTY INTERFACE_LINK_LIBRARIES mbedTLS::mbedtls)
+    if(WIN32)
+        set_property(TARGET open62541::open62541 APPEND PROPERTY INTERFACE_LINK_LIBRARIES Ws2_32.lib Iphlpapi.lib)
+    endif()
+endfunction(use_bundled_open62541)
\ No newline at end of file
diff --git a/cmake/mbedtls/dummy/FindMbedTLS.cmake
b/cmake/mbedtls/dummy/FindMbedTLS.cmake
new file mode 100644
index 0000000..750d465
--- /dev/null
+++ b/cmake/mbedtls/dummy/FindMbedTLS.cmake
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Dummy find module: performs no search. Unless MbedTLS is already marked as
+# found, it republishes the EXPORTED_* values as the MbedTLS result variables.
+if(MBEDTLS_FOUND)
+    # Already resolved; keep the cached values as they are.
+    return()
+endif()
+
+set(MBEDTLS_FOUND "YES" CACHE STRING "" FORCE)
+set(MBEDTLS_INCLUDE_DIRS "${EXPORTED_MBEDTLS_INCLUDE_DIRS}" CACHE STRING "" FORCE)
+set(MBEDTLS_LIBRARIES ${EXPORTED_MBEDTLS_LIBRARIES} CACHE STRING "" FORCE)
+set(MBEDTLS_LIBRARY "${EXPORTED_MBEDTLS_LIBRARY}" CACHE STRING "" FORCE)
+set(MBEDX509_LIBRARY "${EXPORTED_MBEDX509_LIBRARY}" CACHE STRING "" FORCE)
+set(MBEDCRYPTO_LIBRARY "${EXPORTED_MBEDCRYPTO_LIBRARY}" CACHE STRING "" FORCE)
\ No newline at end of file
diff --git a/extensions/opc/CMakeLists.txt b/extensions/opc/CMakeLists.txt
new file mode 100644
index 0000000..9a77ca1
--- /dev/null
+++ b/extensions/opc/CMakeLists.txt
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Builds the OPC extension as the static library minifi-opc-extensions, links it
+# against open62541::open62541 and registers it with the extension framework.
+set( CMAKE_VERBOSE_MAKEFILE on )
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
+include_directories(include)
+
+# Matches the -DUA_ENABLE_ENCRYPTION=ON option used for the bundled open62541 build.
+add_definitions(-DUA_ENABLE_ENCRYPTION)
+
+file(GLOB SOURCES "src/*.cpp")
+
+add_library(minifi-opc-extensions STATIC ${SOURCES})
+set_property(TARGET minifi-opc-extensions PROPERTY POSITION_INDEPENDENT_CODE ON)
+
+if(THREADS_HAVE_PTHREAD_ARG)
+  # The target name comes first, then the scope keyword; with the two swapped
+  # CMake looks for a target called "PUBLIC" and configuration fails.
+  target_compile_options(minifi-opc-extensions PUBLIC "-pthread")
+endif()
+if(CMAKE_THREAD_LIBS_INIT)
+  target_link_libraries(minifi-opc-extensions "${CMAKE_THREAD_LIBS_INIT}")
+endif()
+
+target_link_libraries(minifi-opc-extensions ${CMAKE_DL_LIBS} open62541::open62541)
+
+# Platform-specific whole-archive linker flag for this target.
+if (WIN32)
+  set_target_properties(minifi-opc-extensions PROPERTIES
+    LINK_FLAGS "/WHOLEARCHIVE"
+  )
+elseif (APPLE)
+  set_target_properties(minifi-opc-extensions PROPERTIES
+    LINK_FLAGS "-Wl,-all_load"
+  )
+else ()
+  set_target_properties(minifi-opc-extensions PROPERTIES
+    LINK_FLAGS "-Wl,--whole-archive"
+  )
+endif ()
+
+
+# Hand the library name back to the including scope under the OPC-EXTENSIONS name.
+SET (OPC-EXTENSIONS minifi-opc-extensions PARENT_SCOPE)
+
+register_extension(minifi-opc-extensions)
diff --git a/extensions/opc/include/fetchopc.h
b/extensions/opc/include/fetchopc.h
new file mode 100644
index 0000000..dc6fbc3
--- /dev/null
+++ b/extensions/opc/include/fetchopc.h
@@ -0,0 +1,109 @@
+/**
+ * FetchOPC class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef NIFI_MINIFI_CPP_FetchOPCProcessor_H
+#define NIFI_MINIFI_CPP_FetchOPCProcessor_H
+
+#include <memory>
+#include <string>
+#include <list>
+#include <map>
+#include <mutex>
+#include <thread>
+
+#include "opc.h"
+#include "opcbase.h"
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "controllers/SSLContextService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * FetchOPC processor declaration (ProcessorName "FetchOPC").
+ *
+ * Extends BaseOPCProcessor with the node-selection properties (NodeIDType,
+ * NodeID, NameSpaceIndex, MaxDepth) and the Success/Failure relationships.
+ * Only the constructor and WriteCallback are defined inline; the remaining
+ * members are declared here and defined elsewhere.
+ */
+class FetchOPCProcessor : public BaseOPCProcessor {
+public:
+  static constexpr char const* ProcessorName = "FetchOPC";
+  // Supported Properties
+  static core::Property NodeIDType;
+  static core::Property NodeID;
+  static core::Property NameSpaceIndex;
+  static core::Property MaxDepth;
+
+  // Supported Relationships
+  static core::Relationship Success;
+  static core::Relationship Failure;
+
+  // Zero-initializes the counters and depth limit and installs a logger
+  // specific to this class. idType_ is not set here.
+  FetchOPCProcessor(std::string name, utils::Identifier uuid = utils::Identifier())
+  : BaseOPCProcessor(name, uuid), nameSpaceIdx_(0), nodesFound_(0), variablesFound_(0), maxDepth_(0) {
+    logger_ = logging::LoggerFactory<FetchOPCProcessor>::getLogger();
+  }
+
+  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
+
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+
+  virtual void initialize(void) override;
+
+protected:
+  bool nodeFoundCallBack(opc::Client& client, const UA_ReferenceDescription *ref, const std::string& path,
+                         const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+
+  void OPCData2FlowFile(const opc::NodeData& opcnode, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+
+  // Stream callback that writes a string payload, held by value, to the
+  // stream it is given.
+  class WriteCallback : public OutputStreamCallback {
+    std::string data_;
+  public:
+    // Takes ownership of the payload: the rvalue argument is moved into
+    // data_ rather than copied.
+    WriteCallback(std::string&& data)
+      : data_(std::move(data)) {
+    }
+    // Writes the whole payload and returns whatever stream->write() returns.
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), data_.size());
+    }
+  };
+  std::string nodeID_;
+  int32_t nameSpaceIdx_;
+  opc::OPCNodeIDType idType_;
+  uint32_t nodesFound_;
+  uint32_t variablesFound_;
+  uint64_t maxDepth_;
+
+private:
+  std::mutex onTriggerMutex_;
+  std::vector<UA_NodeId> translatedNodeIDs_;  // Only used when user provides path, path->nodeid translation is only done once
+
+};
+
+REGISTER_RESOURCE(FetchOPCProcessor, "Fetches OPC-UA node");
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_FetchOPCProcessor_H
diff --git a/extensions/opc/include/opc.h b/extensions/opc/include/opc.h
new file mode 100644
index 0000000..0a954de
--- /dev/null
+++ b/extensions/opc/include/opc.h
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef NIFI_MINIFI_CPP_OPC_H
+#define NIFI_MINIFI_CPP_OPC_H
+
+#include "open62541/client.h"
+#include "open62541/client_highlevel.h"
+#include "open62541/client_config_default.h"
+#include "logging/Logger.h"
+#include "Exception.h"
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace opc {
+
+// Exception type for the OPC extension; carries an ExceptionType and a message.
+class OPCException : public minifi::Exception {
+ public:
+  // The message arrives as an rvalue and is moved on to the base class
+  // instead of being passed along as an lvalue.
+  OPCException(ExceptionType type, std::string &&errorMsg)
+  : Exception(type, std::move(errorMsg)) {
+  }
+};
+
+enum class OPCNodeIDType{ Path, Int, String };
+
+enum class OPCNodeDataType{ Int64, UInt64, Int32, UInt32, Boolean, Float,
Double, String };
+
+struct NodeData;
+
+class Client;
+
+using nodeFoundCallBackFunc = bool(Client& client, const
UA_ReferenceDescription*, const std::string&);
+
+class Client {  // Holds a UA_Client handle and a logger; instances are obtained through createClient()
+ public:
+ bool isConnected();
+ UA_StatusCode connect(const std::string& url, const std::string& username =
"", const std::string& password = "");  // username and password default to empty strings
+ ~Client();
+ NodeData getNodeData(const UA_ReferenceDescription *ref, const std::string&
basePath = "");
+ UA_ReferenceDescription * getNodeReference(UA_NodeId nodeId);  // NOTE(review): raw pointer return - ownership is not visible from this header, confirm in opc.cpp
+ void traverse(UA_NodeId nodeId, std::function<nodeFoundCallBackFunc> cb,
const std::string& basePath = "", uint64_t maxDepth = 0, bool fetchRoot = true);  // cb has the signature bool(Client&, const UA_ReferenceDescription*, const std::string&)
+ bool exists(UA_NodeId nodeId);
+ UA_StatusCode translateBrowsePathsToNodeIdsRequest(const std::string& path,
std::vector<UA_NodeId>& foundNodeIDs, const
std::shared_ptr<core::logging::Logger>& logger);  // foundNodeIDs is an output parameter (non-const reference)
+
+ template<typename T>
+ UA_StatusCode update_node(const UA_NodeId nodeId, T value);
+
+ template<typename T>
+ UA_StatusCode add_node(const UA_NodeId parentNodeId, const UA_NodeId
targetNodeId, std::string browseName, T value, OPCNodeDataType dt, UA_NodeId
*receivedNodeId);  // receivedNodeId is a pointer parameter; presumably receives the id of the created node - confirm in opc.cpp
+
+ static std::unique_ptr<Client>
createClient(std::shared_ptr<core::logging::Logger> logger, const std::string&
applicationURI,
+ const std::vector<char>&
certBuffer, const std::vector<char>& keyBuffer,
+ const
std::vector<std::vector<char>>& trustBuffers);  // static factory taking the same arguments as the private constructor below
+
+ private:
+ Client (std::shared_ptr<core::logging::Logger> logger, const std::string&
applicationURI,
+ const std::vector<char>& certBuffer, const std::vector<char>& keyBuffer,
+ const std::vector<std::vector<char>>& trustBuffers);  // private: not constructible from outside the class
+
+ UA_Client *client_;  // raw open62541 client handle; presumably released in ~Client() - confirm in opc.cpp
+ std::shared_ptr<core::logging::Logger> logger_;
+};
+
+using ClientPtr = std::unique_ptr<Client>;
+
+/**
+ * Value read from an OPC node: raw bytes, a data type id and string
+ * attributes, plus an optionally owned UA_Variant.
+ *
+ * Move-only. The owned variant, when present, is released with
+ * UA_Variant_delete() in the destructor. Only Client and nodeValue2String()
+ * can reach the private constructor and the variant.
+ */
+struct NodeData {
+  std::vector<uint8_t> data;
+  uint16_t dataTypeID;
+  std::map<std::string, std::string> attributes;
+
+  virtual ~NodeData() {
+    if (var_) {
+      UA_Variant_delete(var_);
+    }
+  }
+
+  NodeData(const NodeData&) = delete;
+  NodeData& operator=(const NodeData&) = delete;
+  NodeData& operator=(NodeData&&) = delete;
+
+  // Moves the containers instead of copying them and takes over the variant;
+  // rhs.var_ is cleared so only one object ever deletes it.
+  NodeData(NodeData&& rhs)
+      : data(std::move(rhs.data)),
+        dataTypeID(rhs.dataTypeID),
+        attributes(std::move(rhs.attributes)),
+        var_(rhs.var_) {
+    rhs.var_ = nullptr;
+  }
+
+ private:
+  UA_Variant* var_;
+
+  // dataTypeID starts at 0 so it never holds an indeterminate value.
+  NodeData(UA_Variant* var = nullptr)
+      : dataTypeID(0), var_(var) {
+  }
+
+  // Replaces the owned variant, deleting the previous one if there was one.
+  void addVariant(UA_Variant* var) {
+    if (var_) {
+      UA_Variant_delete(var_);
+    }
+    var_ = var;
+  }
+
+  friend class Client;
+  friend std::string nodeValue2String(const NodeData&);
+};
+
+static std::map<std::string, OPCNodeDataType> StringToOPCDataTypeMap =
{{"Int64", OPCNodeDataType::Int64}, {"UInt64", OPCNodeDataType::UInt64 },
{"Int32", OPCNodeDataType::Int32},
+
{"UInt32", OPCNodeDataType::UInt32}, {"Boolean", OPCNodeDataType::Boolean},
{"Float", OPCNodeDataType::Float},
+
{"Double", OPCNodeDataType::Double}, {"String", OPCNodeDataType::String}};
+
+int32_t OPCNodeDataTypeToTypeID(OPCNodeDataType dt);
+
+std::string nodeValue2String(const NodeData& nd);
+
+std::string OPCDateTime2String(UA_DateTime raw_date);
+
+void logFunc(void *context, UA_LogLevel level, UA_LogCategory category, const
char *msg, va_list args);
+
+// No-op callback taking an opaque context pointer; the argument is ignored.
+// NOTE(review): presumably paired with logFunc as a logger clear hook - confirm in opc.cpp.
+static void logClear(void * /*context*/) {}
+
+} /* namespace opc */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+
+#endif //NIFI_MINIFI_CPP_OPC_H
diff --git a/extensions/opc/include/opcbase.h b/extensions/opc/include/opcbase.h
new file mode 100644
index 0000000..2ad4b03
--- /dev/null
+++ b/extensions/opc/include/opcbase.h
@@ -0,0 +1,85 @@
+/**
+ * OPCBase class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_OPCBASE_H
+#define NIFI_MINIFI_CPP_OPCBASE_H
+
+#include <string>
+
+#include "opc.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+/**
+ * Common base class of the OPC processors.
+ *
+ * Declares the connection-related properties (server endpoint, application
+ * URI, username/password, certificate, key and trusted paths) together with
+ * the members that hold their values and the client connection.
+ */
+class BaseOPCProcessor : public core::Processor {
+ public:
+  static core::Property OPCServerEndPoint;
+
+  static core::Property ApplicationURI;
+  static core::Property Username;
+  static core::Property Password;
+  static core::Property CertificatePath;
+  static core::Property KeyPath;
+  static core::Property TrustedPath;
+
+  // configOK_ starts out false so it is never read while indeterminate.
+  BaseOPCProcessor(std::string name, utils::Identifier uuid = utils::Identifier())
+  : Processor(name, uuid), configOK_(false) {
+  }
+
+  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
+
+ protected:
+  virtual bool reconnect();
+
+  // Not set by this class's constructor; the derived processors visible in
+  // this change assign it in their own constructors.
+  std::shared_ptr<logging::Logger> logger_;
+
+  opc::ClientPtr connection_;
+
+  std::string endPointURL_;
+
+  std::string applicationURI_;
+  std::string username_;
+  std::string password_;
+  std::string certpath_;
+  std::string keypath_;
+  std::string trustpath_;
+
+  std::vector<char> certBuffer_;
+  std::vector<char> keyBuffer_;
+  std::vector<std::vector<char>> trustBuffers_;
+
+  bool configOK_;
+
+  // Returns the seven properties declared by this base class.
+  virtual std::set<core::Property> getSupportedProperties() const {return {OPCServerEndPoint, ApplicationURI, Username, Password, CertificatePath, KeyPath, TrustedPath};}
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_OPCBASE_H
diff --git a/extensions/opc/include/putopc.h b/extensions/opc/include/putopc.h
new file mode 100644
index 0000000..8dfe61c
--- /dev/null
+++ b/extensions/opc/include/putopc.h
@@ -0,0 +1,111 @@
+/**
+ * PutOPC class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_PUTOPC_H
+#define NIFI_MINIFI_CPP_PUTOPC_H
+
+#include <memory>
+#include <string>
+#include <list>
+#include <map>
+#include <mutex>
+#include <thread>
+
+#include "opc.h"
+#include "opcbase.h"
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "controllers/SSLContextService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class PutOPCProcessor : public BaseOPCProcessor {  // Processor named "PutOPC"; registered below with the description "Creates/updates OPC nodes"
+ public:
+ static constexpr char const* ProcessorName = "PutOPC";
+ // Supported Properties: parent node selection, value type, and target node selection
+ static core::Property ParentNodeIDType;
+ static core::Property ParentNodeID;
+ static core::Property ParentNameSpaceIndex;
+ static core::Property ValueType;
+
+ static core::Property TargetNodeIDType;
+ static core::Property TargetNodeID;
+ static core::Property TargetNodeBrowseName;
+ static core::Property TargetNodeNameSpaceIndex;
+
+ // Supported Relationships
+ static core::Relationship Success;
+ static core::Relationship Failure;
+
+ PutOPCProcessor(std::string name, utils::Identifier uuid =
utils::Identifier())
+ : BaseOPCProcessor(name, uuid), nameSpaceIdx_(0), parentExists_(false) {
+ logger_ = logging::LoggerFactory<PutOPCProcessor>::getLogger();  // installs a logger specific to this class
+ }
+
+ virtual void onSchedule(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
+
+ virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::ProcessSession> &session) override;
+
+ virtual void initialize(void) override;
+
+ private:
+
+ class ReadCallback : public InputStreamCallback {  // stream callback exposing a byte buffer through getContent(); process() is defined elsewhere
+ public:
+ ReadCallback(std::shared_ptr<logging::Logger> logger) : logger_(logger) {}
+ int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+ const std::vector<uint8_t>& getContent() const { return buf_; }  // read-only view of buf_
+
+ private:
+ std::vector<uint8_t> buf_;
+ std::shared_ptr<logging::Logger> logger_;
+ };
+
+ std::mutex onTriggerMutex_;  // NOTE(review): presumably serializes onTrigger - confirm in putopc.cpp
+
+ std::string nodeID_;
+ int32_t nameSpaceIdx_;
+ opc::OPCNodeIDType idType_;  // not set by the constructor - presumably assigned in onSchedule, confirm in putopc.cpp
+ UA_NodeId parentNodeID_;  // not set by the constructor
+
+ bool parentExists_;
+
+ opc::OPCNodeDataType nodeDataType_;  // not set by the constructor
+};
+
+REGISTER_RESOURCE(PutOPCProcessor, "Creates/updates OPC nodes");
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif // NIFI_MINIFI_CPP_PUTOPC_H
diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp
new file mode 100644
index 0000000..4efc202
--- /dev/null
+++ b/extensions/opc/src/fetchopc.cpp
@@ -0,0 +1,235 @@
+/**
+ * FetchOPC class definition
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <string>
+#include <list>
+#include <map>
+#include <mutex>
+#include <thread>
+
+#include "opc.h"
+#include "fetchopc.h"
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "controllers/SSLContextService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+ // ID of the node where browsing starts. Mandatory.
+ core::Property FetchOPCProcessor::NodeID(
+     core::PropertyBuilder::createProperty("Node ID")
+     ->withDescription("Specifies the ID of the root node to traverse")
+     ->isRequired(true)->build());
+
+ // How "Node ID" has to be interpreted. Mandatory, restricted to the listed values.
+ core::Property FetchOPCProcessor::NodeIDType(
+     core::PropertyBuilder::createProperty("Node ID type")
+     ->withDescription("Specifies the type of the provided node ID")
+     ->isRequired(true)
+     ->withAllowableValues<std::string>({"Path", "Int", "String"})->build());
+
+ // Namespace index of the node; defaults to 0.
+ core::Property FetchOPCProcessor::NameSpaceIndex(
+     core::PropertyBuilder::createProperty("Namespace index")
+     ->withDescription("The index of the namespace. Used only if node ID type is not path.")
+     ->withDefaultValue<int32_t>(0)->build());
+
+ // Browsing depth limit; the default of 0 disables the limit.
+ core::Property FetchOPCProcessor::MaxDepth(
+     core::PropertyBuilder::createProperty("Max depth")
+     ->withDescription("Specifies the max depth of browsing. 0 means unlimited.")
+     ->withDefaultValue<uint64_t>(0)->build());
+
+ // Relationships flow files are routed to.
+ core::Relationship FetchOPCProcessor::Success("success", "Successfully retrieved OPC-UA nodes");
+ core::Relationship FetchOPCProcessor::Failure("failure", "Retrieved OPC-UA nodes where value cannot be extracted (only if enabled)");
+
+
+ /**
+  * Publishes the properties and relationships supported by this processor:
+  * the FetchOPC specific properties merged with everything the base OPC
+  * processor reports, plus the Success and Failure relationships.
+  */
+ void FetchOPCProcessor::initialize() {
+   std::set<core::Property> supported = {OPCServerEndPoint, NodeID, NodeIDType, NameSpaceIndex, MaxDepth};
+
+   // Entries already present are left untouched by std::set::insert.
+   for (const auto& inherited : BaseOPCProcessor::getSupportedProperties()) {
+     supported.insert(inherited);
+   }
+   setSupportedProperties(supported);
+
+   setSupportedRelationships({Success, Failure});
+ }
+
+ /**
+  * Reads and validates the FetchOPC specific configuration.
+  *
+  * The base class is scheduled first; if it reports a broken configuration
+  * (configOK_ == false) nothing else is evaluated. configOK_ is set back to
+  * true only when every FetchOPC property passed validation, so any early
+  * return leaves the processor in the "not configured" state that onTrigger
+  * checks for.
+  */
+ void FetchOPCProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+   logger_->log_trace("FetchOPCProcessor::onSchedule");
+
+   // The path might have changed during restart, so the cached translation is dropped.
+   // The cached IDs are deep copies (UA_NodeId_copy), their members have to be released.
+   for (auto& cachedID : translatedNodeIDs_) {
+     UA_NodeId_clear(&cachedID);
+   }
+   translatedNodeIDs_.clear();
+
+   BaseOPCProcessor::onSchedule(context, factory);
+
+   if (!configOK_) {
+     return;
+   }
+
+   configOK_ = false;
+
+   std::string value;
+   context->getProperty(NodeID.getName(), nodeID_);
+   context->getProperty(NodeIDType.getName(), value);
+
+   maxDepth_ = 0;
+   context->getProperty(MaxDepth.getName(), maxDepth_);
+
+   if (value == "String") {
+     idType_ = opc::OPCNodeIDType::String;
+   } else if (value == "Int") {
+     idType_ = opc::OPCNodeIDType::Int;
+   } else if (value == "Path") {
+     idType_ = opc::OPCNodeIDType::Path;
+   } else {
+     // Without a valid type idType_ would keep a stale or indeterminate value,
+     // so the configuration has to be rejected here.
+     logger_->log_error("%s is not a valid node ID type!", value.c_str());
+     return;
+   }
+
+   if (idType_ == opc::OPCNodeIDType::Int) {
+     try {
+       // Only the convertibility is checked here, the value is parsed again in onTrigger.
+       (void) std::stoi(nodeID_);
+     } catch (...) {
+       logger_->log_error("%s cannot be used as an int type node ID", nodeID_.c_str());
+       return;
+     }
+   }
+   if (idType_ != opc::OPCNodeIDType::Path) {
+     if (!context->getProperty(NameSpaceIndex.getName(), nameSpaceIdx_)) {
+       logger_->log_error("%s is mandatory in case %s is not Path", NameSpaceIndex.getName().c_str(), NodeIDType.getName().c_str());
+       return;
+     }
+   }
+
+   configOK_ = true;
+ }
+
+ /**
+  * Browses the configured node (and its children, limited by maxDepth_) and
+  * hands every node found to nodeFoundCallBack.
+  *
+  * Yields when the processor is misconfigured, the connection cannot be
+  * (re)established, the configured path cannot be translated, or nothing
+  * usable was found. Returns without yielding when a previous trigger still
+  * holds onTriggerMutex_.
+  */
+ void FetchOPCProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+   if (!configOK_) {
+     logger_->log_error("This processor was not configured properly, yielding. Please check for previous errors in the logs!");
+     yield();
+     return;
+   }
+
+   logger_->log_trace("FetchOPCProcessor::onTrigger");
+
+   std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+   if (!lock.owns_lock()) {
+     logger_->log_warn("processor was triggered before previous listing finished, configuration should be revised!");
+     return;
+   }
+
+   if (!reconnect()) {
+     yield();
+     return;
+   }
+
+   nodesFound_ = 0;
+   variablesFound_ = 0;
+
+   std::function<opc::nodeFoundCallBackFunc> f = std::bind(&FetchOPCProcessor::nodeFoundCallBack, this, std::placeholders::_1,
+                                                             std::placeholders::_2, std::placeholders::_3, context, session);
+   if (idType_ != opc::OPCNodeIDType::Path) {
+     // Start from a well-defined (null) node ID so no field is left indeterminate.
+     UA_NodeId myID;
+     UA_NodeId_init(&myID);
+     myID.namespaceIndex = nameSpaceIdx_;
+     if (idType_ == opc::OPCNodeIDType::Int) {
+       myID.identifierType = UA_NODEIDTYPE_NUMERIC;
+       myID.identifier.numeric = std::stoi(nodeID_);
+     } else if (idType_ == opc::OPCNodeIDType::String) {
+       myID.identifierType = UA_NODEIDTYPE_STRING;
+       myID.identifier.string = UA_STRING_ALLOC(nodeID_.c_str());
+     }
+     connection_->traverse(myID, f, "", maxDepth_);
+     // Releases the string identifier allocated above (no-op for numeric IDs).
+     UA_NodeId_clear(&myID);
+   } else {
+     // The translation is cached until the next onSchedule.
+     if (translatedNodeIDs_.empty()) {
+       auto sc = connection_->translateBrowsePathsToNodeIdsRequest(nodeID_, translatedNodeIDs_, logger_);
+       if (sc != UA_STATUSCODE_GOOD) {
+         logger_->log_error("Failed to translate %s to node id, no flow files will be generated (%s)", nodeID_.c_str(), UA_StatusCode_name(sc));
+         yield();
+         return;
+       }
+     }
+     for (auto& nodeID : translatedNodeIDs_) {
+       connection_->traverse(nodeID, f, nodeID_, maxDepth_);
+     }
+   }
+   if (nodesFound_ == 0) {
+     logger_->log_warn("Connected to OPC server, but no nodes were found. Configuration might be incorrect! Yielding...");
+     yield();
+   } else if (variablesFound_ == 0) {
+     logger_->log_warn("Found no variables when traversing the specified node. No flowfiles are generated. Yielding...");
+     yield();
+   }
+ }
+
+ /**
+  * Callback invoked for every node found while traversing.
+  *
+  * Counts the node and, for variable nodes, reads the value and turns it into
+  * a flow file. Failures of a single node are logged and do not stop the
+  * traversal.
+  *
+  * @param client  client performing the traversal (unused, connection_ is used)
+  * @param ref     description of the node found
+  * @param path    path of the parent the node was found under
+  * @param context process context forwarded to OPCData2FlowFile
+  * @param session process session the flow file is created in
+  * @return always true, i.e. the traversal should continue
+  */
+ bool FetchOPCProcessor::nodeFoundCallBack(opc::Client& client, const UA_ReferenceDescription *ref, const std::string& path,
+                                           const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+   nodesFound_++;
+   if (ref->nodeClass == UA_NODECLASS_VARIABLE) {
+     try {
+       // The parent path is forwarded so the "Full path" attribute contains it.
+       opc::NodeData nodedata = connection_->getNodeData(ref, path);
+       OPCData2FlowFile(nodedata, context, session);
+       variablesFound_++;
+     } catch (const std::exception& exception) {
+       const std::string browsename(reinterpret_cast<const char*>(ref->browseName.name.data), ref->browseName.name.length);
+       const std::string fullPath = path + "/" + browsename;
+       logger_->log_warn("Caught Exception while trying to get data from node %s: %s", fullPath.c_str(), exception.what());
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Creates a flow file from the data of an OPC node.
+  *
+  * Every attribute of the node becomes a flow file attribute. When the node
+  * carries raw data, its string representation is written as content; if the
+  * conversion or the write throws, the flow file goes to Failure, otherwise
+  * it goes to Success.
+  */
+ void FetchOPCProcessor::OPCData2FlowFile(const opc::NodeData& opcnode, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
+   auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+   if (!flowFile) {
+     logger_->log_error("Failed to create flowfile!");
+     return;
+   }
+
+   for (const auto& attribute : opcnode.attributes) {
+     flowFile->setAttribute(attribute.first, attribute.second);
+   }
+
+   const bool hasPayload = !opcnode.data.empty();
+   if (hasPayload) {
+     try {
+       FetchOPCProcessor::WriteCallback callback(opc::nodeValue2String(opcnode));
+       session->write(flowFile, &callback);
+     } catch (const std::exception& e) {
+       // The browse name attribute set above identifies the node in the log.
+       std::string browsename;
+       flowFile->getAttribute("Browsename", browsename);
+       logger_->log_info("Failed to extract data of OPC node %s: %s", browsename, e.what());
+       session->transfer(flowFile, Failure);
+       return;
+     }
+   }
+
+   session->transfer(flowFile, Success);
+ }
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/opc/src/opc.cpp b/extensions/opc/src/opc.cpp
new file mode 100644
index 0000000..841fd55
--- /dev/null
+++ b/extensions/opc/src/opc.cpp
@@ -0,0 +1,567 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//OPC includes
+#include "opc.h"
+
+//MiNiFi includes
+#include "utils/ScopeGuard.h"
+#include "utils/StringUtils.h"
+#include "logging/Logger.h"
+#include "Exception.h"
+
+//Standard includes
+#include <stdlib.h>
+#include <array>
+#include <cstdio>
+#include <cstring>
+#include <functional>
+#include <iostream>
+#include <limits>
+#include <memory>
+#include <string>
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace opc {
+
+/*
+ * The following functions are only used internally in OPC lib, not to be
exported
+ */
+
+namespace {
+
+  // Shared implementation of the overloads below: passes the address of `ua_value`
+  // and the type descriptor UA_TYPES[typeIndex] to UA_Variant_setScalarCopy.
+  // The returned status code is ignored.
+  template<typename UaType>
+  void set_scalar_copy(UA_Variant *variant, const UaType &ua_value, size_t typeIndex) {
+    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[typeIndex]);
+  }
+
+  // String overload: the UA_String only references the bytes of `value`
+  // for the duration of the call.
+  void add_value_to_variant(UA_Variant *variant, std::string &value) {
+    set_scalar_copy<UA_String>(variant, UA_STRING(&value[0]), UA_TYPES_STRING);
+  }
+
+  // C string overload: goes through a temporary std::string.
+  void add_value_to_variant(UA_Variant *variant, const char *value) {
+    std::string owned(value);
+    add_value_to_variant(variant, owned);
+  }
+
+  void add_value_to_variant(UA_Variant *variant, int64_t value) {
+    set_scalar_copy<UA_Int64>(variant, value, UA_TYPES_INT64);
+  }
+
+  void add_value_to_variant(UA_Variant *variant, uint64_t value) {
+    set_scalar_copy<UA_UInt64>(variant, value, UA_TYPES_UINT64);
+  }
+
+  void add_value_to_variant(UA_Variant *variant, int32_t value) {
+    set_scalar_copy<UA_Int32>(variant, value, UA_TYPES_INT32);
+  }
+
+  void add_value_to_variant(UA_Variant *variant, uint32_t value) {
+    set_scalar_copy<UA_UInt32>(variant, value, UA_TYPES_UINT32);
+  }
+
+  void add_value_to_variant(UA_Variant *variant, bool value) {
+    set_scalar_copy<UA_Boolean>(variant, value, UA_TYPES_BOOLEAN);
+  }
+
+  void add_value_to_variant(UA_Variant *variant, float value) {
+    set_scalar_copy<UA_Float>(variant, value, UA_TYPES_FLOAT);
+  }
+
+  void add_value_to_variant(UA_Variant *variant, double value) {
+    set_scalar_copy<UA_Double>(variant, value, UA_TYPES_DOUBLE);
+  }
+
+  // Maps an open62541 log level to the MiNiFi log level. Fatal and any value
+  // not listed explicitly end up as critical.
+  core::logging::LOG_LEVEL MapOPCLogLevel(UA_LogLevel ualvl) {
+    switch (ualvl) {
+      case UA_LOGLEVEL_TRACE:   return core::logging::trace;
+      case UA_LOGLEVEL_DEBUG:   return core::logging::debug;
+      case UA_LOGLEVEL_INFO:    return core::logging::info;
+      case UA_LOGLEVEL_WARNING: return core::logging::warn;
+      case UA_LOGLEVEL_ERROR:   return core::logging::err;
+      case UA_LOGLEVEL_FATAL:
+      default:
+        return core::logging::critical;
+    }
+  }
+}
+
+/*
+ * End of internal functions
+ */
+
+// Creates the underlying open62541 client.
+// - With an empty certBuffer the default client configuration is applied.
+// - Otherwise the security mode is set to sign-and-encrypt and the certificate,
+//   private key and trusted certificates are passed to
+//   UA_ClientConfig_setDefaultEncryption; on failure the client is deleted and
+//   an OPCException is thrown.
+// Afterwards the MiNiFi logger is installed as the client's logger and, when
+// applicationURI is not empty, the application URI of the client description
+// is replaced.
+Client::Client(std::shared_ptr<core::logging::Logger> logger, const
std::string& applicationURI,
+ const std::vector<char>& certBuffer, const std::vector<char>&
keyBuffer,
+ const std::vector<std::vector<char>>& trustBuffers) {
+
+ // NOTE(review): the result of UA_Client_new() is not checked for nullptr.
+ client_ = UA_Client_new();
+ if (certBuffer.empty()) {
+ UA_ClientConfig_setDefault(UA_Client_getConfig(client_));
+ } else {
+ UA_ClientConfig *cc = UA_Client_getConfig(client_);
+ cc->securityMode = UA_MESSAGESECURITYMODE_SIGNANDENCRYPT;
+
+ // Certificate
+ // The input buffers are copied into temporary UA_ByteStrings, which are
+ // cleared again right after the configuration call below.
+ UA_ByteString certByteString = UA_STRING_NULL;
+ certByteString.length = certBuffer.size();
+ certByteString.data = (UA_Byte*)UA_malloc(certByteString.length *
sizeof(UA_Byte));
+ memcpy(certByteString.data, certBuffer.data(), certByteString.length);
+
+ // Key
+ UA_ByteString keyByteString = UA_STRING_NULL;
+ keyByteString.length = keyBuffer.size();
+ keyByteString.data = (UA_Byte*)UA_malloc(keyByteString.length *
sizeof(UA_Byte));
+ memcpy(keyByteString.data, keyBuffer.data(), keyByteString.length);
+
+ // Trusted certificates
+ UA_STACKARRAY(UA_ByteString, trustList, trustBuffers.size());
+ for (size_t i = 0; i < trustBuffers.size(); i++) {
+ trustList[i] = UA_STRING_NULL;
+ trustList[i].length = trustBuffers[i].size();
+ trustList[i].data = (UA_Byte*)UA_malloc(trustList[i].length *
sizeof(UA_Byte));
+ memcpy(trustList[i].data, trustBuffers[i].data(), trustList[i].length);
+ }
+ // No revocation list is passed (nullptr, 0).
+ UA_StatusCode sc = UA_ClientConfig_setDefaultEncryption(cc,
certByteString, keyByteString,
+ trustList,
trustBuffers.size(),
+ nullptr, 0);
+ // The temporaries are released regardless of the outcome.
+ UA_ByteString_clear(&certByteString);
+ UA_ByteString_clear(&keyByteString);
+ for (size_t i = 0; i < trustBuffers.size(); i++) {
+ UA_ByteString_clear(&trustList[i]);
+ }
+ if (sc != UA_STATUSCODE_GOOD) {
+ logger->log_error("Configuring the client for encryption failed: %s",
UA_StatusCode_name(sc));
+ // The destructor does not run when the constructor throws, so the client
+ // has to be deleted here.
+ UA_Client_delete(client_);
+ throw OPCException(GENERAL_EXCEPTION, std::string("Failed to created
client with the provided encryption settings: ") + UA_StatusCode_name(sc));
+ }
+ }
+
+ // The raw logger pointer is handed to open62541 as log context; logFunc
+ // casts it back. logger_ (assigned below) keeps the logger alive.
+ const UA_Logger MinifiUALogger = {logFunc, logger.get(), logClear};
+
+ UA_ClientConfig *configPtr = UA_Client_getConfig(client_);
+ configPtr->logger = MinifiUALogger;
+
+ if(applicationURI.length() > 0) {
+ // The previous URI is released before the copy of the new one is stored.
+ UA_String_clear(&configPtr->clientDescription.applicationUri);
+ configPtr->clientDescription.applicationUri =
UA_STRING_ALLOC(applicationURI.c_str());
+ }
+
+ logger_ = logger;
+}
+
+/**
+ * Disconnects the client if it is not in disconnected state (a failing
+ * disconnect is only logged) and deletes it. Does nothing without a client.
+ */
+Client::~Client() {
+  if (client_ != nullptr) {
+    const bool needsDisconnect = UA_Client_getState(client_) != UA_CLIENTSTATE_DISCONNECTED;
+    if (needsDisconnect) {
+      const UA_StatusCode status = UA_Client_disconnect(client_);
+      if (status != UA_STATUSCODE_GOOD) {
+        logger_->log_warn("Failed to disconnect OPC client: %s", UA_StatusCode_name(status));
+      }
+    }
+    UA_Client_delete(client_);
+  }
+}
+
+/**
+ * @return true when a client exists and its state is anything but disconnected
+ */
+bool Client::isConnected() {
+  return client_ && UA_Client_getState(client_) != UA_CLIENTSTATE_DISCONNECTED;
+}
+
+/**
+ * Connects to the given endpoint. An empty username selects the plain
+ * connect call, otherwise the username/password variant is used.
+ *
+ * @return the status code reported by open62541
+ */
+UA_StatusCode Client::connect(const std::string& url, const std::string& username, const std::string& password) {
+  if (!username.empty()) {
+    return UA_Client_connect_username(client_, url.c_str(), username.c_str(), password.c_str());
+  }
+  return UA_Client_connect(client_, url.c_str());
+}
+
+// Reads the value of a variable node and collects its metadata.
+// The returned NodeData carries the attributes "NodeID", "NodeID type" (only for
+// string, bytestring and numeric IDs), "Browsename", "Full path" and, when
+// available, "Typename" and "Datasize", plus a raw copy of the value bytes.
+// Throws OPCException when the node is not a variable or its value cannot be read.
+NodeData Client::getNodeData(const UA_ReferenceDescription *ref, const
std::string& basePath) {
+ if(ref->nodeClass == UA_NODECLASS_VARIABLE)
+ {
+ opc::NodeData nodedata;
+ // UA strings carry an explicit length, hence the (data, length) constructor.
+ std::string browsename(reinterpret_cast<const
char*>(ref->browseName.name.data), ref->browseName.name.length);
+
+ if(ref->nodeId.nodeId.identifierType == UA_NODEIDTYPE_STRING) {
+ std::string nodeidstr(reinterpret_cast<const
char*>(ref->nodeId.nodeId.identifier.string.data),
+ ref->nodeId.nodeId.identifier.string.length);
+ nodedata.attributes["NodeID"] = nodeidstr;
+ nodedata.attributes["NodeID type"] = "string";
+ } else if(ref->nodeId.nodeId.identifierType == UA_NODEIDTYPE_BYTESTRING) {
+ std::string nodeidstr(reinterpret_cast<const
char*>(ref->nodeId.nodeId.identifier.byteString.data),
ref->nodeId.nodeId.identifier.byteString.length);
+ nodedata.attributes["NodeID"] = nodeidstr;
+ nodedata.attributes["NodeID type"] = "bytestring";
+ } else if (ref->nodeId.nodeId.identifierType == UA_NODEIDTYPE_NUMERIC) {
+ nodedata.attributes["NodeID"] =
std::to_string(ref->nodeId.nodeId.identifier.numeric);
+ nodedata.attributes["NodeID type"] = "numeric";
+ }
+ nodedata.attributes["Browsename"] = browsename;
+ nodedata.attributes["Full path"] = basePath + "/" + browsename;
+ // UA_TYPES_COUNT is used as the "no type known" marker until the read succeeds.
+ nodedata.dataTypeID = UA_TYPES_COUNT;
+ UA_Variant* var = UA_Variant_new();
+ if(UA_Client_readValueAttribute(client_, ref->nodeId.nodeId, var) ==
UA_STATUSCODE_GOOD && var->type != NULL && var->data != NULL) {
+ nodedata.dataTypeID = var->type->typeIndex;
+ // NOTE(review): the variant is not deleted on this path, so addVariant
+ // presumably takes ownership of it - confirm in opc.h.
+ nodedata.addVariant(var);
+ if(var->type->typeName) {
+ nodedata.attributes["Typename"] = std::string(var->type->typeName);
+ }
+ if(var->type->memSize) {
+ // Only memSize bytes of the value are copied (a shallow, byte-wise copy).
+ nodedata.attributes["Datasize"] = std::to_string(var->type->memSize);
+ nodedata.data = std::vector<uint8_t>(var->type->memSize);
+ memcpy(nodedata.data.data(), var->data, var->type->memSize);
+ }
+ return nodedata;
+ }
+ // Read failed: the variant is still owned here and has to be released.
+ UA_Variant_delete(var);
+ throw OPCException(GENERAL_EXCEPTION, "Failed to read value of node: " +
browsename);
+ } else {
+ throw OPCException(GENERAL_EXCEPTION, "Only variable nodes are
supported!");
+ }
+}
+
+/**
+ * Builds a reference description for the given node: the node ID is copied,
+ * then node class, browse name and display name are read in this order, each
+ * one only if the previous read succeeded.
+ *
+ * @return newly allocated description; the caller has to delete it
+ *         (traverse() does so with UA_ReferenceDescription_delete)
+ */
+UA_ReferenceDescription * Client::getNodeReference(UA_NodeId nodeId) {
+  UA_ReferenceDescription *description = UA_ReferenceDescription_new();
+  UA_ReferenceDescription_init(description);
+  UA_NodeId_copy(&nodeId, &description->nodeId.nodeId);
+
+  // && short-circuits, so the browse name is requested only after a good node class read.
+  const bool basicsRead =
+      UA_Client_readNodeClassAttribute(client_, nodeId, &description->nodeClass) == UA_STATUSCODE_GOOD &&
+      UA_Client_readBrowseNameAttribute(client_, nodeId, &description->browseName) == UA_STATUSCODE_GOOD;
+  if (basicsRead) {
+    UA_Client_readDisplayNameAttribute(client_, nodeId, &description->displayName);
+  }
+  return description;
+}
+
+/**
+ * Browses the references of a node recursively and reports every node found.
+ *
+ * @param nodeId    node to start from
+ * @param cb        invoked for each node with the path of its parent; returning
+ *                  false stops browsing on the current level
+ * @param basePath  path of nodeId, passed to the callback as parent path
+ * @param maxDepth  0 means unlimited; 1 means only the root is reported
+ * @param fetchRoot when true, the start node itself is reported as well
+ *                  (only variable and object nodes with a browse name)
+ */
+void Client::traverse(UA_NodeId nodeId, std::function<nodeFoundCallBackFunc> cb, const std::string& basePath, uint64_t maxDepth, bool fetchRoot) {
+  if (fetchRoot) {
+    UA_ReferenceDescription *rootRef = getNodeReference(nodeId);
+    if ((rootRef->nodeClass == UA_NODECLASS_VARIABLE || rootRef->nodeClass == UA_NODECLASS_OBJECT) && rootRef->browseName.name.length > 0) {
+      cb(*this, rootRef, basePath);
+    }
+    UA_ReferenceDescription_delete(rootRef);
+  }
+
+  // 0 stays 0 (unlimited); otherwise one level is consumed by this call.
+  if (maxDepth != 0) {
+    maxDepth--;
+    if (maxDepth == 0) {
+      return;
+    }
+  }
+  UA_BrowseRequest bReq;
+  UA_BrowseRequest_init(&bReq);
+  bReq.requestedMaxReferencesPerNode = 0;
+  bReq.nodesToBrowse = UA_BrowseDescription_new();
+  bReq.nodesToBrowseSize = 1;
+
+  UA_NodeId_copy(&nodeId, &bReq.nodesToBrowse[0].nodeId);
+  bReq.nodesToBrowse[0].resultMask = UA_BROWSERESULTMASK_ALL;
+
+  UA_BrowseResponse bResp = UA_Client_Service_browse(client_, bReq);
+
+  // The response is released on every exit path, including the early return below.
+  utils::ScopeGuard guard([&bResp]() {
+    UA_BrowseResponse_deleteMembers(&bResp);
+  });
+
+  UA_BrowseRequest_deleteMembers(&bReq);
+
+  for (size_t i = 0; i < bResp.resultsSize; ++i) {
+    for (size_t j = 0; j < bResp.results[i].referencesSize; ++j) {
+      UA_ReferenceDescription *ref = &(bResp.results[i].references[j]);
+      if (cb(*this, ref, basePath)) {
+        if (ref->nodeClass == UA_NODECLASS_VARIABLE || ref->nodeClass == UA_NODECLASS_OBJECT) {
+          std::string browsename(reinterpret_cast<const char*>(ref->browseName.name.data), ref->browseName.name.length);
+          // Path elements are separated by "/", the same way the callbacks and
+          // getNodeData build the full path of a node.
+          traverse(ref->nodeId.nodeId, cb, basePath + "/" + browsename, maxDepth, false);
+        }
+      } else {
+        return;
+      }
+    }
+  }
+}
+
+/**
+ * Checks the existence of a node by traversing it with depth 1: the node
+ * exists if the traversal reports anything at all.
+ */
+bool Client::exists(UA_NodeId nodeId) {
+  bool found = false;
+  auto markFound = [&found](Client&, const UA_ReferenceDescription*, const std::string&) -> bool {
+    found = true;
+    // One hit is enough, there is no need to continue the traversal.
+    return false;
+  };
+  traverse(nodeId, markFound, "", 1);
+  return found;
+}
+
+/**
+ * Translates a "/" separated browse path, relative to the Objects folder, to
+ * node IDs.
+ *
+ * The first path element is looked up through an "Organizes" reference, every
+ * further element through "HasComponent". Matching node IDs are deep copied
+ * and appended to foundNodeIDs; releasing them is the caller's job.
+ *
+ * @param path         browse path, e.g. "Folder/Object/Variable"
+ * @param foundNodeIDs output, results are appended
+ * @param logger       logger used for diagnostics
+ * @return UA_STATUSCODE_GOOD if at least one node was found,
+ *         UA_STATUSCODE_BADOUTOFMEMORY if the request could not be allocated,
+ *         UA_STATUSCODE_BADNODATAAVAILABLE otherwise
+ */
+UA_StatusCode Client::translateBrowsePathsToNodeIdsRequest(const std::string& path, std::vector<UA_NodeId>& foundNodeIDs, const std::shared_ptr<core::logging::Logger>& logger) {
+  logger->log_trace("Trying to find node id for %s", path.c_str());
+
+  auto tokens = utils::StringUtils::split(path, "/");
+
+  UA_BrowsePath browsePath;
+  UA_BrowsePath_init(&browsePath);
+  browsePath.startingNode = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
+
+  browsePath.relativePath.elements = (UA_RelativePathElement*)UA_Array_new(tokens.size(), &UA_TYPES[UA_TYPES_RELATIVEPATHELEMENT]);
+  if (browsePath.relativePath.elements == nullptr) {
+    logger->log_error("Failed to allocate browse path for %s", path.c_str());
+    return UA_STATUSCODE_BADOUTOFMEMORY;
+  }
+  browsePath.relativePath.elementsSize = tokens.size();
+
+  // Releases the path elements (including the allocated target names) on every exit path.
+  utils::ScopeGuard browsePathGuard([&browsePath]() {
+    UA_BrowsePath_deleteMembers(&browsePath);
+  });
+
+  for (size_t i = 0; i < tokens.size(); ++i) {
+    UA_RelativePathElement *elem = &browsePath.relativePath.elements[i];
+    const UA_UInt32 referenceType = (i == 0) ? UA_NS0ID_ORGANIZES : UA_NS0ID_HASCOMPONENT;
+    elem->referenceTypeId = UA_NODEID_NUMERIC(0, referenceType);
+    elem->targetName = UA_QUALIFIEDNAME_ALLOC(0, tokens[i].c_str());
+  }
+
+  UA_TranslateBrowsePathsToNodeIdsRequest request;
+  UA_TranslateBrowsePathsToNodeIdsRequest_init(&request);
+  // The request only borrows browsePath, it is owned by the guard above.
+  request.browsePaths = &browsePath;
+  request.browsePathsSize = 1;
+
+  UA_TranslateBrowsePathsToNodeIdsResponse response = UA_Client_Service_translateBrowsePathsToNodeIds(client_, request);
+
+  // The response has to be released on the early return below, too.
+  utils::ScopeGuard responseGuard([&response]() {
+    UA_TranslateBrowsePathsToNodeIdsResponse_deleteMembers(&response);
+  });
+
+  if (response.resultsSize < 1) {
+    logger->log_warn("No node id in response for %s", path.c_str());
+    return UA_STATUSCODE_BADNODATAAVAILABLE;
+  }
+
+  bool foundData = false;
+
+  for (size_t i = 0; i < response.resultsSize; ++i) {
+    const UA_BrowsePathResult& res = response.results[i];
+    for (size_t j = 0; j < res.targetsSize; ++j) {
+      foundData = true;
+      // Deep copy: the result outlives the response.
+      UA_NodeId resultId;
+      UA_NodeId_copy(&res.targets[j].targetId.nodeId, &resultId);
+      foundNodeIDs.push_back(resultId);
+    }
+  }
+
+  if (foundData) {
+    logger->log_debug("Found %zu nodes for path %s", foundNodeIDs.size(), path.c_str());
+    return UA_STATUSCODE_GOOD;
+  } else {
+    logger->log_warn("No node id found for path %s", path.c_str());
+    return UA_STATUSCODE_BADNODATAAVAILABLE;
+  }
+}
+
+/**
+ * Adds a variable node holding `value` to the server.
+ *
+ * @param parentNodeId   node the new node is added under
+ * @param targetNodeId   requested ID of the new node
+ * @param browseName     used both as browse name and as "en-US" display name
+ * @param value          initial value of the node
+ * @param dt             data type; resolved to the reference type ID handed to
+ *                       the server
+ * @param receivedNodeId receives the ID assigned by the server
+ * @return status code reported by open62541
+ * @throws OPCException if `dt` is not supported
+ */
+template<typename T>
+UA_StatusCode Client::add_node(const UA_NodeId parentNodeId, const UA_NodeId targetNodeId, std::string browseName, T value, OPCNodeDataType dt, UA_NodeId *receivedNodeId)
+{
+  // Resolved first: it may throw, and nothing must be allocated at that point.
+  const UA_NodeId typeId = UA_NODEID_NUMERIC(0, OPCNodeDataTypeToTypeID(dt));
+
+  UA_VariableAttributes attr = UA_VariableAttributes_default;
+  add_value_to_variant(&attr.value, value);
+  char local[6] = "en-US";
+  // The name structures below only reference the buffers of `local` and `browseName`.
+  attr.displayName = UA_LOCALIZEDTEXT(local, const_cast<char*>(browseName.c_str()));
+  UA_StatusCode sc = UA_Client_addVariableNode(client_,
+                                               targetNodeId,
+                                               parentNodeId,
+                                               typeId,
+                                               UA_QUALIFIEDNAME(1, const_cast<char*>(browseName.c_str())),
+                                               UA_NODEID_NULL,
+                                               attr, receivedNodeId);
+  UA_Variant_clear(&attr.value);
+  return sc;
+}
+
+/**
+ * Writes `value` as the value attribute of the given node.
+ *
+ * @return status code reported by open62541
+ */
+template<typename T>
+UA_StatusCode Client::update_node(const UA_NodeId nodeId, T value) {
+  UA_Variant *payload = UA_Variant_new();
+  add_value_to_variant(payload, value);
+  const UA_StatusCode writeStatus = UA_Client_writeValueAttribute(client_, nodeId, payload);
+  // The variant is deleted regardless of the outcome of the write.
+  UA_Variant_delete(payload);
+  return writeStatus;
+}
+
+/**
+ * Factory wrapping the constructor: exceptions derived from std::exception
+ * are logged and turned into a nullptr result.
+ */
+std::unique_ptr<Client> Client::createClient(std::shared_ptr<core::logging::Logger> logger, const std::string& applicationURI,
+                                             const std::vector<char>& certBuffer, const std::vector<char>& keyBuffer,
+                                             const std::vector<std::vector<char>>& trustBuffers) {
+  ClientPtr created;
+  try {
+    created = ClientPtr(new Client(logger, applicationURI, certBuffer, keyBuffer, trustBuffers));
+  } catch (const std::exception& exception) {
+    logger->log_error("Failed to create client: %s", exception.what());
+    return nullptr;
+  }
+  return created;
+}
+
+// Explicit instantiations: update_node and add_node are member templates
+// defined in this source file, so every value type meant to be usable from
+// other translation units has to be instantiated here. The list matches the
+// add_value_to_variant overloads defined earlier in this file.
+template UA_StatusCode Client::update_node<int64_t>(const UA_NodeId nodeId,
int64_t value);
+template UA_StatusCode Client::update_node<uint64_t>(const UA_NodeId nodeId,
uint64_t value);
+template UA_StatusCode Client::update_node<int32_t>(const UA_NodeId nodeId,
int32_t value);
+template UA_StatusCode Client::update_node<uint32_t>(const UA_NodeId nodeId,
uint32_t value);
+template UA_StatusCode Client::update_node<float>(const UA_NodeId nodeId,
float value);
+template UA_StatusCode Client::update_node<double>(const UA_NodeId nodeId,
double value);
+template UA_StatusCode Client::update_node<bool>(const UA_NodeId nodeId, bool
value);
+template UA_StatusCode Client::update_node<const char *>(const UA_NodeId
nodeId, const char * value);
+template UA_StatusCode Client::update_node<std::string>(const UA_NodeId
nodeId, std::string value);
+
+// Same set of value types for add_node.
+template UA_StatusCode Client::add_node<int64_t>(const UA_NodeId parentNodeId,
const UA_NodeId targetNodeId, std::string browseName, int64_t value,
OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<uint64_t>(const UA_NodeId
parentNodeId, const UA_NodeId targetNodeId, std::string browseName, uint64_t
value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<int32_t>(const UA_NodeId parentNodeId,
const UA_NodeId targetNodeId, std::string browseName, int32_t value,
OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<uint32_t>(const UA_NodeId
parentNodeId, const UA_NodeId targetNodeId, std::string browseName, uint32_t
value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<float>(const UA_NodeId parentNodeId,
const UA_NodeId targetNodeId, std::string browseName, float value,
OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<double>(const UA_NodeId parentNodeId,
const UA_NodeId targetNodeId, std::string browseName, double value,
OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<bool>(const UA_NodeId parentNodeId,
const UA_NodeId targetNodeId, std::string browseName, bool value,
OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<const char *>(const UA_NodeId
parentNodeId, const UA_NodeId targetNodeId, std::string browseName, const char
* value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<std::string>(const UA_NodeId
parentNodeId, const UA_NodeId targetNodeId, std::string browseName, std::string
value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+
+/**
+ * Maps the data type enumeration of the OPC extension to the numeric ID of the
+ * corresponding type node in namespace 0.
+ *
+ * @throws OPCException for every type without a mapping
+ */
+int32_t OPCNodeDataTypeToTypeID(OPCNodeDataType dt) {
+  switch (dt) {
+    case OPCNodeDataType::Boolean: return UA_NS0ID_BOOLEAN;
+    case OPCNodeDataType::Int32:   return UA_NS0ID_INT32;
+    case OPCNodeDataType::UInt32:  return UA_NS0ID_UINT32;
+    case OPCNodeDataType::Int64:   return UA_NS0ID_INT64;
+    case OPCNodeDataType::UInt64:  return UA_NS0ID_UINT64;
+    case OPCNodeDataType::Float:   return UA_NS0ID_FLOAT;
+    case OPCNodeDataType::Double:  return UA_NS0ID_DOUBLE;
+    case OPCNodeDataType::String:  return UA_NS0ID_STRING;
+    default:
+      throw OPCException(GENERAL_EXCEPTION, "Data type is not supported");
+  }
+}
+
+namespace {
+
+// Reads a fixed size scalar from the raw value bytes of the node.
+// Throws OPCException if the buffer holds fewer bytes than the scalar needs.
+template<typename T>
+T read_node_scalar(const NodeData& nd) {
+  if (nd.data.size() < sizeof(T)) {
+    throw OPCException(GENERAL_EXCEPTION, "Node data is too short for its data type");
+  }
+  T value;
+  memcpy(&value, nd.data.data(), sizeof(T));
+  return value;
+}
+
+// UA strings are not NUL terminated, the length has to be passed explicitly.
+std::string ua_string_to_std_string(const UA_String& str) {
+  return std::string(reinterpret_cast<const char*>(str.data), str.length);
+}
+
+}  // namespace
+
+/**
+ * Converts the value of a node to its string representation.
+ *
+ * Strings, byte strings and localized texts (only the text part, without the
+ * locale) are taken from the stored variant; numeric, boolean and date-time
+ * values are decoded from the raw bytes in nd.data.
+ *
+ * @throws OPCException when the data type is not supported, the raw buffer is
+ *         too short, or float/double are not IEEE 754 on this system
+ */
+std::string nodeValue2String(const NodeData& nd) {
+  switch (nd.dataTypeID) {
+    case UA_TYPES_STRING:
+    case UA_TYPES_BYTESTRING:
+      return ua_string_to_std_string(*static_cast<const UA_String*>(nd.var_->data));
+    case UA_TYPES_LOCALIZEDTEXT:
+      // UA_LocalizedText is {locale, text}: reading it as a plain UA_String
+      // would return the locale instead of the text.
+      return ua_string_to_std_string(static_cast<const UA_LocalizedText*>(nd.var_->data)->text);
+    case UA_TYPES_BOOLEAN:
+      return read_node_scalar<bool>(nd) ? "True" : "False";
+    case UA_TYPES_SBYTE:
+      return std::to_string(read_node_scalar<int8_t>(nd));
+    case UA_TYPES_BYTE:
+      return std::to_string(read_node_scalar<uint8_t>(nd));
+    case UA_TYPES_INT16:
+      return std::to_string(read_node_scalar<int16_t>(nd));
+    case UA_TYPES_UINT16:
+      return std::to_string(read_node_scalar<uint16_t>(nd));
+    case UA_TYPES_INT32:
+      return std::to_string(read_node_scalar<int32_t>(nd));
+    case UA_TYPES_UINT32:
+      return std::to_string(read_node_scalar<uint32_t>(nd));
+    case UA_TYPES_INT64:
+      return std::to_string(read_node_scalar<int64_t>(nd));
+    case UA_TYPES_UINT64:
+      return std::to_string(read_node_scalar<uint64_t>(nd));
+    case UA_TYPES_FLOAT:
+      if (sizeof(float) == 4 && std::numeric_limits<float>::is_iec559) {
+        return std::to_string(read_node_scalar<float>(nd));
+      }
+      throw OPCException(GENERAL_EXCEPTION, "Float is non-standard on this system, OPC data cannot be extracted!");
+    case UA_TYPES_DOUBLE:
+      if (sizeof(double) == 8 && std::numeric_limits<double>::is_iec559) {
+        return std::to_string(read_node_scalar<double>(nd));
+      }
+      throw OPCException(GENERAL_EXCEPTION, "Double is non-standard on this system, OPC data cannot be extracted!");
+    case UA_TYPES_DATETIME:
+      return opc::OPCDateTime2String(read_node_scalar<UA_DateTime>(nd));
+    default:
+      throw OPCException(GENERAL_EXCEPTION, "Data type is not supported ");
+  }
+}
+
+/**
+ * Formats an OPC date-time as "DD-MM-YYYY hh:mm:ss.mmm".
+ *
+ * @return the formatted text, or an empty string if formatting failed
+ */
+std::string OPCDateTime2String(UA_DateTime raw_date) {
+  UA_DateTimeStruct dts = UA_DateTime_toStruct(raw_date);
+  std::array<char, 100> charBuf;
+
+  const int written = snprintf(charBuf.data(), charBuf.size(), "%02hu-%02hu-%04hu %02hu:%02hu:%02hu.%03hu",
+                               dts.day, dts.month, dts.year, dts.hour, dts.min, dts.sec, dts.milliSec);
+  if (written < 0) {
+    return std::string();
+  }
+
+  // Only the text up to the terminating NUL is returned; the remaining part
+  // of the buffer is uninitialized and must not end up in the result.
+  return std::string(charBuf.data());
+}
+
+/**
+ * Log callback installed into the open62541 client configuration: formats the
+ * message (truncated to the size of the local buffer) and forwards it to the
+ * logger passed as context, using the mapped log level.
+ */
+void logFunc(void *context, UA_LogLevel level, UA_LogCategory category, const char *msg, va_list args) {
+  char formatted[1024];
+  vsnprintf(formatted, sizeof(formatted), msg, args);
+
+  // The context is the logger pointer registered together with this callback.
+  auto *sink = reinterpret_cast<core::logging::BaseLogger*>(context);
+  sink->log_string(MapOPCLogLevel(level), formatted);
+}
+
+} /* namespace opc */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/opc/src/opcbase.cpp b/extensions/opc/src/opcbase.cpp
new file mode 100644
index 0000000..4fb42b3
--- /dev/null
+++ b/extensions/opc/src/opcbase.cpp
@@ -0,0 +1,162 @@
+/**
+ * BaseOPC class definition
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <string>
+
+#include "opc.h"
+#include "opcbase.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+ // Definitions of BaseOPCProcessor's static properties: server endpoint,
+ // client application URI, credentials and the certificate/key/trust file paths.
+ // Only the endpoint is marked as required here.
+ core::Property BaseOPCProcessor::OPCServerEndPoint(
+ core::PropertyBuilder::createProperty("OPC server endpoint")
+ ->withDescription("Specifies the address, port and relative path of an
OPC endpoint")
+ ->isRequired(true)->build());
+
+ core::Property BaseOPCProcessor::ApplicationURI(
+ core::PropertyBuilder::createProperty("Application URI")
+ ->withDescription("Application URI of the client in the format
'urn:unconfigured:application'. "
+ "Mandatory, if using Secure Channel and must match
the URI included in the certificate's Subject Alternative Names.")->build());
+
+
+ core::Property BaseOPCProcessor::Username(
+ core::PropertyBuilder::createProperty("Username")
+ ->withDescription("Username to log in with.")->build());
+
+ core::Property BaseOPCProcessor::Password(
+ core::PropertyBuilder::createProperty("Password")
+ ->withDescription("Password to log in with. Providing this requires
cert and key to be provided as well, credentials are always sent
encrypted.")->build());
+
+ core::Property BaseOPCProcessor::CertificatePath(
+ core::PropertyBuilder::createProperty("Certificate path")
+ ->withDescription("Path to the DER-encoded cert file")->build());
+
+ core::Property BaseOPCProcessor::KeyPath(
+ core::PropertyBuilder::createProperty("Key path")
+ ->withDescription("Path to the DER-encoded key file")->build());
+
+ core::Property BaseOPCProcessor::TrustedPath(
+ core::PropertyBuilder::createProperty("Trusted server certificate path")
+ ->withDescription("Path to the DER-encoded trusted server
certificate")->build());
+
+ /**
+  * Reads and validates the connection and security settings.
+  *
+  * configOK_ is cleared on entry and only set once every check has passed, so
+  * callers can test it after this function returns. Rules enforced:
+  *  - Username and Password must be given together or not at all;
+  *  - Certificate path, Key path and Trusted server certificate path must be
+  *    given together or not at all;
+  *  - a Password requires all three paths plus the Application URI;
+  *  - a Certificate path requires the Application URI;
+  *  - every configured file must be readable and non-empty.
+  *
+  * @param context source of the property values
+  * @param factory unused here
+  */
+ void BaseOPCProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+   logger_->log_trace("BaseOPCProcessor::onSchedule");
+
+   // Drop everything left over from a previous schedule. The path members are
+   // reset as well: getProperty() results are only compared with each other
+   // below, so a stale path from an earlier configuration would otherwise
+   // survive and trigger certificate loading for settings that no longer exist.
+   applicationURI_.clear();
+   certpath_.clear();
+   keypath_.clear();
+   trustpath_.clear();
+   certBuffer_.clear();
+   keyBuffer_.clear();
+   password_.clear();
+   username_.clear();
+   trustBuffers_.clear();
+
+   configOK_ = false;
+
+   context->getProperty(OPCServerEndPoint.getName(), endPointURL_);
+   context->getProperty(ApplicationURI.getName(), applicationURI_);
+
+   if (context->getProperty(Username.getName(), username_) != context->getProperty(Password.getName(), password_)) {
+     logger_->log_error("Both or neither of Username and Password should be provided!");
+     return;
+   }
+
+   auto certificatePathRes = context->getProperty(CertificatePath.getName(), certpath_);
+   auto keyPathRes = context->getProperty(KeyPath.getName(), keypath_);
+   auto trustedPathRes = context->getProperty(TrustedPath.getName(), trustpath_);
+   if (certificatePathRes != keyPathRes || keyPathRes != trustedPathRes) {
+     logger_->log_error("All or none of Certificate path, Key path and Trusted server certificate path should be provided!");
+     return;
+   }
+
+   if (!password_.empty() && (certpath_.empty() || keypath_.empty() || trustpath_.empty() || applicationURI_.empty())) {
+     logger_->log_error("Certificate path, Key path, Trusted server certificate path and Application URI must be provided in case Password is provided!");
+     return;
+   }
+
+   if (!certpath_.empty()) {
+     if (applicationURI_.empty()) {
+       logger_->log_error("Application URI must be provided if Certificate path is provided!");
+       return;
+     }
+
+     // Returns the raw bytes of the file, or an empty buffer if it cannot be opened.
+     const auto readBinaryFile = [](const std::string &path) {
+       std::vector<char> content;
+       std::ifstream input(path, std::ios::binary);
+       if (input.good()) {
+         content.assign(std::istreambuf_iterator<char>(input), std::istreambuf_iterator<char>());
+       }
+       return content;
+     };
+
+     certBuffer_ = readBinaryFile(certpath_);
+     keyBuffer_ = readBinaryFile(keypath_);
+     trustBuffers_.emplace_back();
+     trustBuffers_.back() = readBinaryFile(trustpath_);
+
+     if (certBuffer_.empty()) {
+       logger_->log_error("Failed to load cert from path: %s", certpath_);
+       return;
+     }
+     if (keyBuffer_.empty()) {
+       logger_->log_error("Failed to load key from path: %s", keypath_);
+       return;
+     }
+
+     if (trustBuffers_[0].empty()) {
+       logger_->log_error("Failed to load trusted server certs from path: %s", trustpath_);
+       return;
+     }
+   }
+
+   configOK_ = true;
+ }
+
+ /**
+  * Makes sure connection_ exists and is connected to endPointURL_.
+  * The client is created on first use; an already connected client is left alone.
+  *
+  * @return true if the client is connected when the function returns, false if connecting failed
+  */
+ bool BaseOPCProcessor::reconnect() {
+   if (connection_ == nullptr) {
+     connection_ = opc::Client::createClient(logger_, applicationURI_, certBuffer_, keyBuffer_, trustBuffers_);
+   }
+
+   if (!connection_->isConnected()) {
+     const auto status = connection_->connect(endPointURL_, username_, password_);
+     if (status != UA_STATUSCODE_GOOD) {
+       logger_->log_error("Failed to connect: %s!", UA_StatusCode_name(status));
+       return false;
+     }
+     logger_->log_debug("Successfully connected.");
+   }
+
+   return true;
+ }
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
new file mode 100644
index 0000000..c5acf3f
--- /dev/null
+++ b/extensions/opc/src/putopc.cpp
@@ -0,0 +1,466 @@
+/**
+ * PutOPC class definition
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <string>
+#include <list>
+#include <map>
+#include <mutex>
+#include <thread>
+
+#include "opc.h"
+#include "putopc.h"
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "controllers/SSLContextService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+ // Definitions of PutOPCProcessor's static properties and relationships.
+ // The allowed values of "Value type" are not listed here; initialize() fills
+ // them in from opc::StringToOPCDataTypeMap.
+ core::Property PutOPCProcessor::ParentNodeID(
+     core::PropertyBuilder::createProperty("Parent node ID")
+     ->withDescription("Specifies the ID of the root node to traverse")
+     ->isRequired(true)->build());
+
+ core::Property PutOPCProcessor::ParentNodeIDType(
+     core::PropertyBuilder::createProperty("Parent node ID type")
+     ->withDescription("Specifies the type of the provided node ID")
+     ->isRequired(true)
+     ->withAllowableValues<std::string>({"Path", "Int", "String"})->build());
+
+ core::Property PutOPCProcessor::ParentNameSpaceIndex(
+     core::PropertyBuilder::createProperty("Parent node namespace index")
+     ->withDescription("The index of the namespace. Used only if node ID type is not path.")
+     ->withDefaultValue<int32_t>(0)->build());
+
+ core::Property PutOPCProcessor::ValueType(
+     core::PropertyBuilder::createProperty("Value type")
+     ->withDescription("Set the OPC value type of the created nodes")
+     ->isRequired(true)->build());
+
+ // The four target node properties support expression language, so they can be
+ // evaluated per flow file.
+ core::Property PutOPCProcessor::TargetNodeIDType(
+     core::PropertyBuilder::createProperty("Target node ID type")
+     ->withDescription("ID type of target node. Allowed values are: Int, String.")
+     ->supportsExpressionLanguage(true)->build());
+
+ core::Property PutOPCProcessor::TargetNodeID(
+     core::PropertyBuilder::createProperty("Target node ID")
+     ->withDescription("ID of target node.")
+     ->supportsExpressionLanguage(true)->build());
+
+ core::Property PutOPCProcessor::TargetNodeNameSpaceIndex(
+     core::PropertyBuilder::createProperty("Target node namespace index")
+     ->withDescription("The index of the namespace. Used only if node ID type is not path.")
+     ->supportsExpressionLanguage(true)->build());
+
+ core::Property PutOPCProcessor::TargetNodeBrowseName(
+     core::PropertyBuilder::createProperty("Target node browse name")
+     ->withDescription("Browse name of target node. Only used when new node is created.")
+     ->supportsExpressionLanguage(true)->build());
+
+ // The namespace-scope `static core::Property TargetNodeID;` and
+ // `TargetNodeBrowseName;` that used to follow were pasted header declarations:
+ // they defined two unused file-local objects hidden by the class members above,
+ // so they have been removed.
+
+ core::Relationship PutOPCProcessor::Success("success", "Successfully put OPC-UA node");
+ core::Relationship PutOPCProcessor::Failure("failure", "Failed to put OPC-UA node");
+
+ /**
+  * Registers the processor's properties and relationships.
+  * The allowed values of ValueType are rebuilt from the keys of
+  * opc::StringToOPCDataTypeMap before the properties are published.
+  */
+ void PutOPCProcessor::initialize() {
+   PutOPCProcessor::ValueType.clearAllowedValues();
+   core::PropertyValue allowedType;
+   for (const auto& typeEntry : opc::StringToOPCDataTypeMap) {
+     allowedType = typeEntry.first;
+     PutOPCProcessor::ValueType.addAllowedValue(allowedType);
+   }
+
+   // Own properties go in first, the ones inherited from the base processor are merged in afterwards.
+   std::set<core::Property> supported = {
+       ParentNodeID, ParentNodeIDType, ParentNameSpaceIndex, ValueType,
+       TargetNodeIDType, TargetNodeID, TargetNodeNameSpaceIndex, TargetNodeBrowseName};
+   const std::set<core::Property> inherited = BaseOPCProcessor::getSupportedProperties();
+   supported.insert(inherited.begin(), inherited.end());
+   setSupportedProperties(supported);
+
+   // Set the supported relationships
+   setSupportedRelationships({Success, Failure});
+ }
+
+ /**
+  * Validates the PutOPC specific configuration on top of the base class checks.
+  *
+  * configOK_ is only left true when the base configuration is valid, the
+  * parent node ID type is one of String/Int/Path, an Int parent node ID parses
+  * as an integer, the namespace index is available for non-Path IDs and the
+  * value type is known.
+  *
+  * @param context source of the property values
+  * @param factory forwarded to the base class
+  * @throws std::out_of_range if the configured value type is not a key of
+  *         opc::StringToOPCDataTypeMap
+  */
+ void PutOPCProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+   logger_->log_trace("PutOPCProcessor::onSchedule");
+
+   // Force the parent node to be resolved again on the next trigger.
+   parentExists_ = false;
+
+   BaseOPCProcessor::onSchedule(context, factory);
+
+   if (!configOK_) {
+     return;
+   }
+
+   // The base checks passed; stay "not OK" until the local checks pass too.
+   configOK_ = false;
+
+   context->getProperty(OPCServerEndPoint.getName(), endPointURL_);
+   std::string value;
+   context->getProperty(ParentNodeID.getName(), nodeID_);
+   context->getProperty(ParentNodeIDType.getName(), value);
+
+   if (value == "String") {
+     idType_ = opc::OPCNodeIDType::String;
+   } else if (value == "Int") {
+     idType_ = opc::OPCNodeIDType::Int;
+   } else if (value == "Path") {
+     idType_ = opc::OPCNodeIDType::Path;
+   } else {
+     // Where have our validators gone?
+     logger_->log_error("%s is not a valid node ID type!", value.c_str());
+     // Bail out: continuing would validate against a stale or unset idType_
+     // and could end with configOK_ == true.
+     return;
+   }
+
+   if (idType_ == opc::OPCNodeIDType::Int) {
+     try {
+       // Only checks that the ID parses; the value is converted again when it is used.
+       (void)std::stoi(nodeID_);
+     } catch (...) {
+       logger_->log_error("%s cannot be used as an int type node ID", nodeID_.c_str());
+       return;
+     }
+   }
+   if (idType_ != opc::OPCNodeIDType::Path) {
+     if (!context->getProperty(ParentNameSpaceIndex.getName(), nameSpaceIdx_)) {
+       logger_->log_error("%s is mandatory in case %s is not Path", ParentNameSpaceIndex.getName().c_str(), ParentNodeIDType.getName().c_str());
+       return;
+     }
+   }
+
+   std::string typestr;
+   context->getProperty(ValueType.getName(), typestr);
+   // This throws, but allowed values are generated based on this map -> that's a really unexpected error
+   nodeDataType_ = opc::StringToOPCDataTypeMap.at(typestr);
+
+   configOK_ = true;
+ }
+
+ // Handles one trigger. Steps, in order:
+ //  1. yield if the configuration was rejected (configOK_ is false);
+ //  2. return if another trigger still holds onTriggerMutex_;
+ //  3. (re)connect, yielding on failure;
+ //  4. resolve the parent node once and remember it via parentExists_;
+ //  5. take one flow file, return if there is none;
+ //  6. if "Target node ID type" evaluates for the flow file, build the target
+ //     node ID from the target properties and ask the server whether it exists;
+ //  7. read the flow file content, convert it to nodeDataType_ and either
+ //     update the existing node or add a new node under the parent;
+ //  8. transfer the flow file to Success or Failure.
+ // NOTE(review): the UA_STRING_ALLOC results stored in parentNodeID_ and
+ // targetnode are not released anywhere in this function - looks like a leak
+ // per string-typed node ID; confirm against the open62541 ownership rules.
+ // NOTE(review): the catch (...) blocks also catch the OPCException thrown by
+ // the Boolean/default branches, and always log "Failed to convert".
+ void PutOPCProcessor::onTrigger(const std::shared_ptr<core::ProcessContext>
&context, const std::shared_ptr<core::ProcessSession> &session) {
+ if (!configOK_) {
+ logger_->log_error(
+ "This processor was not configured properly, yielding. Please check
for previous errors in the logs!");
+ yield();
+ return;
+ }
+
+ logger_->log_trace("PutOPCProcessor::onTrigger");
+
+ // Non-blocking lock: an overlapping trigger is skipped instead of queued.
+ std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+ if (!lock.owns_lock()) {
+ logger_->log_warn("processor was triggered before previous listing
finished, configuration should be revised!");
+ return;
+ }
+
+ if (!reconnect()) {
+ yield();
+ return;
+ }
+
+ // Resolve the parent node only until it has been found once.
+ if (!parentExists_) {
+ if (idType_ == opc::OPCNodeIDType::Path) {
+ std::vector<UA_NodeId> translatedNodeIDs;
+ if (connection_->translateBrowsePathsToNodeIdsRequest(nodeID_,
translatedNodeIDs, logger_) !=
+ UA_STATUSCODE_GOOD) {
+ logger_->log_error("Failed to translate %s to node id, no flow files
will be put", nodeID_.c_str());
+ yield();
+ return;
+ // NOTE(review): size() != 1 is also true for an empty result, although
+ // the message below only mentions multiple node ids.
+ } else if (translatedNodeIDs.size() != 1) {
+ logger_->log_error("%s was translated to multiple node ids, no flow
files will be put", nodeID_.c_str());
+ yield();
+ return;
+ } else {
+ parentNodeID_ = translatedNodeIDs[0];
+ parentExists_ = true;
+ }
+ } else {
+ parentNodeID_.namespaceIndex = nameSpaceIdx_;
+ if (idType_ == opc::OPCNodeIDType::Int) {
+ parentNodeID_.identifierType = UA_NODEIDTYPE_NUMERIC;
+ parentNodeID_.identifier.numeric = std::stoi(nodeID_);
+ } else if (idType_ == opc::OPCNodeIDType::String) {
+ parentNodeID_.identifierType = UA_NODEIDTYPE_STRING;
+ parentNodeID_.identifier.string = UA_STRING_ALLOC(nodeID_.c_str());
+ }
+ if (!connection_->exists(parentNodeID_)) {
+ logger_->log_error("Parent node doesn't exist, no flow files will be
put");
+ yield();
+ return;
+ }
+ parentExists_ = true;
+ }
+ }
+
+ std::shared_ptr<FlowFileRecord> flowFile =
std::static_pointer_cast<FlowFileRecord>(session->get());
+
+ // Do nothing if there are no incoming files
+ if (!flowFile) {
+ return;
+ }
+
+ std::string targetidtype;
+
+ bool targetNodeExists = false;
+ bool targetNodeValid = false;
+ UA_NodeId targetnode;
+
+ // The target node is optional: it is only built when the ID type property
+ // evaluates for this flow file; ID and namespace index are then mandatory.
+ if (context->getProperty(TargetNodeIDType, targetidtype, flowFile)) {
+ std::string targetid;
+ std::string namespaceidx;
+
+
+ if (!context->getProperty(TargetNodeID, targetid, flowFile)) {
+ logger_->log_error("Flowfile %s had target node ID type specified (%s)
without ID, routing to failure!",
+ flowFile->getUUIDStr(), targetidtype);
+ session->transfer(flowFile, Failure);
+ return;
+ }
+
+ if (!context->getProperty(TargetNodeNameSpaceIndex, namespaceidx,
flowFile)) {
+ logger_->log_error(
+ "Flowfile %s had target node ID type specified (%s) without
namespace index, routing to failure!",
+ flowFile->getUUIDStr(), targetidtype);
+ session->transfer(flowFile, Failure);
+ return;
+ }
+ int32_t nsi;
+ try {
+ nsi = std::stoi(namespaceidx);
+ } catch (...) {
+ logger_->log_error("Flowfile %s has invalid namespace index (%s),
routing to failure!",
+ flowFile->getUUIDStr(), namespaceidx);
+ session->transfer(flowFile, Failure);
+ return;
+ }
+
+ targetnode.namespaceIndex = nsi;
+ if (targetidtype == "Int") {
+ targetnode.identifierType = UA_NODEIDTYPE_NUMERIC;
+ try {
+ targetnode.identifier.numeric = std::stoi(targetid);
+ targetNodeValid = true;
+ } catch (...) {
+ logger_->log_error("Flowfile %s: target node ID is not a valid
integer: %s. Routing to failure!",
+ flowFile->getUUIDStr(), targetid);
+ session->transfer(flowFile, Failure);
+ return;
+ }
+ } else if (targetidtype == "String") {
+ targetnode.identifierType = UA_NODEIDTYPE_STRING;
+ targetnode.identifier.string = UA_STRING_ALLOC(targetid.c_str());
+ targetNodeValid = true;
+ } else {
+ logger_->log_error("Flowfile %s: target node ID type is invalid: %s.
Routing to failure!",
+ flowFile->getUUIDStr(), targetidtype);
+ session->transfer(flowFile, Failure);
+ return;
+ }
+ targetNodeExists = connection_->exists(targetnode);
+ }
+
+ // Load the complete flow file content and treat it as text for conversion.
+ ReadCallback cb(logger_);
+ session->read(flowFile, &cb);
+
+ const auto &vec = cb.getContent();
+
+ std::string contentstr(reinterpret_cast<const char *>(vec.data()),
vec.size());
+
+ if (targetNodeExists) {
+ logger_->log_trace("Node exists, trying to update it");
+ try {
+ UA_StatusCode sc;
+ switch (nodeDataType_) {
+ case opc::OPCNodeDataType::Int64: {
+ int64_t value = std::stoll(contentstr);
+ sc = connection_->update_node(targetnode, value);
+ break;
+ }
+ case opc::OPCNodeDataType::UInt64: {
+ uint64_t value = std::stoull(contentstr);
+ sc = connection_->update_node(targetnode, value);
+ break;
+ }
+ case opc::OPCNodeDataType::Int32: {
+ int32_t value = std::stoi(contentstr);
+ sc = connection_->update_node(targetnode, value);
+ break;
+ }
+ case opc::OPCNodeDataType::UInt32: {
+ uint32_t value = std::stoul(contentstr);
+ sc = connection_->update_node(targetnode, value);
+ break;
+ }
+ case opc::OPCNodeDataType::Boolean: {
+ bool value;
+ if (utils::StringUtils::StringToBool(contentstr, value)) {
+ sc = connection_->update_node(targetnode, value);
+ } else {
+ throw opc::OPCException(GENERAL_EXCEPTION, "Content cannot be
converted to bool");
+ }
+ break;
+ }
+ case opc::OPCNodeDataType::Float: {
+ float value = std::stof(contentstr);
+ sc = connection_->update_node(targetnode, value);
+ break;
+ }
+ case opc::OPCNodeDataType::Double: {
+ double value = std::stod(contentstr);
+ sc = connection_->update_node(targetnode, value);
+ break;
+ }
+ case opc::OPCNodeDataType::String: {
+ sc = connection_->update_node(targetnode, contentstr);
+ break;
+ }
+ default:
+ throw opc::OPCException(GENERAL_EXCEPTION, "This should never
happen!");
+ }
+ if (sc != UA_STATUSCODE_GOOD) {
+ logger_->log_error("Failed to update node: %s",
UA_StatusCode_name(sc));
+ session->transfer(flowFile, Failure);
+ return;
+ }
+ } catch (...) {
+ std::string typestr;
+ context->getProperty(ValueType.getName(), typestr);
+ logger_->log_error("Failed to convert %s to data type %s", contentstr,
typestr);
+ session->transfer(flowFile, Failure);
+ return;
+ }
+ logger_->log_trace("Node successfully updated!");
+ session->transfer(flowFile, Success);
+ return;
+ } else {
+ logger_->log_trace("Node doesn't exist, trying to create new node");
+ std::string browsename;
+ if (!context->getProperty(TargetNodeBrowseName, browsename, flowFile)) {
+ logger_->log_error("Target node browse name is required for flowfile
(%s) as new node is to be created",
+ flowFile->getUUIDStr());
+ session->transfer(flowFile, Failure);
+ return;
+ }
+ // No usable target ID was configured: fall back to numeric ID 0 in namespace 1.
+ // NOTE(review): presumably this lets the server assign the node ID - confirm.
+ if (!targetNodeValid) {
+ targetnode = UA_NODEID_NUMERIC(1, 0);
+ }
+ try {
+ UA_StatusCode sc;
+ UA_NodeId resultnode;
+ switch (nodeDataType_) {
+ case opc::OPCNodeDataType::Int64: {
+ int64_t value = std::stoll(contentstr);
+ sc = connection_->add_node(parentNodeID_, targetnode, browsename,
value, nodeDataType_, &resultnode);
+ break;
+ }
+ case opc::OPCNodeDataType::UInt64: {
+ uint64_t value = std::stoull(contentstr);
+ sc = connection_->add_node(parentNodeID_, targetnode, browsename,
value, nodeDataType_, &resultnode);
+ break;
+ }
+ case opc::OPCNodeDataType::Int32: {
+ int32_t value = std::stoi(contentstr);
+ sc = connection_->add_node(parentNodeID_, targetnode, browsename,
value, nodeDataType_, &resultnode);
+ break;
+ }
+ case opc::OPCNodeDataType::UInt32: {
+ uint32_t value = std::stoul(contentstr);
+ sc = connection_->add_node(parentNodeID_, targetnode, browsename,
value, nodeDataType_, &resultnode);
+ break;
+ }
+ case opc::OPCNodeDataType::Boolean: {
+ bool value;
+ if (utils::StringUtils::StringToBool(contentstr, value)) {
+ sc = connection_->add_node(parentNodeID_, targetnode,
browsename, value, nodeDataType_, &resultnode);
+ } else {
+ throw opc::OPCException(GENERAL_EXCEPTION, "Content cannot be
converted to bool");
+ }
+ break;
+ }
+ case opc::OPCNodeDataType::Float: {
+ float value = std::stof(contentstr);
+ sc = connection_->add_node(parentNodeID_, targetnode, browsename,
value, nodeDataType_, &resultnode);
+ break;
+ }
+ case opc::OPCNodeDataType::Double: {
+ double value = std::stod(contentstr);
+ sc = connection_->add_node(parentNodeID_, targetnode, browsename,
value, nodeDataType_, &resultnode);
+ break;
+ }
+ case opc::OPCNodeDataType::String: {
+ sc = connection_->add_node(parentNodeID_, targetnode, browsename,
contentstr, nodeDataType_, &resultnode);
+ break;
+ }
+ default:
+ throw opc::OPCException(GENERAL_EXCEPTION, "This should never
happen!");
+ }
+ if (sc != UA_STATUSCODE_GOOD) {
+ logger_->log_error("Failed to create node: %s",
UA_StatusCode_name(sc));
+ session->transfer(flowFile, Failure);
+ return;
+ }
+ } catch (...) {
+ std::string typestr;
+ context->getProperty(ValueType.getName(), typestr);
+ logger_->log_error("Failed to convert %s to data type %s", contentstr,
typestr);
+ session->transfer(flowFile, Failure);
+ return;
+ }
+ logger_->log_trace("Node successfully created!");
+ session->transfer(flowFile, Success);
+ return;
+ }
+ }
+
+ /**
+  * Copies the content of the stream into buf_.
+  *
+  * The data is read in pieces of at most 1024 bytes. Each request is limited
+  * to the space still free in buf_, so a read can never run past the end of
+  * the buffer (this includes the empty stream, where nothing is requested).
+  * After a short read buf_ is trimmed to the bytes actually received.
+  *
+  * @param stream source stream; getSize() determines how much is expected
+  * @return number of bytes read, or -1 if the stream reported an error
+  */
+ int64_t PutOPCProcessor::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+   const uint64_t total = stream->getSize();
+
+   buf_.clear();
+   buf_.resize(total);
+
+   uint64_t size = 0;
+
+   while (size < total) {
+     const uint64_t remaining = total - size;
+     const int chunk = remaining < 1024 ? static_cast<int>(remaining) : 1024;
+
+     int read = stream->read(buf_.data() + size, chunk);
+
+     if (read < 0) {
+       return -1;
+     }
+
+     if (read == 0) {
+       break;
+     }
+     size += read;
+   }
+
+   // Drop the zero padding left behind when the stream ended early.
+   buf_.resize(size);
+
+   logger_->log_trace("Read %llu bytes from flowfile content to buffer", static_cast<unsigned long long>(size));
+
+   return size;
+ }
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/libminifi/include/core/Property.h
b/libminifi/include/core/Property.h
index 2553c09..8921e59 100644
--- a/libminifi/include/core/Property.h
+++ b/libminifi/include/core/Property.h
@@ -174,6 +174,11 @@ class Property {
void addAllowedValue(const PropertyValue &value) {
allowed_values_.push_back(value);
}
+
+ /**
+  * Removes every entry from the collection of allowed values.
+  */
+ void clearAllowedValues() {
+ allowed_values_.clear();
+ }
+
/**
* Add value to the collection of values.
*/
diff --git a/rheldistro.sh b/rheldistro.sh
index ec1396f..1c71e92 100644
--- a/rheldistro.sh
+++ b/rheldistro.sh
@@ -101,7 +101,7 @@ build_deps(){
install_bison
elif [ "$FOUND_VALUE" = "flex" ]; then
INSTALLED+=("flex")
- elif [ "$FOUND_VALUE" = "automake" ]; then
+ elif [ "$FOUND_VALUE" = "automake" ]; then
INSTALLED+=("automake")
elif [ "$FOUND_VALUE" = "autoconf" ]; then
INSTALLED+=("autoconf")
diff --git a/suse.sh b/suse.sh
index dceb710..7aff26f 100644
--- a/suse.sh
+++ b/suse.sh
@@ -92,7 +92,7 @@ build_deps(){
install_bison
elif [ "$FOUND_VALUE" = "flex" ]; then
INSTALLED+=("flex")
- elif [ "$FOUND_VALUE" = "automake" ]; then
+ elif [ "$FOUND_VALUE" = "automake" ]; then
INSTALLED+=("automake")
elif [ "$FOUND_VALUE" = "autoconf" ]; then
INSTALLED+=("autoconf")
diff --git a/thirdparty/open62541/open62541.patch
b/thirdparty/open62541/open62541.patch
new file mode 100644
index 0000000..b6cfaf4
--- /dev/null
+++ b/thirdparty/open62541/open62541.patch
@@ -0,0 +1,42 @@
+diff --git a/CMakeLists.txt b/CMakeLists.txt
+index d426e1da..5f1a4044 100644
+--- a/CMakeLists.txt
++++ b/CMakeLists.txt
+@@ -7,7 +7,7 @@ endif()
+
+ string(TOLOWER "${CMAKE_BUILD_TYPE}" BUILD_TYPE_LOWER_CASE)
+
+-set(CMAKE_MODULE_PATH "${PROJECT_SOURCE_DIR}/tools/cmake")
++list(APPEND CMAKE_MODULE_PATH "${PROJECT_SOURCE_DIR}/tools/cmake")
+ find_package(PythonInterp REQUIRED)
+ find_package(Git)
+ include(AssignSourceGroup)
+@@ -416,17 +416,17 @@ if(NOT UA_COMPILE_AS_CXX AND (CMAKE_COMPILER_IS_GNUCC OR
"x${CMAKE_C_COMPILER_ID
+
+ # IPO requires too much memory for unit tests
+ # GCC docu recommends to compile all files with the same options,
therefore ignore it completely
+- if(NOT UA_BUILD_UNIT_TESTS)
+- # needed to check if IPO is supported (check needs cmake > 3.9)
+- if("${CMAKE_VERSION}" VERSION_GREATER 3.9)
+- cmake_policy(SET CMP0069 NEW) # needed as long as required
cmake < 3.9
+- include(CheckIPOSupported)
+- check_ipo_supported(RESULT CC_HAS_IPO) # Inter Procedural
Optimization / Link Time Optimization (should be same as -flto)
+- if(CC_HAS_IPO)
+- set(CMAKE_INTERPROCEDURAL_OPTIMIZATION ON)
+- endif()
+- endif()
+- endif()
++ #if(NOT UA_BUILD_UNIT_TESTS)
++ # # needed to check if IPO is supported (check needs cmake > 3.9)
++ # if("${CMAKE_VERSION}" VERSION_GREATER 3.9)
++ # cmake_policy(SET CMP0069 NEW) # needed as long as required
cmake < 3.9
++ # include(CheckIPOSupported)
++ # check_ipo_supported(RESULT CC_HAS_IPO) # Inter Procedural
Optimization / Link Time Optimization (should be same as -flto)
++ # if(CC_HAS_IPO)
++ # set(CMAKE_INTERPROCEDURAL_OPTIMIZATION ON)
++ # endif()
++ # endif()
++ #endif()
+ endif()
+
+ if(UA_ENABLE_AMALGAMATION)