This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new f0a96c9  MINIFICPP-819 - OPC Unified Architecture Support
f0a96c9 is described below

commit f0a96c99afd16a6686466d5edea3d59fda67d1fe
Author: Arpad Boda <[email protected]>
AuthorDate: Wed Jul 10 11:43:44 2019 +0200

    MINIFICPP-819 - OPC Unified Architecture Support
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    Approved by bakaid on GH
    
    This closes #635
---
 CMakeLists.txt                        |  12 +
 bootstrap.sh                          |  18 +-
 bstrp_functions.sh                    |   2 +
 cmake/BundledMbedTLS.cmake            | 104 +++++++
 cmake/BundledOpen62541.cmake          |  83 +++++
 cmake/mbedtls/dummy/FindMbedTLS.cmake |  25 ++
 extensions/opc/CMakeLists.txt         |  59 ++++
 extensions/opc/include/fetchopc.h     | 109 +++++++
 extensions/opc/include/opc.h          | 146 +++++++++
 extensions/opc/include/opcbase.h      |  85 +++++
 extensions/opc/include/putopc.h       | 111 +++++++
 extensions/opc/src/fetchopc.cpp       | 235 ++++++++++++++
 extensions/opc/src/opc.cpp            | 567 ++++++++++++++++++++++++++++++++++
 extensions/opc/src/opcbase.cpp        | 162 ++++++++++
 extensions/opc/src/putopc.cpp         | 466 ++++++++++++++++++++++++++++
 libminifi/include/core/Property.h     |   5 +
 rheldistro.sh                         |   2 +-
 suse.sh                               |   2 +-
 thirdparty/open62541/open62541.patch  |  42 +++
 19 files changed, 2225 insertions(+), 10 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index ec02734..4d26dd0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -40,6 +40,7 @@ option(ENABLE_OPS "Enable Operations/zlib Tools" ON)
 option(USE_SYSTEM_UUID "Instructs the build system to search for and use a 
UUID library available in the host system" OFF)
 option(ENABLE_JNI "Instructs the build system to enable the JNI extension" OFF)
 option(ENABLE_OPENCV "Instructs the build system to enable the OpenCV 
extension" OFF)
+option(ENABLE_OPC "Instructs the build system to enable the OPC extension" OFF)
 option(BUILD_SHARED_LIBS "Build yaml cpp shared lib" OFF)
 
 cmake_dependent_option(USE_SYSTEM_ZLIB "Instructs the build system to search 
for and use a zlib library available in the host system" ON "NOT STATIC_BUILD" 
OFF)
@@ -560,6 +561,17 @@ if (ENABLE_BUSTACHE)
     createExtension(BUSTACHE-EXTENSIONS "BUSTACHE EXTENSIONS" "This enables 
bustache functionality including ApplyTemplate." "extensions/bustache" 
"${TEST_DIR}/bustache-tests" "TRUE" "thirdparty/bustache")
 endif()
 
+## OPC Extentions
+if (ENABLE_OPC)
+       include(BundledMbedTLS)
+       use_bundled_mbedtls(${CMAKE_CURRENT_SOURCE_DIR} 
${CMAKE_CURRENT_BINARY_DIR})
+
+       include(BundledOpen62541)
+       use_bundled_open62541(${CMAKE_CURRENT_SOURCE_DIR} 
${CMAKE_CURRENT_BINARY_DIR})
+
+       createExtension(OPC-EXTENSIONS "OPC EXTENSIONS" "This enables OPC-UA 
support" "extensions/opc" )
+endif()
+
 ## SFTP extensions
 option(ENABLE_SFTP "Enables SFTP support." OFF)
 if ((ENABLE_ALL OR ENABLE_SFTP) AND NOT DISABLE_CURL)
diff --git a/bootstrap.sh b/bootstrap.sh
index 8d8fc77..0224490 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -290,23 +290,25 @@ add_disabled_option OPENCV_ENABLED ${FALSE} 
"ENABLE_OPENCV"
 add_disabled_option SFTP_ENABLED ${FALSE} "ENABLE_SFTP"
 add_dependency SFTP_ENABLED "libssh2"
 
-TESTS_DISABLED=${FALSE}
-
 add_disabled_option SQLITE_ENABLED ${FALSE} "ENABLE_SQLITE"
 
-USE_SHARED_LIBS=${TRUE}
-
-## name, default, values
-add_multi_option BUILD_PROFILE "RelWithDebInfo" "RelWithDebInfo" "Debug" 
"MinSizeRel" "Release"
-
 # Since the following extensions have limitations on
-
 add_disabled_option BUSTACHE_ENABLED ${FALSE} "ENABLE_BUSTACHE" "2.6" ${TRUE}
 add_dependency BUSTACHE_ENABLED "boost"
+
 ## currently need to limit on certain platforms
 add_disabled_option TENSORFLOW_ENABLED ${FALSE} "ENABLE_TENSORFLOW" "2.6" 
${TRUE}
 add_dependency TENSORFLOW_ENABLED "tensorflow"
 
+add_disabled_option OPC_ENABLED ${FALSE} "ENABLE_OPC"
+add_dependency OPC_ENABLED "mbedtls"
+
+USE_SHARED_LIBS=${TRUE}
+TESTS_DISABLED=${FALSE}
+
+## name, default, values
+add_multi_option BUILD_PROFILE "RelWithDebInfo" "RelWithDebInfo" "Debug" 
"MinSizeRel" "Release"
+
 if [ "$GUIDED_INSTALL" == "${TRUE}" ]; then
   EnableAllFeatures
   ALL_FEATURES_ENABLED=${TRUE}
diff --git a/bstrp_functions.sh b/bstrp_functions.sh
index 4f34fc9..74f079a 100755
--- a/bstrp_functions.sh
+++ b/bstrp_functions.sh
@@ -325,6 +325,7 @@ show_supported_features() {
   echo "S. SFTP Support ................$(print_feature_status SFTP_ENABLED)"
   echo "V. AWS Support .................$(print_feature_status AWS_ENABLED)"
   echo "T. OpenCV Support ..............$(print_feature_status OPENCV_ENABLED)"
+  echo "U. OPC-UA Support...............$(print_feature_status OPC_ENABLED)"
   echo "****************************************"
   echo "            Build Options."
   echo "****************************************"
@@ -370,6 +371,7 @@ read_feature_options(){
     o) ToggleFeature COAP_ENABLED ;;
        s) ToggleFeature SFTP_ENABLED ;;
     t) ToggleFeature OPENCV_ENABLED ;;
+    u) ToggleFeature OPC_ENABLED ;;
     1) ToggleFeature TESTS_DISABLED ;;
     2) EnableAllFeatures ;;
     3) ToggleFeature JNI_ENABLED;;
diff --git a/cmake/BundledMbedTLS.cmake b/cmake/BundledMbedTLS.cmake
new file mode 100644
index 0000000..9407aed
--- /dev/null
+++ b/cmake/BundledMbedTLS.cmake
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+function(use_bundled_mbedtls SOURCE_DIR BINARY_DIR)
+    message("Using bundled MbedTLS")
+
+    # Define byproducts
+    if (WIN32)
+        set(BYPRODUCT_PREFIX "" CACHE STRING "" FORCE)
+        set(BYPRODUCT_SUFFIX ".lib" CACHE STRING "" FORCE)
+    else()
+        set(BYPRODUCT_PREFIX "lib" CACHE STRING "" FORCE)
+        set(BYPRODUCT_SUFFIX ".a" CACHE STRING "" FORCE)
+    endif()
+
+    set(BYPRODUCTS
+            "lib/${BYPRODUCT_PREFIX}mbedtls${BYPRODUCT_SUFFIX}"
+            "lib/${BYPRODUCT_PREFIX}mbedx509${BYPRODUCT_SUFFIX}"
+            "lib/${BYPRODUCT_PREFIX}mbedcrypto${BYPRODUCT_SUFFIX}"
+            )
+
+    set(MBEDTLS_BIN_DIR "${BINARY_DIR}/thirdparty/mbedtls-install" CACHE 
STRING "" FORCE)
+
+    FOREACH(BYPRODUCT ${BYPRODUCTS})
+        LIST(APPEND MBEDTLS_LIBRARIES_LIST "${MBEDTLS_BIN_DIR}/${BYPRODUCT}")
+    ENDFOREACH(BYPRODUCT)
+
+    # Set build options
+    set(MBEDTLS_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
+            "-DCMAKE_INSTALL_PREFIX=${MBEDTLS_BIN_DIR}"
+            -DENABLE_PROGRAMS=OFF
+            -DENABLE_TESTING=OFF
+            )
+
+    # Build project
+    ExternalProject_Add(
+            mbedtls-external
+            URL "https://tls.mbed.org/download/mbedtls-2.16.3-apache.tgz";
+            URL_HASH 
"SHA256=ec1bee6d82090ed6ea2690784ea4b294ab576a65d428da9fe8750f932d2da661"
+            SOURCE_DIR "${BINARY_DIR}/thirdparty/mbedtls-src"
+            CMAKE_ARGS ${MBEDTLS_CMAKE_ARGS}
+            BUILD_BYPRODUCTS ${MBEDTLS_LIBRARIES_LIST}
+            EXCLUDE_FROM_ALL TRUE
+    )
+
+    # Set variables
+    set(MBEDTLS_FOUND "YES" CACHE STRING "" FORCE)
+    set(MBEDTLS_INCLUDE_DIRS "${MBEDTLS_BIN_DIR}/include" CACHE STRING "" 
FORCE)
+    set(MBEDTLS_LIBRARIES ${MBEDTLS_LIBRARIES_LIST} CACHE STRING "" FORCE)
+    set(MBEDTLS_LIBRARY 
"${MBEDTLS_BIN_DIR}/lib/${BYPRODUCT_PREFIX}mbedtls${BYPRODUCT_SUFFIX}" CACHE 
STRING "" FORCE)
+    set(MBEDX509_LIBRARY 
"${MBEDTLS_BIN_DIR}/lib/${BYPRODUCT_PREFIX}mbedx509${BYPRODUCT_SUFFIX}" CACHE 
STRING "" FORCE)
+    set(MBEDCRYPTO_LIBRARY 
"${MBEDTLS_BIN_DIR}/lib/${BYPRODUCT_PREFIX}mbedcrypto${BYPRODUCT_SUFFIX}" CACHE 
STRING "" FORCE)
+
+    # Set exported variables for FindPackage.cmake
+    set(EXPORTED_MBEDTLS_INCLUDE_DIRS "${MBEDTLS_INCLUDE_DIRS}" CACHE STRING 
"" FORCE)
+    string(REPLACE ";" "%" MBEDTLS_LIBRARIES_EXPORT "${MBEDTLS_LIBRARIES}")
+    set(EXPORTED_MBEDTLS_LIBRARIES "${MBEDTLS_LIBRARIES_EXPORT}" CACHE STRING 
"" FORCE)
+    set(EXPORTED_MBEDTLS_LIBRARY "${MBEDTLS_LIBRARY}" CACHE STRING "" FORCE)
+    set(EXPORTED_MBEDX509_LIBRARY "${MBEDX509_LIBRARY}" CACHE STRING "" FORCE)
+    set(EXPORTED_MBEDCRYPTO_LIBRARY "${MBEDCRYPTO_LIBRARY}" CACHE STRING "" 
FORCE)
+
+    # Create imported targets
+    file(MAKE_DIRECTORY ${MBEDTLS_INCLUDE_DIRS})
+
+    add_library(mbedTLS::mbedcrypto STATIC IMPORTED)
+    set_target_properties(mbedTLS::mbedcrypto PROPERTIES
+            INTERFACE_INCLUDE_DIRECTORIES "${MBEDTLS_INCLUDE_DIRS}")
+    set_target_properties(mbedTLS::mbedcrypto PROPERTIES
+            IMPORTED_LINK_INTERFACE_LANGUAGES "C"
+            IMPORTED_LOCATION "${MBEDCRYPTO_LIBRARY}")
+    add_dependencies(mbedTLS::mbedcrypto mbedtls-external)
+
+    add_library(mbedTLS::mbedx509 STATIC IMPORTED)
+    set_target_properties(mbedTLS::mbedx509 PROPERTIES
+            INTERFACE_INCLUDE_DIRECTORIES "${MBEDTLS_INCLUDE_DIRS}")
+    set_target_properties(mbedTLS::mbedx509 PROPERTIES
+            IMPORTED_LINK_INTERFACE_LANGUAGES "C"
+            IMPORTED_LOCATION "${MBEDX509_LIBRARY}")
+    add_dependencies(mbedTLS::mbedx509 mbedtls-external)
+    set_property(TARGET mbedTLS::mbedx509 APPEND PROPERTY 
INTERFACE_LINK_LIBRARIES mbedTLS::mbedcrypto)
+
+    add_library(mbedTLS::mbedtls STATIC IMPORTED)
+    set_target_properties(mbedTLS::mbedtls PROPERTIES
+            INTERFACE_INCLUDE_DIRECTORIES "${MBEDTLS_INCLUDE_DIRS}")
+    set_target_properties(mbedTLS::mbedtls PROPERTIES
+            IMPORTED_LINK_INTERFACE_LANGUAGES "C"
+            IMPORTED_LOCATION "${MBEDTLS_LIBRARY}")
+    add_dependencies(mbedTLS::mbedtls mbedtls-external)
+    set_property(TARGET mbedTLS::mbedtls APPEND PROPERTY 
INTERFACE_LINK_LIBRARIES mbedTLS::mbedx509 mbedTLS::mbedcrypto)
+endfunction(use_bundled_mbedtls)
\ No newline at end of file
diff --git a/cmake/BundledOpen62541.cmake b/cmake/BundledOpen62541.cmake
new file mode 100644
index 0000000..b4fb2de
--- /dev/null
+++ b/cmake/BundledOpen62541.cmake
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+function(use_bundled_open62541 SOURCE_DIR BINARY_DIR)
+    # Find patch executable
+    find_package(Patch REQUIRED)
+
+    # Define patch step
+    set(PC "${Patch_EXECUTABLE}" -p1 -i 
"${SOURCE_DIR}/thirdparty/open62541/open62541.patch")
+
+    # Define byproducts
+    get_property(LIB64 GLOBAL PROPERTY FIND_LIBRARY_USE_LIB64_PATHS)
+
+    if ("${LIB64}" STREQUAL "TRUE" AND (NOT WIN32 AND NOT APPLE))
+        set(LIBSUFFIX 64)
+    endif()
+
+    if (WIN32)
+        set(BYPRODUCT "lib/open62541.lib")
+    else()
+        set(BYPRODUCT "lib${LIBSUFFIX}/libopen62541.a")
+    endif()
+
+    # Set build options
+    set(OPEN62541_BYPRODUCT_DIR "${BINARY_DIR}/thirdparty/open62541-install")
+
+    set(OPEN62541_CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
+            "-DCMAKE_INSTALL_PREFIX=${OPEN62541_BYPRODUCT_DIR}"
+            -DOPEN62541_VERSION=v1.0
+            -DUA_ENABLE_ENCRYPTION=ON
+            "-DCMAKE_MODULE_PATH=${CMAKE_SOURCE_DIR}/cmake/mbedtls/dummy"
+            "-DEXPORTED_MBEDTLS_INCLUDE_DIRS=${EXPORTED_MBEDTLS_INCLUDE_DIRS}"
+            "-DEXPORTED_MBEDTLS_LIBRARIES=${EXPORTED_MBEDTLS_LIBRARIES}"
+            "-DEXPORTED_MBEDTLS_LIBRARY=${EXPORTED_MBEDTLS_LIBRARY}"
+            "-DEXPORTED_MBEDX509_LIBRARY=${EXPORTED_MBEDX509_LIBRARY}"
+            "-DEXPORTED_MBEDCRYPTO_LIBRARY=${EXPORTED_MBEDCRYPTO_LIBRARY}")
+
+    # Build project
+    ExternalProject_Add(
+            open62541-external
+            URL "https://github.com/open62541/open62541/archive/v1.0.tar.gz";
+            URL_HASH 
"SHA256=9be66efefe2cdb07a7638aad91c301b5c6163f99c66995bc41cce31ec0ea207e"
+            SOURCE_DIR "${BINARY_DIR}/thirdparty/open62541-src"
+            PATCH_COMMAND ${PC}
+            LIST_SEPARATOR % # This is needed for passing semicolon-separated 
lists
+            CMAKE_ARGS ${OPEN62541_CMAKE_ARGS}
+            BUILD_BYPRODUCTS "${OPEN62541_BYPRODUCT_DIR}/${BYPRODUCT}"
+            EXCLUDE_FROM_ALL TRUE
+    )
+
+    # Set dependencies
+    add_dependencies(open62541-external mbedTLS::mbedtls)
+
+    # Set variables
+    set(OPEN62541_FOUND "YES" CACHE STRING "" FORCE)
+    set(OPEN62541_INCLUDE_DIR "${OPEN62541_BYPRODUCT_DIR}/include" CACHE 
STRING "" FORCE)
+    set(OPEN62541_LIBRARY "${OPEN62541_BYPRODUCT_DIR}/${BYPRODUCT}" CACHE 
STRING "" FORCE)
+
+    # Create imported targets
+    add_library(open62541::open62541 STATIC IMPORTED)
+    set_target_properties(open62541::open62541 PROPERTIES IMPORTED_LOCATION 
"${OPEN62541_LIBRARY}")
+    add_dependencies(open62541::open62541 open62541-external)
+    file(MAKE_DIRECTORY ${OPEN62541_INCLUDE_DIR})
+    set_property(TARGET open62541::open62541 APPEND PROPERTY 
INTERFACE_INCLUDE_DIRECTORIES ${OPEN62541_INCLUDE_DIR})
+    set_property(TARGET open62541::open62541 APPEND PROPERTY 
INTERFACE_LINK_LIBRARIES mbedTLS::mbedtls)
+    if(WIN32)
+        set_property(TARGET open62541::open62541 APPEND PROPERTY 
INTERFACE_LINK_LIBRARIES Ws2_32.lib Iphlpapi.lib)
+    endif()
+endfunction(use_bundled_open62541)
\ No newline at end of file
diff --git a/cmake/mbedtls/dummy/FindMbedTLS.cmake 
b/cmake/mbedtls/dummy/FindMbedTLS.cmake
new file mode 100644
index 0000000..750d465
--- /dev/null
+++ b/cmake/mbedtls/dummy/FindMbedTLS.cmake
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+if(NOT MBEDTLS_FOUND)
+    set(MBEDTLS_FOUND "YES" CACHE STRING "" FORCE)
+    set(MBEDTLS_INCLUDE_DIRS "${EXPORTED_MBEDTLS_INCLUDE_DIRS}" CACHE STRING 
"" FORCE)
+    set(MBEDTLS_LIBRARIES ${EXPORTED_MBEDTLS_LIBRARIES} CACHE STRING "" FORCE)
+    set(MBEDTLS_LIBRARY "${EXPORTED_MBEDTLS_LIBRARY}" CACHE STRING "" FORCE)
+    set(MBEDX509_LIBRARY "${EXPORTED_MBEDX509_LIBRARY}" CACHE STRING "" FORCE)
+    set(MBEDCRYPTO_LIBRARY "${EXPORTED_MBEDCRYPTO_LIBRARY}" CACHE STRING "" 
FORCE)
+endif()
\ No newline at end of file
diff --git a/extensions/opc/CMakeLists.txt b/extensions/opc/CMakeLists.txt
new file mode 100644
index 0000000..9a77ca1
--- /dev/null
+++ b/extensions/opc/CMakeLists.txt
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+set( CMAKE_VERBOSE_MAKEFILE on )
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
+include_directories(include)
+
+add_definitions(-DUA_ENABLE_ENCRYPTION)
+
+file(GLOB SOURCES "src/*.cpp")
+
+add_library(minifi-opc-extensions STATIC ${SOURCES})
+set_property(TARGET minifi-opc-extensions PROPERTY POSITION_INDEPENDENT_CODE 
ON)
+
+if(THREADS_HAVE_PTHREAD_ARG)
+  target_compile_options(PUBLIC minifi-opc-extensions "-pthread")
+endif()
+if(CMAKE_THREAD_LIBS_INIT)
+  target_link_libraries(minifi-opc-extensions "${CMAKE_THREAD_LIBS_INIT}")
+endif()
+
+target_link_libraries(minifi-opc-extensions ${CMAKE_DL_LIBS} 
open62541::open62541)
+
+if (WIN32)
+    set_target_properties(minifi-opc-extensions PROPERTIES
+        LINK_FLAGS "/WHOLEARCHIVE"
+    )
+elseif (APPLE)
+    set_target_properties(minifi-opc-extensions PROPERTIES
+        LINK_FLAGS "-Wl,-all_load"
+    )
+else ()
+    set_target_properties(minifi-opc-extensions PROPERTIES
+        LINK_FLAGS "-Wl,--whole-archive"
+    )
+endif ()
+
+
+SET (OPC-EXTENSIONS minifi-opc-extensions PARENT_SCOPE)
+
+register_extension(minifi-opc-extensions)
diff --git a/extensions/opc/include/fetchopc.h 
b/extensions/opc/include/fetchopc.h
new file mode 100644
index 0000000..dc6fbc3
--- /dev/null
+++ b/extensions/opc/include/fetchopc.h
@@ -0,0 +1,109 @@
+/**
+ * FetchOPC class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef NIFI_MINIFI_CPP_FetchOPCProcessor_H
+#define NIFI_MINIFI_CPP_FetchOPCProcessor_H
+
+#include <memory>
+#include <string>
+#include <list>
+#include <map>
+#include <mutex>
+#include <thread>
+
+#include "opc.h"
+#include "opcbase.h"
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "controllers/SSLContextService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class FetchOPCProcessor : public BaseOPCProcessor {
+public:
+  static constexpr char const* ProcessorName = "FetchOPC";
+  // Supported Properties
+  static core::Property NodeIDType;
+  static core::Property NodeID;
+  static core::Property NameSpaceIndex;
+  static core::Property MaxDepth;
+
+  // Supported Relationships
+  static core::Relationship Success;
+  static core::Relationship Failure;
+
+  FetchOPCProcessor(std::string name, utils::Identifier uuid = 
utils::Identifier())
+  : BaseOPCProcessor(name, uuid), nameSpaceIdx_(0), nodesFound_(0), 
variablesFound_(0), maxDepth_(0) {
+    logger_ = logging::LoggerFactory<FetchOPCProcessor>::getLogger();
+  }
+
+  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
+
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session) override;
+
+  virtual void initialize(void) override;
+
+protected:
+  bool nodeFoundCallBack(opc::Client& client, const UA_ReferenceDescription 
*ref, const std::string& path,
+                         const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session);
+
+  void OPCData2FlowFile(const opc::NodeData& opcnode, const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session);
+
+  class WriteCallback : public OutputStreamCallback {
+    std::string data_;
+   public:
+    WriteCallback(std::string&& data)
+      : data_(data) {
+    }
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      return 
stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), 
data_.size());
+    }
+  };
+  std::string nodeID_;
+  int32_t nameSpaceIdx_;
+  opc::OPCNodeIDType idType_;
+  uint32_t nodesFound_;
+  uint32_t variablesFound_;
+  uint64_t maxDepth_;
+
+private:
+  std::mutex onTriggerMutex_;
+  std::vector<UA_NodeId> translatedNodeIDs_;  // Only used when user provides 
path, path->nodeid translation is only done once
+
+};
+
+REGISTER_RESOURCE(FetchOPCProcessor, "Fetches OPC-UA node");
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_FetchOPCProcessor_H
diff --git a/extensions/opc/include/opc.h b/extensions/opc/include/opc.h
new file mode 100644
index 0000000..0a954de
--- /dev/null
+++ b/extensions/opc/include/opc.h
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef NIFI_MINIFI_CPP_OPC_H
+#define NIFI_MINIFI_CPP_OPC_H
+
+#include "open62541/client.h"
+#include "open62541/client_highlevel.h"
+#include "open62541/client_config_default.h"
+#include "logging/Logger.h"
+#include "Exception.h"
+
+#include <string>
+#include <functional>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace opc {
+
+class OPCException : public minifi::Exception {
+ public:
+  OPCException(ExceptionType type, std::string &&errorMsg)
+  : Exception(type, errorMsg) {
+  }
+};
+
+enum class OPCNodeIDType{ Path, Int, String };
+
+enum class OPCNodeDataType{ Int64, UInt64, Int32, UInt32, Boolean, Float, 
Double, String };
+
+struct NodeData;
+
+class Client;
+
+using nodeFoundCallBackFunc = bool(Client& client, const 
UA_ReferenceDescription*, const std::string&);
+
+class Client {
+ public:
+  bool isConnected();
+  UA_StatusCode connect(const std::string& url, const std::string& username = 
"", const std::string& password = "");
+  ~Client();
+  NodeData getNodeData(const UA_ReferenceDescription *ref, const std::string& 
basePath = "");
+  UA_ReferenceDescription * getNodeReference(UA_NodeId nodeId);
+  void traverse(UA_NodeId nodeId, std::function<nodeFoundCallBackFunc> cb, 
const std::string& basePath = "", uint64_t maxDepth = 0, bool fetchRoot = true);
+  bool exists(UA_NodeId nodeId);
+  UA_StatusCode translateBrowsePathsToNodeIdsRequest(const std::string& path, 
std::vector<UA_NodeId>& foundNodeIDs, const 
std::shared_ptr<core::logging::Logger>& logger);
+
+  template<typename T>
+  UA_StatusCode update_node(const UA_NodeId nodeId, T value);
+
+  template<typename T>
+  UA_StatusCode add_node(const UA_NodeId parentNodeId, const UA_NodeId 
targetNodeId, std::string browseName, T value, OPCNodeDataType dt, UA_NodeId 
*receivedNodeId);
+
+  static std::unique_ptr<Client> 
createClient(std::shared_ptr<core::logging::Logger> logger, const std::string& 
applicationURI,
+                                              const std::vector<char>& 
certBuffer, const std::vector<char>& keyBuffer,
+                                              const 
std::vector<std::vector<char>>& trustBuffers);
+
+ private:
+  Client (std::shared_ptr<core::logging::Logger> logger, const std::string& 
applicationURI,
+      const std::vector<char>& certBuffer, const std::vector<char>& keyBuffer,
+      const std::vector<std::vector<char>>& trustBuffers);
+
+  UA_Client *client_;
+  std::shared_ptr<core::logging::Logger> logger_;
+};
+
+using ClientPtr = std::unique_ptr<Client>;
+
+struct NodeData {
+  std::vector<uint8_t> data;
+  uint16_t dataTypeID;
+  std::map<std::string, std::string> attributes;
+
+  virtual ~NodeData(){
+    if(var_) {
+      UA_Variant_delete(var_);
+    }
+  }
+
+  NodeData (const NodeData&) = delete;
+  NodeData& operator= (const NodeData &) = delete;
+  NodeData& operator= (NodeData &&) = delete;
+
+  NodeData(NodeData&& rhs) : data(rhs.data), attributes(rhs.attributes)
+  {
+    dataTypeID = rhs.dataTypeID;
+    this->var_ = rhs.var_;
+    rhs.var_ = nullptr;
+  }
+
+ private:
+  UA_Variant* var_;
+
+  NodeData(UA_Variant * var = nullptr) {
+    var_ = var;
+  }
+  void addVariant(UA_Variant * var) {
+    if(var_) {
+      UA_Variant_delete(var_);
+    }
+    var_ = var;
+  }
+
+  friend class Client;
+  friend std::string nodeValue2String(const NodeData&);
+};
+
+static std::map<std::string, OPCNodeDataType>  StringToOPCDataTypeMap = 
{{"Int64", OPCNodeDataType::Int64}, {"UInt64", OPCNodeDataType::UInt64 }, 
{"Int32", OPCNodeDataType::Int32},
+                                                                         
{"UInt32", OPCNodeDataType::UInt32}, {"Boolean", OPCNodeDataType::Boolean}, 
{"Float", OPCNodeDataType::Float},
+                                                                         
{"Double", OPCNodeDataType::Double}, {"String", OPCNodeDataType::String}};
+
+int32_t OPCNodeDataTypeToTypeID(OPCNodeDataType dt);
+
+std::string nodeValue2String(const NodeData& nd);
+
+std::string OPCDateTime2String(UA_DateTime raw_date);
+
+void logFunc(void *context, UA_LogLevel level, UA_LogCategory category, const 
char *msg, va_list args);
+
+static void logClear(void *context) {};
+
+} /* namespace opc */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+
+#endif //NIFI_MINIFI_CPP_OPC_H
diff --git a/extensions/opc/include/opcbase.h b/extensions/opc/include/opcbase.h
new file mode 100644
index 0000000..2ad4b03
--- /dev/null
+++ b/extensions/opc/include/opcbase.h
@@ -0,0 +1,85 @@
+/**
+ * OPCBase class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_OPCBASE_H
+#define NIFI_MINIFI_CPP_OPCBASE_H
+
+#include <string>
+
+#include "opc.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class BaseOPCProcessor : public core::Processor {
+ public:
+  static core::Property OPCServerEndPoint;
+
+  static core::Property ApplicationURI;
+  static core::Property Username;
+  static core::Property Password;
+  static core::Property CertificatePath;
+  static core::Property KeyPath;
+  static core::Property TrustedPath;
+
+  BaseOPCProcessor(std::string name, utils::Identifier uuid = 
utils::Identifier())
+  : Processor(name, uuid) {
+  }
+
+  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
+
+ protected:
+  virtual bool reconnect();
+
+  std::shared_ptr<logging::Logger> logger_;
+
+  opc::ClientPtr connection_;
+
+  std::string endPointURL_;
+
+  std::string applicationURI_;
+  std::string username_;
+  std::string password_;
+  std::string certpath_;
+  std::string keypath_;
+  std::string trustpath_;
+
+  std::vector<char> certBuffer_;
+  std::vector<char> keyBuffer_;
+  std::vector<std::vector<char>> trustBuffers_;
+
+  bool configOK_;
+
+  virtual std::set<core::Property> getSupportedProperties() const {return 
{OPCServerEndPoint, ApplicationURI, Username, Password, CertificatePath, 
KeyPath, TrustedPath};}
+};
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif //NIFI_MINIFI_CPP_OPCBASE_H
diff --git a/extensions/opc/include/putopc.h b/extensions/opc/include/putopc.h
new file mode 100644
index 0000000..8dfe61c
--- /dev/null
+++ b/extensions/opc/include/putopc.h
@@ -0,0 +1,111 @@
+/**
+ * PutOPC class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NIFI_MINIFI_CPP_PUTOPC_H
+#define NIFI_MINIFI_CPP_PUTOPC_H
+
+#include <memory>
+#include <string>
+#include <list>
+#include <map>
+#include <mutex>
+#include <thread>
+
+#include "opc.h"
+#include "opcbase.h"
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "controllers/SSLContextService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class PutOPCProcessor : public BaseOPCProcessor {
+ public:
+  static constexpr char const* ProcessorName = "PutOPC";
+  // Supported Properties
+  static core::Property ParentNodeIDType;
+  static core::Property ParentNodeID;
+  static core::Property ParentNameSpaceIndex;
+  static core::Property ValueType;
+
+  static core::Property TargetNodeIDType;
+  static core::Property TargetNodeID;
+  static core::Property TargetNodeBrowseName;
+  static core::Property TargetNodeNameSpaceIndex;
+
+  // Supported Relationships
+  static core::Relationship Success;
+  static core::Relationship Failure;
+
+  PutOPCProcessor(std::string name, utils::Identifier uuid = 
utils::Identifier())
+  : BaseOPCProcessor(name, uuid), nameSpaceIdx_(0), parentExists_(false) {
+    logger_ = logging::LoggerFactory<PutOPCProcessor>::getLogger();
+  }
+
+  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
+
+  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session) override;
+
+  virtual void initialize(void) override;
+
+ private:
+
+  class ReadCallback : public InputStreamCallback {
+  public:
+    ReadCallback(std::shared_ptr<logging::Logger> logger) : logger_(logger) {}
+    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    const std::vector<uint8_t>& getContent() const { return buf_; }
+
+  private:
+    std::vector<uint8_t> buf_;
+    std::shared_ptr<logging::Logger> logger_;
+  };
+
+  std::mutex onTriggerMutex_;
+
+  std::string nodeID_;
+  int32_t nameSpaceIdx_;
+  opc::OPCNodeIDType idType_;
+  UA_NodeId parentNodeID_;
+
+  bool parentExists_;
+
+  opc::OPCNodeDataType nodeDataType_;
+};
+
+REGISTER_RESOURCE(PutOPCProcessor, "Creates/updates  OPC nodes");
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif  // NIFI_MINIFI_CPP_PUTOPC_H
diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp
new file mode 100644
index 0000000..4efc202
--- /dev/null
+++ b/extensions/opc/src/fetchopc.cpp
@@ -0,0 +1,235 @@
+/**
+ * FetchOPC class definition
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <string>
+#include <list>
+#include <map>
+#include <mutex>
+#include <thread>
+
+#include "opc.h"
+#include "fetchopc.h"
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "controllers/SSLContextService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+  core::Property FetchOPCProcessor::NodeID(
+      core::PropertyBuilder::createProperty("Node ID")
+      ->withDescription("Specifies the ID of the root node to traverse")
+      ->isRequired(true)->build());
+
+
+  core::Property FetchOPCProcessor::NodeIDType(
+      core::PropertyBuilder::createProperty("Node ID type")
+      ->withDescription("Specifies the type of the provided node ID")
+      ->isRequired(true)
+      ->withAllowableValues<std::string>({"Path", "Int", "String"})->build());
+
+  core::Property FetchOPCProcessor::NameSpaceIndex(
+      core::PropertyBuilder::createProperty("Namespace index")
+      ->withDescription("The index of the namespace. Used only if node ID type 
is not path.")
+      ->withDefaultValue<int32_t>(0)->build());
+
+  core::Property FetchOPCProcessor::MaxDepth(
+      core::PropertyBuilder::createProperty("Max depth")
+      ->withDescription("Specifiec the max depth of browsing. 0 means 
unlimited.")
+      ->withDefaultValue<uint64_t>(0)->build());
+
+  core::Relationship FetchOPCProcessor::Success("success", "Successfully 
retrieved OPC-UA nodes");
+  core::Relationship FetchOPCProcessor::Failure("failure", "Retrieved OPC-UA 
nodes where value cannot be extracted (only if enabled)");
+
+
+  void FetchOPCProcessor::initialize() {
+    // Set the supported properties
+    std::set<core::Property> fetchOPCProperties = {OPCServerEndPoint, NodeID, 
NodeIDType, NameSpaceIndex, MaxDepth};
+    std::set<core::Property> baseOPCProperties = 
BaseOPCProcessor::getSupportedProperties();
+    fetchOPCProperties.insert(baseOPCProperties.begin(), 
baseOPCProperties.end());
+    setSupportedProperties(fetchOPCProperties);
+
+    // Set the supported relationships
+    setSupportedRelationships({Success, Failure});
+  }
+
+  void FetchOPCProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &factory) {
+    logger_->log_trace("FetchOPCProcessor::onSchedule");
+
+    translatedNodeIDs_.clear();  // Path might has changed during restart
+
+    BaseOPCProcessor::onSchedule(context, factory);
+
+    if(!configOK_) {
+      return;
+    }
+
+    configOK_ = false;
+
+    std::string value;
+    context->getProperty(NodeID.getName(), nodeID_);
+    context->getProperty(NodeIDType.getName(), value);
+
+    maxDepth_ = 0;
+    context->getProperty(MaxDepth.getName(), maxDepth_);
+
+    if (value == "String") {
+      idType_ = opc::OPCNodeIDType::String;
+    } else if (value == "Int") {
+      idType_ = opc::OPCNodeIDType::Int;
+    } else if (value == "Path") {
+      idType_ = opc::OPCNodeIDType::Path;
+    } else {
+      // Where have our validators gone?
+      logger_->log_error("%s is not a valid node ID type!", value.c_str());
+    }
+
+    if(idType_ == opc::OPCNodeIDType::Int) {
+      try {
+        int t = std::stoi(nodeID_);
+      } catch(...) {
+        logger_->log_error("%s cannot be used as an int type node ID", 
nodeID_.c_str());
+        return;
+      }
+    }
+    if(idType_ != opc::OPCNodeIDType::Path) {
+      if(!context->getProperty(NameSpaceIndex.getName(), nameSpaceIdx_)) {
+        logger_->log_error("%s is mandatory in case %s is not Path", 
NameSpaceIndex.getName().c_str(), NodeIDType.getName().c_str());
+        return;
+      }
+    }
+
+    configOK_ = true;
+  }
+
+  void FetchOPCProcessor::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session){
+    if(!configOK_) {
+      logger_->log_error("This processor was not configured properly, 
yielding. Please check for previous errors in the logs!");
+      yield();
+      return;
+    }
+
+    logger_->log_trace("FetchOPCProcessor::onTrigger");
+
+    std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+    if(!lock.owns_lock()){
+      logger_->log_warn("processor was triggered before previous listing 
finished, configuration should be revised!");
+      return;
+    }
+
+    if (!reconnect()) {
+      yield();
+      return;
+    }
+
+    nodesFound_ = 0;
+    variablesFound_ = 0;
+
+    std::function<opc::nodeFoundCallBackFunc> f = 
std::bind(&FetchOPCProcessor::nodeFoundCallBack, this, std::placeholders::_1, 
std::placeholders::_2, std::placeholders::_3, context, session);
+    if(idType_ != opc::OPCNodeIDType::Path) {
+      UA_NodeId myID;
+      myID.namespaceIndex = nameSpaceIdx_;
+      if(idType_ == opc::OPCNodeIDType::Int) {
+        myID.identifierType = UA_NODEIDTYPE_NUMERIC;
+        myID.identifier.numeric = std::stoi(nodeID_);
+      } else if (idType_ == opc::OPCNodeIDType::String) {
+        myID.identifierType = UA_NODEIDTYPE_STRING;
+        myID.identifier.string = UA_STRING_ALLOC(nodeID_.c_str());
+      }
+      connection_->traverse(myID, f, "", maxDepth_);
+    } else {
+      if(translatedNodeIDs_.empty()) {
+        auto sc = connection_->translateBrowsePathsToNodeIdsRequest(nodeID_, 
translatedNodeIDs_, logger_);
+        if(sc != UA_STATUSCODE_GOOD) {
+          logger_->log_error("Failed to translate %s to node id, no flow files 
will be generated (%s)", nodeID_.c_str(), UA_StatusCode_name(sc));
+          yield();
+          return;
+        }
+      }
+      for(auto& nodeID: translatedNodeIDs_) {
+        connection_->traverse(nodeID, f, nodeID_, maxDepth_);
+      }
+    }
+    if(nodesFound_ == 0) {
+      logger_->log_warn("Connected to OPC server, but no variable nodes were 
not found. Configuration might be incorrect! Yielding...");
+      yield();
+    } else if (variablesFound_ == 0) {
+      logger_->log_warn("Found no variables when traversing the specified 
node. No flowfiles are generated. Yielding...");
+      yield();
+    }
+
+  }
+
+  bool FetchOPCProcessor::nodeFoundCallBack(opc::Client& client, const 
UA_ReferenceDescription *ref, const std::string& path,
+      const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) {
+    nodesFound_++;
+    if(ref->nodeClass == UA_NODECLASS_VARIABLE)
+    {
+      try {
+        opc::NodeData nodedata = connection_->getNodeData(ref);
+        OPCData2FlowFile(nodedata, context, session);
+        variablesFound_++;
+      } catch (const std::exception& exception) {
+        std::string browsename((char*)ref->browseName.name.data, 
ref->browseName.name.length);
+        logger_->log_warn("Caught Exception while trying to get data from node 
&s: %s", path + "/" + browsename,  exception.what());
+      }
+    }
+    return true;
+  }
+
+  void FetchOPCProcessor::OPCData2FlowFile(const opc::NodeData& opcnode, const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) {
+    std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create());
+    if (flowFile == nullptr) {
+      logger_->log_error("Failed to create flowfile!");
+      return;
+    }
+    for(const auto& attr: opcnode.attributes) {
+      flowFile->setAttribute(attr.first, attr.second);
+    }
+    if(opcnode.data.size() > 0) {
+      try {
+        FetchOPCProcessor::WriteCallback 
callback(opc::nodeValue2String(opcnode));
+        session->write(flowFile, &callback);
+      } catch (const std::exception& e) {
+        std::string browsename;
+        flowFile->getAttribute("Browsename", browsename);
+        logger_->log_info("Failed to extract data of OPC node %s: %s", 
browsename, e.what());
+        session->transfer(flowFile, Failure);
+        return;
+      }
+    }
+    session->transfer(flowFile, Success);
+  }
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/opc/src/opc.cpp b/extensions/opc/src/opc.cpp
new file mode 100644
index 0000000..841fd55
--- /dev/null
+++ b/extensions/opc/src/opc.cpp
@@ -0,0 +1,567 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//OPC includes
+#include "opc.h"
+
+//MiNiFi includes
+#include "utils/ScopeGuard.h"
+#include "utils/StringUtils.h"
+#include "logging/Logger.h"
+#include "Exception.h"
+
+//Standard includes
+#include <stdlib.h>
+#include <iostream>
+#include <memory>
+#include <vector>
+#include <string>
+#include <functional>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace opc {
+
+/*
+ * The following functions are only used internally in OPC lib, not to be 
exported
+ */
+
+namespace {
+
+  void add_value_to_variant(UA_Variant *variant, std::string &value) {
+    UA_String ua_value = UA_STRING(&value[0]);
+    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_STRING]);
+  }
+
+  void add_value_to_variant(UA_Variant *variant, const char *value) {
+    std::string strvalue(value);
+    add_value_to_variant(variant, strvalue);
+  }
+
+  void add_value_to_variant(UA_Variant *variant, int64_t value) {
+    UA_Int64 ua_value = value;
+    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_INT64]);
+  }
+
+  void add_value_to_variant(UA_Variant *variant, uint64_t value) {
+    UA_UInt64 ua_value = value;
+    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_UINT64]);
+  }
+
+  void add_value_to_variant(UA_Variant *variant, int32_t value) {
+    UA_Int32 ua_value = value;
+    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_INT32]);
+  }
+
+  void add_value_to_variant(UA_Variant *variant, uint32_t value) {
+    UA_UInt32 ua_value = value;
+    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_UINT32]);
+  }
+
+  void add_value_to_variant(UA_Variant *variant, bool value) {
+    UA_Boolean ua_value = value;
+    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_BOOLEAN]);
+  }
+
+  void add_value_to_variant(UA_Variant *variant, float value) {
+    UA_Float ua_value = value;
+    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_FLOAT]);
+  }
+
+  void add_value_to_variant(UA_Variant *variant, double value) {
+    UA_Double ua_value = value;
+    UA_Variant_setScalarCopy(variant, &ua_value, &UA_TYPES[UA_TYPES_DOUBLE]);
+  }
+
+  core::logging::LOG_LEVEL MapOPCLogLevel(UA_LogLevel ualvl) {
+    switch (ualvl) {
+      case UA_LOGLEVEL_TRACE:
+        return core::logging::trace;
+      case UA_LOGLEVEL_DEBUG:
+        return core::logging::debug;
+      case UA_LOGLEVEL_INFO:
+        return core::logging::info;
+      case UA_LOGLEVEL_WARNING:
+        return core::logging::warn;
+      case UA_LOGLEVEL_ERROR:
+        return core::logging::err;
+      case UA_LOGLEVEL_FATAL:
+        return core::logging::critical;
+      default:
+        return core::logging::critical;
+    }
+  }
+}
+
+/*
+ * End of internal functions
+ */
+
+Client::Client(std::shared_ptr<core::logging::Logger> logger, const 
std::string& applicationURI,
+               const std::vector<char>& certBuffer, const std::vector<char>& 
keyBuffer,
+               const std::vector<std::vector<char>>& trustBuffers) {
+
+  client_ = UA_Client_new();
+  if (certBuffer.empty()) {
+    UA_ClientConfig_setDefault(UA_Client_getConfig(client_));
+  } else {
+    UA_ClientConfig *cc = UA_Client_getConfig(client_);
+    cc->securityMode = UA_MESSAGESECURITYMODE_SIGNANDENCRYPT;
+
+    // Certificate
+    UA_ByteString certByteString = UA_STRING_NULL;
+    certByteString.length = certBuffer.size();
+    certByteString.data = (UA_Byte*)UA_malloc(certByteString.length * 
sizeof(UA_Byte));
+    memcpy(certByteString.data, certBuffer.data(), certByteString.length);
+
+    // Key
+    UA_ByteString keyByteString = UA_STRING_NULL;
+    keyByteString.length = keyBuffer.size();
+    keyByteString.data = (UA_Byte*)UA_malloc(keyByteString.length * 
sizeof(UA_Byte));
+    memcpy(keyByteString.data, keyBuffer.data(), keyByteString.length);
+
+    // Trusted certificates
+    UA_STACKARRAY(UA_ByteString, trustList, trustBuffers.size());
+    for (size_t i = 0; i < trustBuffers.size(); i++) {
+      trustList[i] = UA_STRING_NULL;
+      trustList[i].length = trustBuffers[i].size();
+      trustList[i].data = (UA_Byte*)UA_malloc(trustList[i].length * 
sizeof(UA_Byte));
+      memcpy(trustList[i].data, trustBuffers[i].data(), trustList[i].length);
+    }
+    UA_StatusCode sc = UA_ClientConfig_setDefaultEncryption(cc, 
certByteString, keyByteString,
+                                                            trustList, 
trustBuffers.size(),
+                                                            nullptr, 0);
+    UA_ByteString_clear(&certByteString);
+    UA_ByteString_clear(&keyByteString);
+    for (size_t i = 0; i < trustBuffers.size(); i++) {
+      UA_ByteString_clear(&trustList[i]);
+    }
+    if (sc != UA_STATUSCODE_GOOD) {
+      logger->log_error("Configuring the client for encryption failed: %s", 
UA_StatusCode_name(sc));
+      UA_Client_delete(client_);
+      throw OPCException(GENERAL_EXCEPTION, std::string("Failed to created 
client with the provided encryption settings: ") + UA_StatusCode_name(sc));
+    }
+  }
+
+  const UA_Logger MinifiUALogger = {logFunc, logger.get(), logClear};
+
+  UA_ClientConfig *configPtr = UA_Client_getConfig(client_);
+  configPtr->logger = MinifiUALogger;
+
+  if(applicationURI.length() > 0) {
+    UA_String_clear(&configPtr->clientDescription.applicationUri);
+    configPtr->clientDescription.applicationUri = 
UA_STRING_ALLOC(applicationURI.c_str());
+  }
+
+  logger_ = logger;
+}
+
+Client::~Client() {
+  if(client_ == nullptr) {
+    return;
+  }
+  if(UA_Client_getState(client_) != UA_CLIENTSTATE_DISCONNECTED) {
+    auto sc = UA_Client_disconnect(client_);
+    if(sc != UA_STATUSCODE_GOOD) {
+      logger_->log_warn("Failed to disconnect OPC client: %s", 
UA_StatusCode_name(sc));
+    }
+  }
+  UA_Client_delete(client_);
+}
+
+bool Client::isConnected() {
+  if(!client_) {
+    return false;
+  }
+  return UA_Client_getState(client_) != UA_CLIENTSTATE_DISCONNECTED;
+}
+
+UA_StatusCode Client::connect(const std::string& url, const std::string& 
username, const std::string& password) {
+  if (username.empty()) {
+    return UA_Client_connect(client_, url.c_str());
+  } else {
+    return UA_Client_connect_username(client_, url.c_str(), username.c_str(), 
password.c_str());
+  }
+}
+
+NodeData Client::getNodeData(const UA_ReferenceDescription *ref, const 
std::string& basePath) {
+  if(ref->nodeClass == UA_NODECLASS_VARIABLE)
+  {
+    opc::NodeData nodedata;
+    std::string browsename(reinterpret_cast<const 
char*>(ref->browseName.name.data), ref->browseName.name.length);
+
+    if(ref->nodeId.nodeId.identifierType == UA_NODEIDTYPE_STRING) {
+      std::string nodeidstr(reinterpret_cast<const 
char*>(ref->nodeId.nodeId.identifier.string.data),
+                            ref->nodeId.nodeId.identifier.string.length);
+      nodedata.attributes["NodeID"] = nodeidstr;
+      nodedata.attributes["NodeID type"] = "string";
+    } else if(ref->nodeId.nodeId.identifierType == UA_NODEIDTYPE_BYTESTRING) {
+      std::string nodeidstr(reinterpret_cast<const 
char*>(ref->nodeId.nodeId.identifier.byteString.data), 
ref->nodeId.nodeId.identifier.byteString.length);
+      nodedata.attributes["NodeID"] = nodeidstr;
+      nodedata.attributes["NodeID type"] = "bytestring";
+    } else if (ref->nodeId.nodeId.identifierType == UA_NODEIDTYPE_NUMERIC) {
+      nodedata.attributes["NodeID"] = 
std::to_string(ref->nodeId.nodeId.identifier.numeric);
+      nodedata.attributes["NodeID type"] = "numeric";
+    }
+    nodedata.attributes["Browsename"] = browsename;
+    nodedata.attributes["Full path"] = basePath + "/" + browsename;
+    nodedata.dataTypeID = UA_TYPES_COUNT;
+    UA_Variant* var = UA_Variant_new();
+    if(UA_Client_readValueAttribute(client_, ref->nodeId.nodeId, var) == 
UA_STATUSCODE_GOOD && var->type != NULL && var->data != NULL) {
+      nodedata.dataTypeID = var->type->typeIndex;
+      nodedata.addVariant(var);
+      if(var->type->typeName) {
+        nodedata.attributes["Typename"] = std::string(var->type->typeName);
+      }
+      if(var->type->memSize) {
+        nodedata.attributes["Datasize"] = std::to_string(var->type->memSize);
+        nodedata.data = std::vector<uint8_t>(var->type->memSize);
+        memcpy(nodedata.data.data(), var->data, var->type->memSize);
+      }
+      return nodedata;
+    }
+    UA_Variant_delete(var);
+    throw OPCException(GENERAL_EXCEPTION, "Failed to read value of node: " + 
browsename);
+  } else {
+    throw OPCException(GENERAL_EXCEPTION, "Only variable nodes are 
supported!");
+  }
+}
+
+UA_ReferenceDescription * Client::getNodeReference(UA_NodeId nodeId) {
+  UA_ReferenceDescription *ref = UA_ReferenceDescription_new();
+  UA_ReferenceDescription_init(ref);
+  UA_NodeId_copy(&nodeId, &ref->nodeId.nodeId);
+  auto sc = UA_Client_readNodeClassAttribute(client_, nodeId, &ref->nodeClass);
+  if (sc == UA_STATUSCODE_GOOD) {
+    sc = UA_Client_readBrowseNameAttribute(client_, nodeId, &ref->browseName);
+  }
+  if (sc == UA_STATUSCODE_GOOD) {
+    UA_Client_readDisplayNameAttribute(client_, nodeId, &ref->displayName);
+  }
+  return ref;
+}
+
+void Client::traverse(UA_NodeId nodeId, std::function<nodeFoundCallBackFunc> 
cb, const std::string& basePath, uint64_t maxDepth, bool fetchRoot) {
+  if (fetchRoot) {
+    UA_ReferenceDescription *rootRef = getNodeReference(nodeId);
+    if ((rootRef->nodeClass == UA_NODECLASS_VARIABLE || rootRef->nodeClass == 
UA_NODECLASS_OBJECT) && rootRef->browseName.name.length > 0) {
+      cb(*this, rootRef, basePath);
+    }
+    UA_ReferenceDescription_delete(rootRef);
+  }
+
+  if(maxDepth != 0) {
+    maxDepth--;
+    if(maxDepth == 0) {
+      return;
+    }
+  }
+  UA_BrowseRequest bReq;
+  UA_BrowseRequest_init(&bReq);
+  bReq.requestedMaxReferencesPerNode = 0;
+  bReq.nodesToBrowse = UA_BrowseDescription_new();
+  bReq.nodesToBrowseSize = 1;
+
+  UA_NodeId_copy(&nodeId, &bReq.nodesToBrowse[0].nodeId);
+  bReq.nodesToBrowse[0].resultMask = UA_BROWSERESULTMASK_ALL;
+
+  UA_BrowseResponse bResp = UA_Client_Service_browse(client_, bReq);
+
+  utils::ScopeGuard guard([&bResp]() {
+    UA_BrowseResponse_deleteMembers(&bResp);
+  });
+
+  UA_BrowseRequest_deleteMembers(&bReq);
+
+  for(size_t i = 0; i < bResp.resultsSize; ++i) {
+    for(size_t j = 0; j < bResp.results[i].referencesSize; ++j) {
+      UA_ReferenceDescription *ref = &(bResp.results[i].references[j]);
+      if (cb(*this, ref, basePath)) {
+        if (ref->nodeClass == UA_NODECLASS_VARIABLE || ref->nodeClass == 
UA_NODECLASS_OBJECT) {
+          std::string browsename((char *) ref->browseName.name.data, 
ref->browseName.name.length);
+          traverse(ref->nodeId.nodeId, cb, basePath + browsename, maxDepth, 
false);
+        }
+      } else {
+        return;
+      }
+    }
+  }
+};
+
+bool Client::exists(UA_NodeId nodeId) {
+  bool retval = false;
+  auto callback = [&retval](Client& client, const UA_ReferenceDescription 
*ref, const std::string& pat) -> bool {
+    retval = true;
+    return false;  // If any node is found, the given node exists, so traverse 
can be stopped
+  };
+  traverse(nodeId, callback, "", 1);
+  return retval;
+};
+
+UA_StatusCode Client::translateBrowsePathsToNodeIdsRequest(const std::string& 
path, std::vector<UA_NodeId>& foundNodeIDs, const 
std::shared_ptr<core::logging::Logger>& logger) {
+  logger->log_trace("Trying to find node id for %s", path.c_str());
+
+  auto tokens = utils::StringUtils::split(path, "/");
+  std::vector<UA_UInt32> ids;
+  for(size_t i = 0; i < tokens.size(); ++i) {
+    UA_UInt32 val = (i ==0 ) ? UA_NS0ID_ORGANIZES : UA_NS0ID_HASCOMPONENT;
+    ids.push_back(val);
+  }
+
+  UA_BrowsePath browsePath;
+  UA_BrowsePath_init(&browsePath);
+  browsePath.startingNode = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
+
+  browsePath.relativePath.elements = 
(UA_RelativePathElement*)UA_Array_new(tokens.size(), 
&UA_TYPES[UA_TYPES_RELATIVEPATHELEMENT]);
+  browsePath.relativePath.elementsSize = tokens.size();
+
+  for(size_t i = 0; i < tokens.size(); ++i) {
+    UA_RelativePathElement *elem = &browsePath.relativePath.elements[i];
+    elem->referenceTypeId = UA_NODEID_NUMERIC(0, ids[i]);
+    elem->targetName = UA_QUALIFIEDNAME_ALLOC(0, tokens[i].c_str());
+  }
+
+  UA_TranslateBrowsePathsToNodeIdsRequest request;
+  UA_TranslateBrowsePathsToNodeIdsRequest_init(&request);
+  request.browsePaths = &browsePath;
+  request.browsePathsSize = 1;
+
+  UA_TranslateBrowsePathsToNodeIdsResponse response = 
UA_Client_Service_translateBrowsePathsToNodeIds(client_, request);
+
+  utils::ScopeGuard guard([&browsePath]() {
+    UA_BrowsePath_deleteMembers(&browsePath);
+  });
+
+  if(response.resultsSize < 1) {
+    logger->log_warn("No node id in response for %s", path.c_str());
+    return UA_STATUSCODE_BADNODATAAVAILABLE;
+  }
+
+  bool foundData = false;
+
+  for(size_t i = 0; i < response.resultsSize; ++i) {
+    UA_BrowsePathResult res = response.results[i];
+    for(size_t j = 0; j < res.targetsSize; ++j) {
+      foundData = true;
+      UA_NodeId resultId;
+      UA_NodeId_copy(&res.targets[j].targetId.nodeId, &resultId);
+      foundNodeIDs.push_back(resultId);
+      std::string 
namespaceUri((char*)res.targets[j].targetId.namespaceUri.data, 
res.targets[j].targetId.namespaceUri.length);
+    }
+  }
+
+  UA_TranslateBrowsePathsToNodeIdsResponse_deleteMembers(&response);
+
+  if(foundData) {
+    logger->log_debug("Found %lu nodes for path %s", foundNodeIDs.size(), 
path.c_str());
+    return UA_STATUSCODE_GOOD;
+  } else {
+    logger->log_warn("No node id found for path %s", path.c_str());
+    return UA_STATUSCODE_BADNODATAAVAILABLE;
+  }
+}
+
+template<typename T>
+UA_StatusCode Client::add_node(const UA_NodeId parentNodeId, const UA_NodeId 
targetNodeId, std::string browseName, T value, OPCNodeDataType dt, UA_NodeId 
*receivedNodeId)
+{
+  UA_VariableAttributes attr = UA_VariableAttributes_default;
+  add_value_to_variant(&attr.value, value);
+  char local[6] = "en-US";
+  attr.displayName = UA_LOCALIZEDTEXT(local, 
const_cast<char*>(browseName.c_str()));
+  UA_StatusCode sc = UA_Client_addVariableNode(client_,
+                                               targetNodeId,
+                                               parentNodeId,
+                                               UA_NODEID_NUMERIC(0, 
OPCNodeDataTypeToTypeID(dt)),
+                                               UA_QUALIFIEDNAME(1, 
const_cast<char*>(browseName.c_str())),
+                                               UA_NODEID_NULL,
+                                               attr, receivedNodeId);
+  UA_Variant_clear(&attr.value);
+  return sc;
+}
+
+template<typename T>
+UA_StatusCode Client::update_node(const UA_NodeId nodeId, T value) {
+  UA_Variant *variant = UA_Variant_new();
+  add_value_to_variant(variant, value);
+  UA_StatusCode sc = UA_Client_writeValueAttribute(client_, nodeId, variant);
+  UA_Variant_delete(variant);
+  return sc;
+};
+
+std::unique_ptr<Client> 
Client::createClient(std::shared_ptr<core::logging::Logger> logger, const 
std::string& applicationURI,
+                                             const std::vector<char>& 
certBuffer, const std::vector<char>& keyBuffer,
+                                             const 
std::vector<std::vector<char>>& trustBuffers) {
+  try {
+    return ClientPtr(new Client(logger, applicationURI, certBuffer, keyBuffer, 
trustBuffers));
+  } catch (const std::exception& exception) {
+    logger->log_error("Failed to create client: %s", exception.what());
+  }
+  return nullptr;
+}
+
+template UA_StatusCode Client::update_node<int64_t>(const UA_NodeId nodeId, 
int64_t value);
+template UA_StatusCode Client::update_node<uint64_t>(const UA_NodeId nodeId, 
uint64_t value);
+template UA_StatusCode Client::update_node<int32_t>(const UA_NodeId nodeId, 
int32_t value);
+template UA_StatusCode Client::update_node<uint32_t>(const UA_NodeId nodeId, 
uint32_t value);
+template UA_StatusCode Client::update_node<float>(const UA_NodeId nodeId, 
float value);
+template UA_StatusCode Client::update_node<double>(const UA_NodeId nodeId, 
double value);
+template UA_StatusCode Client::update_node<bool>(const UA_NodeId nodeId, bool 
value);
+template UA_StatusCode Client::update_node<const char *>(const UA_NodeId 
nodeId, const char * value);
+template UA_StatusCode Client::update_node<std::string>(const UA_NodeId 
nodeId, std::string value);
+
+template UA_StatusCode Client::add_node<int64_t>(const UA_NodeId parentNodeId, 
const UA_NodeId targetNodeId, std::string browseName, int64_t value, 
OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<uint64_t>(const UA_NodeId 
parentNodeId, const UA_NodeId targetNodeId, std::string browseName, uint64_t 
value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<int32_t>(const UA_NodeId parentNodeId, 
const UA_NodeId targetNodeId, std::string browseName, int32_t value, 
OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<uint32_t>(const UA_NodeId 
parentNodeId, const UA_NodeId targetNodeId, std::string browseName, uint32_t 
value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<float>(const UA_NodeId parentNodeId, 
const UA_NodeId targetNodeId, std::string browseName, float value, 
OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<double>(const UA_NodeId parentNodeId, 
const UA_NodeId targetNodeId, std::string browseName, double value, 
OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<bool>(const UA_NodeId parentNodeId, 
const UA_NodeId targetNodeId, std::string browseName, bool value, 
OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<const char *>(const UA_NodeId 
parentNodeId, const UA_NodeId targetNodeId, std::string browseName, const char 
* value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+template UA_StatusCode Client::add_node<std::string>(const UA_NodeId 
parentNodeId, const UA_NodeId targetNodeId, std::string browseName, std::string 
value, OPCNodeDataType dt, UA_NodeId *receivedNodeId);
+
+int32_t OPCNodeDataTypeToTypeID(OPCNodeDataType dt) {
+  switch(dt)
+  {
+    case OPCNodeDataType::Boolean:
+      return UA_NS0ID_BOOLEAN;
+    case OPCNodeDataType::Int32:
+      return UA_NS0ID_INT32;
+    case OPCNodeDataType::UInt32:
+      return UA_NS0ID_UINT32;
+    case OPCNodeDataType::Int64:
+      return UA_NS0ID_INT64;
+    case OPCNodeDataType::UInt64:
+      return UA_NS0ID_UINT64;
+    case OPCNodeDataType::Float:
+      return UA_NS0ID_FLOAT;
+    case OPCNodeDataType::Double:
+      return UA_NS0ID_DOUBLE;
+    case OPCNodeDataType::String:
+      return UA_NS0ID_STRING;
+    default:
+      throw OPCException(GENERAL_EXCEPTION, "Data type is not supported");
+  }
+}
+
+std::string nodeValue2String(const NodeData& nd) {
+  std::string ret_val;
+  switch (nd.dataTypeID) {
+    case UA_TYPES_STRING:
+    case UA_TYPES_LOCALIZEDTEXT:
+    case UA_TYPES_BYTESTRING: {
+      UA_String value = *(UA_String * )(nd.var_->data);
+      ret_val = std::string(reinterpret_cast<const char *>(value.data), 
value.length);
+      break;
+    }
+    case UA_TYPES_BOOLEAN:
+      bool b;
+      memcpy(&b, nd.data.data(), sizeof(bool));
+      ret_val = b ? "True" : "False";
+      break;
+    case UA_TYPES_SBYTE:
+      int8_t i8t;
+      memcpy(&i8t, nd.data.data(), sizeof(i8t));
+      ret_val = std::to_string(i8t);
+      break;
+    case UA_TYPES_BYTE:
+      uint8_t ui8t;
+      memcpy(&ui8t, nd.data.data(), sizeof(ui8t));
+      ret_val = std::to_string(ui8t);
+      break;
+    case UA_TYPES_INT16:
+      int16_t i16t;
+      memcpy(&i16t, nd.data.data(), sizeof(i16t));
+      ret_val = std::to_string(i16t);
+      break;
+    case UA_TYPES_UINT16:
+      uint16_t ui16t;
+      memcpy(&ui16t, nd.data.data(), sizeof(ui16t));
+      ret_val = std::to_string(ui16t);
+      break;
+    case UA_TYPES_INT32:
+      int32_t i32t;
+      memcpy(&i32t, nd.data.data(), sizeof(i32t));
+      ret_val = std::to_string(i32t);
+      break;
+    case UA_TYPES_UINT32:
+      uint32_t ui32t;
+      memcpy(&ui32t, nd.data.data(), sizeof(ui32t));
+      ret_val = std::to_string(ui32t);
+      break;
+    case UA_TYPES_INT64:
+      int64_t i64t;
+      memcpy(&i64t, nd.data.data(), sizeof(i64t));
+      ret_val = std::to_string(i64t);
+      break;
+    case UA_TYPES_UINT64:
+      uint64_t ui64t;
+      memcpy(&ui64t, nd.data.data(), sizeof(ui64t));
+      ret_val = std::to_string(ui64t);
+      break;
+    case UA_TYPES_FLOAT:
+      if(sizeof(float) == 4 && std::numeric_limits<float>::is_iec559){
+        float f;
+        memcpy(&f, nd.data.data(), sizeof(float));
+        ret_val = std::to_string(f);
+      } else {
+        throw OPCException(GENERAL_EXCEPTION, "Float is non-standard on this 
system, OPC data cannot be extracted!");
+      }
+      break;
+    case UA_TYPES_DOUBLE:
+      if(sizeof(double) == 8 && std::numeric_limits<double>::is_iec559){
+        double d;
+        memcpy(&d, nd.data.data(), sizeof(double));
+        ret_val = std::to_string(d);
+      } else {
+        throw OPCException(GENERAL_EXCEPTION, "Double is non-standard on this 
system, OPC data cannot be extracted!");
+      }
+      break;
+    case UA_TYPES_DATETIME: {
+      UA_DateTime dt;
+      memcpy(&dt, nd.data.data(), sizeof(UA_DateTime));
+      ret_val = opc::OPCDateTime2String(dt);
+      break;
+    }
+    default:
+      throw OPCException(GENERAL_EXCEPTION, "Data type is not supported ");
+  }
+  return ret_val;
+}
+
+std::string OPCDateTime2String(UA_DateTime raw_date) {
+  UA_DateTimeStruct dts = UA_DateTime_toStruct(raw_date);
+  std::array<char, 100> charBuf;
+
+  snprintf(charBuf.data(), charBuf.size(), "%02hu-%02hu-%04hu 
%02hu:%02hu:%02hu.%03hu", dts.day, dts.month, dts.year, dts.hour, dts.min, 
dts.sec, dts.milliSec);
+
+  return std::string(charBuf.data(), charBuf.size());
+}
+
+void logFunc(void *context, UA_LogLevel level, UA_LogCategory category, const 
char *msg, va_list args) {
+  char buffer[1024];
+  vsnprintf(buffer, 1024, msg, args);
+  auto loggerPtr = reinterpret_cast<core::logging::BaseLogger*>(context);
+  loggerPtr->log_string(MapOPCLogLevel(level), buffer);
+}
+
+} /* namespace opc */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/opc/src/opcbase.cpp b/extensions/opc/src/opcbase.cpp
new file mode 100644
index 0000000..4fb42b3
--- /dev/null
+++ b/extensions/opc/src/opcbase.cpp
@@ -0,0 +1,162 @@
+/**
+ * BaseOPC class definition
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <string>
+
+#include "opc.h"
+#include "opcbase.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+  core::Property BaseOPCProcessor::OPCServerEndPoint(
+      core::PropertyBuilder::createProperty("OPC server endpoint")
+      ->withDescription("Specifies the address, port and relative path of an 
OPC endpoint")
+      ->isRequired(true)->build());
+
+  core::Property BaseOPCProcessor::ApplicationURI(
+      core::PropertyBuilder::createProperty("Application URI")
+          ->withDescription("Application URI of the client in the format 
'urn:unconfigured:application'. "
+                            "Mandatory, if using Secure Channel and must match 
the URI included in the certificate's Subject Alternative Names.")->build());
+
+
+  core::Property BaseOPCProcessor::Username(
+      core::PropertyBuilder::createProperty("Username")
+          ->withDescription("Username to log in with.")->build());
+
+  core::Property BaseOPCProcessor::Password(
+      core::PropertyBuilder::createProperty("Password")
+          ->withDescription("Password to log in with. Providing this requires 
cert and key to be provided as well, credentials are always sent 
encrypted.")->build());
+
+  core::Property BaseOPCProcessor::CertificatePath(
+      core::PropertyBuilder::createProperty("Certificate path")
+          ->withDescription("Path to the DER-encoded cert file")->build());
+
+  core::Property BaseOPCProcessor::KeyPath(
+      core::PropertyBuilder::createProperty("Key path")
+          ->withDescription("Path to the DER-encoded key file")->build());
+
+  core::Property BaseOPCProcessor::TrustedPath(
+      core::PropertyBuilder::createProperty("Trusted server certificate path")
+          ->withDescription("Path to the DER-encoded trusted server 
certificate")->build());
+
+  void BaseOPCProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &factory) {
+    logger_->log_trace("BaseOPCProcessor::onSchedule");
+
+    applicationURI_.clear();
+    certBuffer_.clear();
+    keyBuffer_.clear();
+    password_.clear();
+    username_.clear();
+    trustBuffers_.clear();
+
+    configOK_ = false;
+
+    context->getProperty(OPCServerEndPoint.getName(), endPointURL_);
+    context->getProperty(ApplicationURI.getName(), applicationURI_);
+
+    if (context->getProperty(Username.getName(), username_) != 
context->getProperty(Password.getName(), password_)) {
+      logger_->log_error("Both or neither of Username and Password should be 
provided!");
+      return;
+    }
+
+    auto certificatePathRes = context->getProperty(CertificatePath.getName(), 
certpath_);
+    auto keyPathRes = context->getProperty(KeyPath.getName(), keypath_);
+    auto trustedPathRes = context->getProperty(TrustedPath.getName(), 
trustpath_);
+    if (certificatePathRes != keyPathRes || keyPathRes != trustedPathRes) {
+      logger_->log_error("All or none of Certificate path, Key path and 
Trusted server certificate path should be provided!");
+      return;
+    }
+
+    if (!password_.empty() && (certpath_.empty() || keypath_.empty() || 
trustpath_.empty() || applicationURI_.empty())) {
+      logger_->log_error("Certificate path, Key path, Trusted server 
certificate path and Application URI must be provided in case Password is 
provided!");
+      return;
+    }
+
+    if (!certpath_.empty()) {
+      if (applicationURI_.empty()) {
+        logger_->log_error("Application URI must be provided if Certificate 
path is provided!");
+        return;
+      }
+
+      std::ifstream input_cert(certpath_, std::ios::binary);
+      if (input_cert.good()) {
+        certBuffer_ = 
std::vector<char>(std::istreambuf_iterator<char>(input_cert), {});
+      }
+      std::ifstream input_key(keypath_, std::ios::binary);
+      if (input_key.good()) {
+        keyBuffer_ = 
std::vector<char>(std::istreambuf_iterator<char>(input_key), {});
+      }
+
+      trustBuffers_.emplace_back();
+      std::ifstream input_trust(trustpath_, std::ios::binary);
+      if (input_trust.good()) {
+        trustBuffers_[0] = 
std::vector<char>(std::istreambuf_iterator<char>(input_trust), {});
+      }
+
+      if (certBuffer_.empty()) {
+        logger_->log_error("Failed to load cert from path: %s", certpath_);
+        return;
+      }
+      if (keyBuffer_.empty()) {
+        logger_->log_error("Failed to load key from path: %s", keypath_);
+        return;
+      }
+
+      if (trustBuffers_[0].empty()) {
+        logger_->log_error("Failed to load trusted server certs from path: 
%s", trustpath_);
+        return;
+      }
+    }
+
+    configOK_ = true;
+  }
+
+  bool BaseOPCProcessor::reconnect() {
+    if (connection_ == nullptr) {
+      connection_ = opc::Client::createClient(logger_, applicationURI_, 
certBuffer_, keyBuffer_, trustBuffers_);
+    }
+
+    if (connection_->isConnected()) {
+      return true;
+    }
+
+    auto sc = connection_->connect(endPointURL_, username_, password_);
+    if (sc != UA_STATUSCODE_GOOD) {
+      logger_->log_error("Failed to connect: %s!", UA_StatusCode_name(sc));
+      return false;
+    }
+    logger_->log_debug("Successfully connected.");
+    return true;
+  }
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
new file mode 100644
index 0000000..c5acf3f
--- /dev/null
+++ b/extensions/opc/src/putopc.cpp
@@ -0,0 +1,466 @@
+/**
+ * PutOPC class definition
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <string>
+#include <list>
+#include <map>
+#include <mutex>
+#include <thread>
+
+#include "opc.h"
+#include "putopc.h"
+#include "utils/ByteArrayCallback.h"
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Property.h"
+#include "core/Resource.h"
+#include "controllers/SSLContextService.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/Id.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+  core::Property PutOPCProcessor::ParentNodeID(
+      core::PropertyBuilder::createProperty("Parent node ID")
+          ->withDescription("Specifies the ID of the root node to traverse")
+          ->isRequired(true)->build());
+
+
+  core::Property PutOPCProcessor::ParentNodeIDType(
+      core::PropertyBuilder::createProperty("Parent node ID type")
+          ->withDescription("Specifies the type of the provided node ID")
+          ->isRequired(true)
+          ->withAllowableValues<std::string>({"Path", "Int", 
"String"})->build());
+
+  core::Property PutOPCProcessor::ParentNameSpaceIndex(
+      core::PropertyBuilder::createProperty("Parent node namespace index")
+          ->withDescription("The index of the namespace. Used only if node ID 
type is not path.")
+          ->withDefaultValue<int32_t>(0)->build());
+
+  core::Property PutOPCProcessor::ValueType(
+      core::PropertyBuilder::createProperty("Value type")
+          ->withDescription("Set the OPC value type of the created nodes")
+          ->isRequired(true)->build());
+
+  core::Property PutOPCProcessor::TargetNodeIDType(
+      core::PropertyBuilder::createProperty("Target node ID type")
+          ->withDescription("ID type of target node. Allowed values are: Int, 
String.")
+          ->supportsExpressionLanguage(true)->build());
+
+  core::Property PutOPCProcessor::TargetNodeID(
+      core::PropertyBuilder::createProperty("Target node ID")
+          ->withDescription("ID of target node.")
+          ->supportsExpressionLanguage(true)->build());
+
+  core::Property PutOPCProcessor::TargetNodeNameSpaceIndex(
+      core::PropertyBuilder::createProperty("Target node namespace index")
+          ->withDescription("The index of the namespace. Used only if node ID 
type is not path.")
+          ->supportsExpressionLanguage(true)->build());
+
+  core::Property PutOPCProcessor::TargetNodeBrowseName(
+      core::PropertyBuilder::createProperty("Target node browse name")
+          ->withDescription("Browse name of target node. Only used when new 
node is created.")
+          ->supportsExpressionLanguage(true)->build());
+
+  static core::Property TargetNodeID;
+  static core::Property TargetNodeBrowseName;
+
+
+  core::Relationship PutOPCProcessor::Success("success", "Successfully put 
OPC-UA node");
+  core::Relationship PutOPCProcessor::Failure("failure", "Failed to put OPC-UA 
node");
+
+  void PutOPCProcessor::initialize() {
+    PutOPCProcessor::ValueType.clearAllowedValues();
+    core::PropertyValue pv;
+    for(const auto& kv : opc::StringToOPCDataTypeMap) {
+      pv = kv.first;
+      PutOPCProcessor::ValueType.addAllowedValue(pv);
+    }
+    std::set<core::Property> putOPCProperties = {ParentNodeID, 
ParentNodeIDType, ParentNameSpaceIndex, ValueType, TargetNodeIDType, 
TargetNodeID, TargetNodeNameSpaceIndex, TargetNodeBrowseName};
+    std::set<core::Property> baseOPCProperties = 
BaseOPCProcessor::getSupportedProperties();
+    putOPCProperties.insert(baseOPCProperties.begin(), 
baseOPCProperties.end());
+    setSupportedProperties(putOPCProperties);
+
+    // Set the supported relationships
+    setSupportedRelationships({Success, Failure});
+  }
+
+  void PutOPCProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+    logger_->log_trace("PutOPCProcessor::onSchedule");
+
+    parentExists_ = false;
+
+    BaseOPCProcessor::onSchedule(context, factory);
+
+    if(!configOK_) {
+      return;
+    }
+
+    configOK_ = false;
+
+    context->getProperty(OPCServerEndPoint.getName(), endPointURL_);
+    std::string value;
+    context->getProperty(ParentNodeID.getName(), nodeID_);
+    context->getProperty(ParentNodeIDType.getName(), value);
+
+    if (value == "String") {
+      idType_ = opc::OPCNodeIDType::String;
+    } else if (value == "Int") {
+      idType_ = opc::OPCNodeIDType::Int;
+    } else if (value == "Path") {
+      idType_ = opc::OPCNodeIDType::Path;
+    } else {
+      // Where have our validators gone?
+      logger_->log_error("%s is not a valid node ID type!", value.c_str());
+    }
+
+    if(idType_ == opc::OPCNodeIDType::Int) {
+      try {
+        int t = std::stoi(nodeID_);
+      } catch(...) {
+        logger_->log_error("%s cannot be used as an int type node ID", 
nodeID_.c_str());
+        return;
+      }
+    }
+    if(idType_ != opc::OPCNodeIDType::Path) {
+      if(!context->getProperty(ParentNameSpaceIndex.getName(), nameSpaceIdx_)) 
{
+        logger_->log_error("%s is mandatory in case %s is not Path", 
ParentNameSpaceIndex.getName().c_str(), ParentNodeIDType.getName().c_str());
+        return;
+      }
+    }
+
+    std::string typestr;
+    context->getProperty(ValueType.getName(), typestr);
+    nodeDataType_ = opc::StringToOPCDataTypeMap.at(typestr);  // This throws, 
but allowed values are generated based on this map -> that's a really 
unexpected error
+
+    configOK_ = true;
+  }
+
+  void PutOPCProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSession> &session) {
+    if (!configOK_) {
+      logger_->log_error(
+          "This processor was not configured properly, yielding. Please check 
for previous errors in the logs!");
+      yield();
+      return;
+    }
+
+    logger_->log_trace("PutOPCProcessor::onTrigger");
+
+    std::unique_lock<std::mutex> lock(onTriggerMutex_, std::try_to_lock);
+    if (!lock.owns_lock()) {
+      logger_->log_warn("processor was triggered before previous listing 
finished, configuration should be revised!");
+      return;
+    }
+
+    if (!reconnect()) {
+      yield();
+      return;
+    }
+
+    if (!parentExists_) {
+      if (idType_ == opc::OPCNodeIDType::Path) {
+        std::vector<UA_NodeId> translatedNodeIDs;
+        if (connection_->translateBrowsePathsToNodeIdsRequest(nodeID_, 
translatedNodeIDs, logger_) !=
+            UA_STATUSCODE_GOOD) {
+          logger_->log_error("Failed to translate %s to node id, no flow files 
will be put", nodeID_.c_str());
+          yield();
+          return;
+        } else if (translatedNodeIDs.size() != 1) {
+          logger_->log_error("%s was translated to multiple node ids, no flow 
files will be put", nodeID_.c_str());
+          yield();
+          return;
+        } else {
+          parentNodeID_ = translatedNodeIDs[0];
+          parentExists_ = true;
+        }
+      } else {
+        parentNodeID_.namespaceIndex = nameSpaceIdx_;
+        if (idType_ == opc::OPCNodeIDType::Int) {
+          parentNodeID_.identifierType = UA_NODEIDTYPE_NUMERIC;
+          parentNodeID_.identifier.numeric = std::stoi(nodeID_);
+        } else if (idType_ == opc::OPCNodeIDType::String) {
+          parentNodeID_.identifierType = UA_NODEIDTYPE_STRING;
+          parentNodeID_.identifier.string = UA_STRING_ALLOC(nodeID_.c_str());
+        }
+        if (!connection_->exists(parentNodeID_)) {
+          logger_->log_error("Parent node doesn't exist, no flow files will be 
put");
+          yield();
+          return;
+        }
+        parentExists_ = true;
+      }
+    }
+
+    std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->get());
+
+    // Do nothing if there are no incoming files
+    if (!flowFile) {
+      return;
+    }
+
+    std::string targetidtype;
+
+    bool targetNodeExists = false;
+    bool targetNodeValid = false;
+    UA_NodeId targetnode;
+
+    if (context->getProperty(TargetNodeIDType, targetidtype, flowFile)) {
+      std::string targetid;
+      std::string namespaceidx;
+
+
+      if (!context->getProperty(TargetNodeID, targetid, flowFile)) {
+        logger_->log_error("Flowfile %s had target node ID type specified (%s) 
without ID, routing to failure!",
+                           flowFile->getUUIDStr(), targetidtype);
+        session->transfer(flowFile, Failure);
+        return;
+      }
+
+      if (!context->getProperty(TargetNodeNameSpaceIndex, namespaceidx, 
flowFile)) {
+        logger_->log_error(
+            "Flowfile %s had target node ID type specified (%s) without 
namespace index, routing to failure!",
+            flowFile->getUUIDStr(), targetidtype);
+        session->transfer(flowFile, Failure);
+        return;
+      }
+      int32_t nsi;
+      try {
+        nsi = std::stoi(namespaceidx);
+      } catch (...) {
+        logger_->log_error("Flowfile %s has invalid namespace index (%s), 
routing to failure!",
+                           flowFile->getUUIDStr(), namespaceidx);
+        session->transfer(flowFile, Failure);
+        return;
+      }
+
+      targetnode.namespaceIndex = nsi;
+      if (targetidtype == "Int") {
+        targetnode.identifierType = UA_NODEIDTYPE_NUMERIC;
+        try {
+          targetnode.identifier.numeric = std::stoi(targetid);
+          targetNodeValid = true;
+        } catch (...) {
+          logger_->log_error("Flowfile %s: target node ID is not a valid 
integer: %s. Routing to failure!",
+                             flowFile->getUUIDStr(), targetid);
+          session->transfer(flowFile, Failure);
+          return;
+        }
+      } else if (targetidtype == "String") {
+        targetnode.identifierType = UA_NODEIDTYPE_STRING;
+        targetnode.identifier.string = UA_STRING_ALLOC(targetid.c_str());
+        targetNodeValid = true;
+      } else {
+        logger_->log_error("Flowfile %s: target node ID type is invalid: %s. 
Routing to failure!",
+                           flowFile->getUUIDStr(), targetidtype);
+        session->transfer(flowFile, Failure);
+        return;
+      }
+      targetNodeExists = connection_->exists(targetnode);
+    }
+
+    ReadCallback cb(logger_);
+    session->read(flowFile, &cb);
+
+    const auto &vec = cb.getContent();
+
+    std::string contentstr(reinterpret_cast<const char *>(vec.data()), 
vec.size());
+
+    if (targetNodeExists) {
+      logger_->log_trace("Node exists, trying to update it");
+      try {
+        UA_StatusCode sc;
+        switch (nodeDataType_) {
+          case opc::OPCNodeDataType::Int64: {
+            int64_t value = std::stoll(contentstr);
+            sc = connection_->update_node(targetnode, value);
+            break;
+          }
+          case opc::OPCNodeDataType::UInt64: {
+            uint64_t value = std::stoull(contentstr);
+            sc = connection_->update_node(targetnode, value);
+            break;
+          }
+          case opc::OPCNodeDataType::Int32: {
+            int32_t value = std::stoi(contentstr);
+            sc = connection_->update_node(targetnode, value);
+            break;
+          }
+          case opc::OPCNodeDataType::UInt32: {
+            uint32_t value = std::stoul(contentstr);
+            sc = connection_->update_node(targetnode, value);
+            break;
+          }
+          case opc::OPCNodeDataType::Boolean: {
+            bool value;
+            if (utils::StringUtils::StringToBool(contentstr, value)) {
+              sc = connection_->update_node(targetnode, value);
+            } else {
+              throw opc::OPCException(GENERAL_EXCEPTION, "Content cannot be 
converted to bool");
+            }
+            break;
+          }
+          case opc::OPCNodeDataType::Float: {
+            float value = std::stof(contentstr);
+            sc = connection_->update_node(targetnode, value);
+            break;
+          }
+          case opc::OPCNodeDataType::Double: {
+            double value = std::stod(contentstr);
+            sc = connection_->update_node(targetnode, value);
+            break;
+          }
+          case opc::OPCNodeDataType::String: {
+            sc = connection_->update_node(targetnode, contentstr);
+            break;
+          }
+          default:
+            throw opc::OPCException(GENERAL_EXCEPTION, "This should never 
happen!");
+        }
+        if (sc != UA_STATUSCODE_GOOD) {
+          logger_->log_error("Failed to update node: %s", 
UA_StatusCode_name(sc));
+          session->transfer(flowFile, Failure);
+          return;
+        }
+      } catch (...) {
+        std::string typestr;
+        context->getProperty(ValueType.getName(), typestr);
+        logger_->log_error("Failed to convert %s to data type %s", contentstr, 
typestr);
+        session->transfer(flowFile, Failure);
+        return;
+      }
+      logger_->log_trace("Node successfully updated!");
+      session->transfer(flowFile, Success);
+      return;
+    } else {
+      logger_->log_trace("Node doesn't exist, trying to create new node");
+      std::string browsename;
+      if (!context->getProperty(TargetNodeBrowseName, browsename, flowFile)) {
+        logger_->log_error("Target node browse name is required for flowfile 
(%s) as new node is to be created",
+                           flowFile->getUUIDStr());
+        session->transfer(flowFile, Failure);
+        return;
+      }
+      if (!targetNodeValid) {
+        targetnode = UA_NODEID_NUMERIC(1, 0);
+      }
+      try {
+        UA_StatusCode sc;
+        UA_NodeId resultnode;
+        switch (nodeDataType_) {
+          case opc::OPCNodeDataType::Int64: {
+            int64_t value = std::stoll(contentstr);
+            sc = connection_->add_node(parentNodeID_, targetnode, browsename, 
value, nodeDataType_, &resultnode);
+            break;
+          }
+          case opc::OPCNodeDataType::UInt64: {
+            uint64_t value = std::stoull(contentstr);
+            sc = connection_->add_node(parentNodeID_, targetnode, browsename, 
value, nodeDataType_, &resultnode);
+            break;
+          }
+          case opc::OPCNodeDataType::Int32: {
+            int32_t value = std::stoi(contentstr);
+            sc = connection_->add_node(parentNodeID_, targetnode, browsename, 
value, nodeDataType_, &resultnode);
+            break;
+          }
+          case opc::OPCNodeDataType::UInt32: {
+            uint32_t value = std::stoul(contentstr);
+            sc = connection_->add_node(parentNodeID_, targetnode, browsename, 
value, nodeDataType_, &resultnode);
+            break;
+          }
+          case opc::OPCNodeDataType::Boolean: {
+            bool value;
+            if (utils::StringUtils::StringToBool(contentstr, value)) {
+              sc = connection_->add_node(parentNodeID_, targetnode, 
browsename, value, nodeDataType_, &resultnode);
+            } else {
+              throw opc::OPCException(GENERAL_EXCEPTION, "Content cannot be 
converted to bool");
+            }
+            break;
+          }
+          case opc::OPCNodeDataType::Float: {
+            float value = std::stof(contentstr);
+            sc = connection_->add_node(parentNodeID_, targetnode, browsename, 
value, nodeDataType_, &resultnode);
+            break;
+          }
+          case opc::OPCNodeDataType::Double: {
+            double value = std::stod(contentstr);
+            sc = connection_->add_node(parentNodeID_, targetnode, browsename, 
value, nodeDataType_, &resultnode);
+            break;
+          }
+          case opc::OPCNodeDataType::String: {
+            sc = connection_->add_node(parentNodeID_, targetnode, browsename, 
contentstr, nodeDataType_, &resultnode);
+            break;
+          }
+          default:
+            throw opc::OPCException(GENERAL_EXCEPTION, "This should never 
happen!");
+        }
+        if (sc != UA_STATUSCODE_GOOD) {
+          logger_->log_error("Failed to create node: %s", 
UA_StatusCode_name(sc));
+          session->transfer(flowFile, Failure);
+          return;
+        }
+      } catch (...) {
+        std::string typestr;
+        context->getProperty(ValueType.getName(), typestr);
+        logger_->log_error("Failed to convert %s to data type %s", contentstr, 
typestr);
+        session->transfer(flowFile, Failure);
+        return;
+      }
+      logger_->log_trace("Node successfully created!");
+      session->transfer(flowFile, Success);
+      return;
+    }
+  }
+
+  int64_t 
PutOPCProcessor::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+    buf_.clear();
+    buf_.resize(stream->getSize());
+
+    uint64_t size = 0;
+
+    do {
+      int read = stream->read(buf_.data() + size, 1024);
+
+      if (read < 0) {
+        return -1;
+      }
+
+      if (read == 0) {
+        break;
+      }
+      size += read;
+    } while (size < stream->getSize());
+
+    logger_->log_trace("Read %llu bytes from flowfile content to buffer", 
stream->getSize());
+
+    return size;
+  }
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/libminifi/include/core/Property.h 
b/libminifi/include/core/Property.h
index 2553c09..8921e59 100644
--- a/libminifi/include/core/Property.h
+++ b/libminifi/include/core/Property.h
@@ -174,6 +174,11 @@ class Property {
   void addAllowedValue(const PropertyValue &value) {
     allowed_values_.push_back(value);
   }
+
+  void clearAllowedValues() {
+    allowed_values_.clear();
+  }
+
   /**
    * Add value to the collection of values.
    */
diff --git a/rheldistro.sh b/rheldistro.sh
index ec1396f..1c71e92 100644
--- a/rheldistro.sh
+++ b/rheldistro.sh
@@ -101,7 +101,7 @@ build_deps(){
             install_bison
           elif [ "$FOUND_VALUE" = "flex" ]; then
             INSTALLED+=("flex")
-         elif [ "$FOUND_VALUE" = "automake" ]; then
+          elif [ "$FOUND_VALUE" = "automake" ]; then
             INSTALLED+=("automake")
           elif [ "$FOUND_VALUE" = "autoconf" ]; then
             INSTALLED+=("autoconf")
diff --git a/suse.sh b/suse.sh
index dceb710..7aff26f 100644
--- a/suse.sh
+++ b/suse.sh
@@ -92,7 +92,7 @@ build_deps(){
             install_bison
           elif [ "$FOUND_VALUE" = "flex" ]; then
             INSTALLED+=("flex")
-         elif [ "$FOUND_VALUE" = "automake" ]; then
+          elif [ "$FOUND_VALUE" = "automake" ]; then
             INSTALLED+=("automake")
           elif [ "$FOUND_VALUE" = "autoconf" ]; then
             INSTALLED+=("autoconf")
diff --git a/thirdparty/open62541/open62541.patch 
b/thirdparty/open62541/open62541.patch
new file mode 100644
index 0000000..b6cfaf4
--- /dev/null
+++ b/thirdparty/open62541/open62541.patch
@@ -0,0 +1,42 @@
+diff --git a/CMakeLists.txt b/CMakeLists.txt
+index d426e1da..5f1a4044 100644
+--- a/CMakeLists.txt
++++ b/CMakeLists.txt
+@@ -7,7 +7,7 @@ endif()
+ 
+ string(TOLOWER "${CMAKE_BUILD_TYPE}" BUILD_TYPE_LOWER_CASE)
+ 
+-set(CMAKE_MODULE_PATH "${PROJECT_SOURCE_DIR}/tools/cmake")
++list(APPEND CMAKE_MODULE_PATH "${PROJECT_SOURCE_DIR}/tools/cmake")
+ find_package(PythonInterp REQUIRED)
+ find_package(Git)
+ include(AssignSourceGroup)
+@@ -416,17 +416,17 @@ if(NOT UA_COMPILE_AS_CXX AND (CMAKE_COMPILER_IS_GNUCC OR 
"x${CMAKE_C_COMPILER_ID
+ 
+         # IPO requires too much memory for unit tests
+         # GCC docu recommends to compile all files with the same options, 
therefore ignore it completely
+-        if(NOT UA_BUILD_UNIT_TESTS)
+-            # needed to check if IPO is supported (check needs cmake > 3.9)
+-            if("${CMAKE_VERSION}" VERSION_GREATER 3.9)
+-                cmake_policy(SET CMP0069 NEW) # needed as long as required 
cmake < 3.9
+-                include(CheckIPOSupported)
+-                check_ipo_supported(RESULT CC_HAS_IPO) # Inter Procedural 
Optimization / Link Time Optimization (should be same as -flto)
+-                if(CC_HAS_IPO)
+-                    set(CMAKE_INTERPROCEDURAL_OPTIMIZATION ON)
+-                endif()
+-            endif()
+-        endif()
++        #if(NOT UA_BUILD_UNIT_TESTS)
++        #    # needed to check if IPO is supported (check needs cmake > 3.9)
++        #    if("${CMAKE_VERSION}" VERSION_GREATER 3.9)
++        #        cmake_policy(SET CMP0069 NEW) # needed as long as required 
cmake < 3.9
++        #        include(CheckIPOSupported)
++        #        check_ipo_supported(RESULT CC_HAS_IPO) # Inter Procedural 
Optimization / Link Time Optimization (should be same as -flto)
++        #        if(CC_HAS_IPO)
++        #            set(CMAKE_INTERPROCEDURAL_OPTIMIZATION ON)
++        #        endif()
++        #    endif()
++        #endif()
+     endif()
+ 
+     if(UA_ENABLE_AMALGAMATION)

Reply via email to