This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 490f45f  MINIFICPP-1526 - Add ConsumeJournald to consume systemd 
journal messages. Use libsystemd through dlopen(), which should be fine both 
with LGPL and Apache License.
490f45f is described below

commit 490f45fbe39d729a9cd77dfd8f162166296bf84c
Author: Marton Szasz <[email protected]>
AuthorDate: Fri May 21 14:26:11 2021 +0200

    MINIFICPP-1526 - Add ConsumeJournald to consume systemd journal messages.
    Use libsystemd through dlopen(), which should be fine both with LGPL
    and Apache License.
    
    Signed-off-by: Adam Debreceni <[email protected]>
    
    This closes #1044
---
 .gitignore                                         |   6 +
 CMakeLists.txt                                     |  14 +-
 CPPLINT.cfg                                        |   2 +-
 PROCESSORS.md                                      |  25 ++
 README.md                                          |  23 +-
 bootstrap.sh                                       |  17 +-
 bstrp_functions.sh                                 |  18 +-
 cmake/Date.cmake                                   |   5 +-
 cmake/Extensions.cmake                             |   1 +
 controller/CMakeLists.txt                          |   6 +-
 extensions/ExtensionHeader.txt                     |   2 +-
 .../CMakeLists.txt}                                |  14 +-
 .../utils/gsl.h => extensions/systemd/Common.h     |  16 +-
 extensions/systemd/ConsumeJournald.cpp             | 270 +++++++++++++++++++
 extensions/systemd/ConsumeJournald.h               | 114 ++++++++
 .../gsl.h => extensions/systemd/WorkerThread.cpp   |  29 +-
 extensions/systemd/WorkerThread.h                  |  72 +++++
 extensions/systemd/libwrapper/DlopenWrapper.cpp    | 110 ++++++++
 .../systemd/libwrapper/DlopenWrapper.h             |  20 +-
 .../systemd/libwrapper/LibWrapper.cpp              |  20 +-
 extensions/systemd/libwrapper/LibWrapper.h         |  57 ++++
 extensions/systemd/tests/CMakeLists.txt            |  33 +++
 extensions/systemd/tests/ConsumeJournaldTest.cpp   | 296 +++++++++++++++++++++
 libminifi/CMakeLists.txt                           |   9 +-
 libminifi/include/Exception.h                      |  27 +-
 libminifi/include/core/ConfigurableComponent.h     |   4 +-
 libminifi/include/core/CoreComponentState.h        |  11 +
 libminifi/include/core/FlowFile.h                  |   7 +-
 libminifi/include/core/ProcessContext.h            |  16 +-
 libminifi/include/core/ProcessSession.h            |   3 +
 libminifi/include/core/ProcessorNode.h             |   2 +-
 libminifi/include/core/TypedValues.h               |   2 +-
 libminifi/include/utils/FlatMap.h                  |  11 +
 libminifi/include/utils/GeneralUtils.h             |   9 +
 libminifi/include/utils/OptionalUtils.h            |  52 +++-
 libminifi/include/utils/gsl.h                      |  22 ++
 libminifi/src/core/ConfigurableComponent.cpp       |   4 +-
 libminifi/src/core/ProcessSession.cpp              |  12 +
 libminifi/test/TestBase.h                          |   2 +-
 libminifi/test/unit/GeneralUtilsTest.cpp           |   7 +
 .../{include/utils/gsl.h => test/unit/GslTest.cpp} |  28 +-
 libminifi/test/unit/OptionalTest.cpp               |  15 ++
 main/CMakeLists.txt                                |   7 +-
 nanofi/CMakeLists.txt                              |   6 +-
 nanofi/ecu/CMakeLists.txt                          |   2 +-
 nanofi/examples/CMakeLists.txt                     |   6 +-
 46 files changed, 1296 insertions(+), 138 deletions(-)

diff --git a/.gitignore b/.gitignore
index cc40bfd..5926a60 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,6 +47,7 @@ cmake-build-debug
 # Generated files
 *flowfile_checkpoint
 build
+/*build*
 bt_state
 bin
 target
@@ -58,6 +59,11 @@ docs/generated
 thirdparty/apache-rat/apache-rat*
 /compile_commands.json
 __pycache__/
+/corecomponentstate
+/flowfile_repository
+/content_repository
+/provenance_repository
+/logs
 
 # Ignore source files that have been placed in the docker directory during 
build
 docker/minificppsource
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 136796d..f9cef3e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -19,12 +19,8 @@
 
 cmake_minimum_required(VERSION 3.11)
 cmake_policy(SET CMP0065 OLD) # default export policy, required for self-dlopen
-
-project(nifi-minifi-cpp)
+project(nifi-minifi-cpp VERSION 0.9.0)
 set(PROJECT_NAME "nifi-minifi-cpp")
-set(PROJECT_VERSION_MAJOR 0)
-set(PROJECT_VERSION_MINOR 9)
-set(PROJECT_VERSION_PATCH 0)
 
 # Optional build number for linux distribution targets' tar.gz output
 set(BUILD_NUMBER "" CACHE STRING "Build number")
@@ -572,6 +568,14 @@ if (ENABLE_ALL OR ENABLE_AZURE)
        createExtension(AZURE-EXTENSIONS "AZURE EXTENSIONS" "This enables Azure 
support" "extensions/azure" "${TEST_DIR}/azure-tests")
 endif()
 
+## Add the systemd extension
+if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
+       option(ENABLE_SYSTEMD "Disables the systemd extension." ON)
+       if (ENABLE_SYSTEMD)
+               createExtension(SYSTEMD-EXTENSIONS "SYSTEMD EXTENSIONS" 
"Enabled log collection from journald" "extensions/systemd" 
"extensions/systemd/tests")
+       endif()
+endif()
+
 ## NOW WE CAN ADD LIBRARIES AND EXTENSIONS TO MAIN
 add_subdirectory(main)
 
diff --git a/CPPLINT.cfg b/CPPLINT.cfg
index 110ae8f..a89734e 100644
--- a/CPPLINT.cfg
+++ b/CPPLINT.cfg
@@ -1,2 +1,2 @@
 set noparent
-filter=-runtime/reference,-runtime/string,-build/c++11,-build/include_subdir
+filter=-runtime/reference,-runtime/string,-build/c++11,-build/include_subdir,-whitespace/forcolon
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 4820510..afe08c8 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -9,6 +9,7 @@
 - [CapturePacket](#capturepacket)
 - [CaptureRTSPFrame](#capturertspframe)
 - [CompressContent](#compresscontent)
+- [ConsumeJournald](#consumejournald)
 - [ConsumeKafka](#consumekafka)
 - [ConsumeMQTT](#consumemqtt)
 - [DeleteS3Object](#deletes3object)
@@ -185,6 +186,30 @@ In the list below, the names of required properties appear 
in bold. Any other pr
 |failure|FlowFiles will be transferred to the failure relationship if they 
fail to compress/decompress|
 |success|FlowFiles will be transferred to the success relationship after 
successfully being compressed or decompressed|
 
+
+## ConsumeJournald
+### Description
+Consume systemd-journald journal messages. Available on Linux only.
+
+### Properties
+All properties are required with a default value, making them effectively 
optional. None of the properties support the NiFi Expression Language.
+
+|         Name         |  Default Value  |                                  
Allowable Values                                  |                             
                                          Description                           
                                             |
+| -------------------- | --------------- | 
----------------------------------------------------------------------------------
 | 
--------------------------------------------------------------------------------------------------------------------------------------------------------
 |
+|      Batch Size      |      1000       |                                  
Positive numbers                                  | The maximum number of 
entries processed in a single execution.                                        
                                                   |
+|    Payload Format    |     Syslog      |                                   
Raw<br>Syslog                                    | Configures flow file content 
formatting.<br>Raw: only the message.<br>Syslog: similar to syslog or 
journalctl output.                                    |
+|  Include Timestamp   |      true       |                                   
true<br>false                                    | Include message timestamp in 
the 'timestamp' attribute.                                                      
                                            |
+|     Journal Type     |     System      |                               
User<br>System<br>Both                               | Type of journal to 
consume.                                                                        
                                                      |
+| Process Old Messages |      false      |                                   
true<br>false                                    | Process events created 
before the first usage (schedule) of the processor instance.                    
                                                  |
+|   Timestamp Format   |    %x %X %Z     | [date 
format](https://howardhinnant.github.io/date/date.html#to_stream_formatting) | 
Format string to use when creating the timestamp attribute or writing messages 
in the syslog format. ISO/ISO 8601/ISO8601 are equivalent to "%FT%T%Ez".  |
+
+### Relationships
+
+|  Name   |          Description           |
+| ------- | ------------------------------ |
+| success | Journal messages as flow files |
+
+
 ## ConsumeKafka
 
 ### Description
diff --git a/README.md b/README.md
index b1c97be..f158d8f 100644
--- a/README.md
+++ b/README.md
@@ -92,6 +92,7 @@ Through JNI extensions you can run NiFi processors using 
NARs. The JNI extension
 | Sensors | GetEnvironmentalSensors<br/>GetMovementSensors | 
-DENABLE_SENSORS=ON |
 | SFTP | 
[FetchSFTP](PROCESSORS.md#fetchsftp)<br/>[ListSFTP](PROCESSORS.md#listsftp)<br/>[PutSFTP](PROCESSORS.md#putsftp)
 | -DENABLE_SFTP=ON |
 | SQL | 
[ExecuteSQL](PROCESSORS.md#executesql)<br/>[PutSQL](PROCESSORS.md#putsql)<br/>[QueryDatabaseTable](PROCESSORS.md#querydatabasetable)<br/>
 | -DENABLE_SQL=ON  |
+| Systemd | [ConsumeJournald](PROCESSORS.md#consumejournald) | 
-DENABLE_SYSTEMD=ON |
 | Tensorflow | 
TFApplyGraph<br/>TFConvertImageToTensor<br/>TFExtractTopLabels<br/>      |    
-DENABLE_TENSORFLOW=ON  |
 | USB Camera | [GetUSBCamera](PROCESSORS.md#getusbcamera)     |    
-DENABLE_USB_CAMERA=ON  |
 | Windows Event Log (Windows only) | 
CollectorInitiatedSubscription<br/>ConsumeWindowsEventLog<br/>TailEventLog | 
-DENABLE_WEL=ON |
@@ -345,8 +346,8 @@ $ # It is recommended that you install bison from source as 
HomeBrew now uses an
      Select MiNiFi C++ Features to toggle.
     ****************************************
     A. Persistent Repositories .....Enabled
-    B. Lib Curl Features ...........Enabled
-    C. Lib Archive Features ........Enabled
+    B. libcurl features  ...........Enabled
+    C. libarchive features  ........Enabled
     D. Execute Script support ......Enabled
     E. Expression Language support .Enabled
     F. Kafka support ...............Disabled
@@ -356,16 +357,16 @@ $ # It is recommended that you install bison from source 
as HomeBrew now uses an
     J. TensorFlow Support ..........Disabled
     K. Bustache Support ............Disabled
     L. MQTT Support ................Disabled
-    M. SQLite Support ..............Disabled
-    N. Python Support ..............Disabled
-    O. COAP Support ................Enabled
-    S. SFTP Support ................Disabled
-    V. AWS Support .................Disabled
+    M. Python Support ..............Disabled
+    N. COAP Support ................Enabled
+    O. SFTP Support ................Disabled
+    S. AWS Support .................Disabled
     T. OpenCV Support ..............Disabled
     U. OPC-UA Support ..............Disabled
-    W. SQL Support .................Disabled
-    X. Openwsman Support ...........Disabled
-    Y. Azure Support ...............Disabled
+    V. SQL Support .................Disabled
+    W. Openwsman Support ...........Disabled
+    X. Azure Support ...............Disabled
+    Y. Systemd Support .............Enabled
     ****************************************
                 Build Options.
     ****************************************
@@ -382,7 +383,7 @@ $ # It is recommended that you install bison from source as 
HomeBrew now uses an
       version of cmake or other software, or
       incompatibility with other extensions
 
-    Enter choice [ A - X or 1-7 ]
+    Enter choice [ A - Y or 1-7 ]
   ```
 
 - Boostrap now saves state between runs. State will automatically be saved. 
Provide -c or --clear to clear this state. The -i option provides a guided menu 
install with the ability to change
diff --git a/bootstrap.sh b/bootstrap.sh
index 34fa8d1..d92d6f1 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -324,6 +324,8 @@ add_dependency OPC_ENABLED "mbedtls"
 
 add_disabled_option AZURE_ENABLED ${FALSE} "ENABLE_AZURE"
 
+add_disabled_option SYSTEMD_ENABLED ${TRUE} "ENABLE_SYSTEMD"
+
 USE_SHARED_LIBS=${TRUE}
 ASAN_ENABLED=${FALSE}
 FAIL_ON_WARNINGS=${FALSE}
@@ -352,7 +354,8 @@ if [ "${OVERRIDE_BUILD_IDENTIFIER}" != 
"${BUILD_IDENTIFIER}" ]; then
 fi
 
 if [ "$BUILD_DIR_D" != "build" ] && [ "$BUILD_DIR_D" != "$BUILD_DIR" ]; then
-  read -r -p "Build dir will override stored state, $BUILD_DIR. Press any key 
to continue " overwrite
+  echo -n "Build dir will override stored state, $BUILD_DIR. Press any key to 
continue "
+  read -r overwrite
   BUILD_DIR=$BUILD_DIR_D
 
 fi
@@ -364,7 +367,8 @@ else
   overwrite="Y"
   if [ "$NO_PROMPT" = "false" ] && [ "$FEATURES_SELECTED" = "false" ]; then
     echo "CMAKE Build dir (${BUILD_DIR}) exists, should we overwrite your 
build directory before we begin?"
-    read -r -p "If you have already bootstrapped, bootstrapping again isn't 
necessary to run make [ Y/N ] " overwrite
+    echo -n "If you have already bootstrapped, bootstrapping again isn't 
necessary to run make [ Y/N ] "
+    read -r overwrite
   fi
   if [ "$overwrite" = "N" ] || [ "$overwrite" = "n" ]; then
     echo "Exiting ...."
@@ -375,8 +379,6 @@ else
 fi
 
 ## change to the directory
-
-
 pushd "${BUILD_DIR}" || exit 1
 
 while [ ! "$FEATURES_SELECTED" == "true" ]
@@ -392,15 +394,13 @@ do
     read_feature_options
   fi
 done
-### ensure we have all dependencies
 
+### ensure we have all dependencies
 save_state
-
 build_deps
 
 
 ## just in case
-
 CMAKE_VERSION=$(${CMAKE_COMMAND} --version | head -n 1 | awk '{print $3}')
 
 CMAKE_MAJOR=$(echo "$CMAKE_VERSION" | cut -d. -f1)
@@ -506,7 +506,8 @@ build_cmake_command(){
 
   continue_with_plan="Y"
   if [ ! "$NO_PROMPT" = "true" ]; then
-    read -r -p "Command will be '${CMAKE_BUILD_COMMAND}', run this? [ Y/N ] " 
continue_with_plan
+    echo -n "Command will be '${CMAKE_BUILD_COMMAND}', run this? [ Y/N ] "
+    read -r continue_with_plan
   fi
   if [ "$continue_with_plan" = "N" ] || [ "$continue_with_plan" = "n" ]; then
     echo "Exiting ...."
diff --git a/bstrp_functions.sh b/bstrp_functions.sh
index d62e248..99e98dd 100755
--- a/bstrp_functions.sh
+++ b/bstrp_functions.sh
@@ -139,7 +139,8 @@ EnableAllFeatures(){
 }
 
 pause(){
-  read -r -p "Press [Enter] key to continue..."
+  echo -n "Press [Enter] key to continue..."
+  read -r _
 }
 
 
@@ -301,7 +302,8 @@ show_main_menu() {
 
 read_main_menu_options(){
   local choice
-  read -r -p "Enter choice [ A-C ] " choice
+  echo -n "Enter choice [ A-C ] "
+  read -r choice
   choice=$(echo "${choice}" | tr '[:upper:]' '[:lower:]')
   case $choice in
     a) MENU="features" ;;
@@ -325,7 +327,8 @@ show_advanced_features_menu() {
 
 read_advanced_menu_options(){
   local choice
-  read -r -p "Enter choice [ A-C ] " choice
+  echo -n "Enter choice [ A-C ] "
+  read -r choice
   choice=$(echo "${choice}" | tr '[:upper:]' '[:lower:]')
   case $choice in
     a) ToggleFeature PORTABLE_BUILD ;;
@@ -344,8 +347,8 @@ show_supported_features() {
   echo " Select MiNiFi C++ Features to toggle."
   echo "****************************************"
   echo "A. Persistent Repositories .....$(print_feature_status 
ROCKSDB_ENABLED)"
-  echo "B. Lib Curl Features ...........$(print_feature_status 
HTTP_CURL_ENABLED)"
-  echo "C. Lib Archive Features ........$(print_feature_status 
LIBARCHIVE_ENABLED)"
+  echo "B. libcurl features ............$(print_feature_status 
HTTP_CURL_ENABLED)"
+  echo "C. libarchive features .........$(print_feature_status 
LIBARCHIVE_ENABLED)"
   echo "D. Execute Script support ......$(print_feature_status 
EXECUTE_SCRIPT_ENABLED)"
   echo "E. Expression Language support .$(print_feature_status 
EXPRESSION_LANGAUGE_ENABLED)"
   echo "F. Kafka support ...............$(print_feature_status KAFKA_ENABLED)"
@@ -364,6 +367,7 @@ show_supported_features() {
   echo "V. SQL Support..................$(print_feature_status SQL_ENABLED)"
   echo "W. Openwsman Support ...........$(print_feature_status 
OPENWSMAN_ENABLED)"
   echo "X. Azure Support ...............$(print_feature_status AZURE_ENABLED)"
+  echo "Y. Systemd Support .............$(print_feature_status 
SYSTEMD_ENABLED)"
   echo "****************************************"
   echo "            Build Options."
   echo "****************************************"
@@ -386,7 +390,8 @@ show_supported_features() {
 
 read_feature_options(){
   local choice
-  read -r -p "Enter choice [ A - X or 1-7 ] " choice
+  echo -n "Enter choice [ A - Y or 1-7 ] "
+  read -r choice
   choice=$(echo "${choice}" | tr '[:upper:]' '[:lower:]')
   case $choice in
     a) ToggleFeature ROCKSDB_ENABLED ;;
@@ -415,6 +420,7 @@ read_feature_options(){
     v) ToggleFeature SQL_ENABLED ;;
     w) ToggleFeature OPENWSMAN_ENABLED ;;
     x) ToggleFeature AZURE_ENABLED ;;
+    y) ToggleFeature SYSTEMD_ENABLED ;;
     1) ToggleFeature TESTS_ENABLED ;;
     2) EnableAllFeatures ;;
     3) ToggleFeature JNI_ENABLED;;
diff --git a/cmake/Date.cmake b/cmake/Date.cmake
index 0ef5892..40b2690 100644
--- a/cmake/Date.cmake
+++ b/cmake/Date.cmake
@@ -55,10 +55,7 @@ FetchContent_Declare(date_src
 FetchContent_GetProperties(date_src)
 if (NOT date_src_POPULATED)
     FetchContent_Populate(date_src)
-    set(DATE_INCLUDE_DIR
-        $<BUILD_INTERFACE:${date_src_SOURCE_DIR}/include>
-        $<INSTALL_INTERFACE:include>
-    )
+    set(DATE_INCLUDE_DIR "${date_src_SOURCE_DIR}/include" CACHE STRING "" 
FORCE)
     add_library(date INTERFACE)
     add_library(date::date ALIAS date)
     target_sources(date INTERFACE ${DATE_INCLUDE_DIR}/date/date.h)
diff --git a/cmake/Extensions.cmake b/cmake/Extensions.cmake
index 1c7422b..8a9da62 100644
--- a/cmake/Extensions.cmake
+++ b/cmake/Extensions.cmake
@@ -26,6 +26,7 @@ macro(register_extension extension-name)
   get_property(extensions GLOBAL PROPERTY EXTENSION-OPTIONS)
   set_property(GLOBAL APPEND PROPERTY EXTENSION-OPTIONS ${extension-name})
   target_compile_definitions(${extension-name} PRIVATE 
"MODULE_NAME=${extension-name}")
+  set_target_properties(${extension-name} PROPERTIES ENABLE_EXPORTS True)
 endmacro()
 
 ### TESTING MACROS
diff --git a/controller/CMakeLists.txt b/controller/CMakeLists.txt
index 3e4db51..6610973 100644
--- a/controller/CMakeLists.txt
+++ b/controller/CMakeLists.txt
@@ -17,11 +17,7 @@
 # under the License.
 #
 
-cmake_minimum_required(VERSION 2.6)
-
-IF(POLICY CMP0048)
-  CMAKE_POLICY(SET CMP0048 OLD)
-ENDIF(POLICY CMP0048)
+cmake_minimum_required(VERSION 3.11)
 
 include_directories(../main/ ../libminifi/include  ../libminifi/include/c2  
../libminifi/include/c2/protocols/  ../libminifi/include/core/state 
./libminifi/include/core/statemanagement/metrics  
../libminifi/include/core/yaml  ../libminifi/include/core)
 
diff --git a/extensions/ExtensionHeader.txt b/extensions/ExtensionHeader.txt
index 426a3e0..d7ac551 100644
--- a/extensions/ExtensionHeader.txt
+++ b/extensions/ExtensionHeader.txt
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-cmake_minimum_required(VERSION 2.6)
+cmake_minimum_required(VERSION 3.11)
 
 
 include_directories(../../libminifi/include ../../libminifi/include/core)
diff --git a/extensions/ExtensionHeader.txt b/extensions/systemd/CMakeLists.txt
similarity index 62%
copy from extensions/ExtensionHeader.txt
copy to extensions/systemd/CMakeLists.txt
index 426a3e0..f8cb6bc 100644
--- a/extensions/ExtensionHeader.txt
+++ b/extensions/systemd/CMakeLists.txt
@@ -17,13 +17,13 @@
 # under the License.
 #
 
-cmake_minimum_required(VERSION 2.6)
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
 
+add_library(minifi-systemd STATIC ConsumeJournald.cpp WorkerThread.cpp 
libwrapper/LibWrapper.cpp libwrapper/DlopenWrapper.cpp)
+set_property(TARGET minifi-systemd PROPERTY POSITION_INDEPENDENT_CODE ON)
 
-include_directories(../../libminifi/include ../../libminifi/include/core)
+target_link_libraries(minifi-systemd ${LIBMINIFI} Threads::Threads date::date)
 
-if(WIN32)
-       include_directories(../../libminifi/opsys/win)
-else()
-       include_directories(../../libminifi/opsys/posix)
-endif()
+set(SYSTEMD-EXTENSION minifi-systemd PARENT_SCOPE)
+register_extension(minifi-systemd)
+register_extension_linter(minifi-systemd-extension-linter)
diff --git a/libminifi/include/utils/gsl.h b/extensions/systemd/Common.h
similarity index 77%
copy from libminifi/include/utils/gsl.h
copy to extensions/systemd/Common.h
index db175b5..73d90a2 100644
--- a/libminifi/include/utils/gsl.h
+++ b/extensions/systemd/Common.h
@@ -1,4 +1,5 @@
 /**
+ *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,21 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_UTILS_GSL_H_
-#define LIBMINIFI_INCLUDE_UTILS_GSL_H_
 
-#include <gsl-lite/gsl-lite.hpp>
+#pragma once
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org { namespace apache { namespace nifi { namespace minifi { 
namespace extensions { namespace systemd {
 
-namespace gsl = ::gsl_lite;
+enum class JournalType { User, System, Both };
 
+}  // namespace systemd
+}  // namespace extensions
 }  // namespace minifi
 }  // namespace nifi
 }  // namespace apache
 }  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_UTILS_GSL_H_
diff --git a/extensions/systemd/ConsumeJournald.cpp 
b/extensions/systemd/ConsumeJournald.cpp
new file mode 100644
index 0000000..e7cd98c
--- /dev/null
+++ b/extensions/systemd/ConsumeJournald.cpp
@@ -0,0 +1,270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeJournald.h"
+
+#include <algorithm>
+
+#include "date/date.h"
+#include "spdlog/spdlog.h"  // TODO(szaszm): make fmt directly available
+#include "utils/GeneralUtils.h"
+
+namespace org { namespace apache { namespace nifi { namespace minifi { 
namespace extensions { namespace systemd {
+
+constexpr const char* ConsumeJournald::CURSOR_KEY;
+const core::Relationship ConsumeJournald::Success("success", "Successfully 
consumed journal messages.");
+
+const core::Property ConsumeJournald::BatchSize = 
core::PropertyBuilder::createProperty("Batch Size")
+    ->withDescription("The maximum number of entries processed in a single 
execution.")
+    ->withDefaultValue<size_t>(1000)
+    ->isRequired(true)
+    ->build();
+
+const core::Property ConsumeJournald::PayloadFormat = 
core::PropertyBuilder::createProperty("Payload Format")
+    ->withDescription("Configures flow file content formatting. Raw: only the 
message. Syslog: similar to syslog or journalctl output.")
+    ->withDefaultValue<std::string>(PAYLOAD_FORMAT_SYSLOG)
+    ->withAllowableValues<std::string>({PAYLOAD_FORMAT_RAW, 
PAYLOAD_FORMAT_SYSLOG})
+    ->isRequired(true)
+    ->build();
+
+const core::Property ConsumeJournald::IncludeTimestamp = 
core::PropertyBuilder::createProperty("Include Timestamp")
+    ->withDescription("Include message timestamp in the 'timestamp' 
attribute.")
+    ->withDefaultValue<bool>(true)
+    ->isRequired(true)
+    ->build();
+
+const core::Property ConsumeJournald::JournalType = 
core::PropertyBuilder::createProperty("Journal Type")
+    ->withDescription("Type of journal to consume.")
+    ->withDefaultValue<std::string>(JOURNAL_TYPE_SYSTEM)
+    ->withAllowableValues<std::string>({JOURNAL_TYPE_USER, 
JOURNAL_TYPE_SYSTEM, JOURNAL_TYPE_BOTH})
+    ->isRequired(true)
+    ->build();
+
+const core::Property ConsumeJournald::ProcessOldMessages = 
core::PropertyBuilder::createProperty("Process Old Messages")
+    ->withDescription("Process events created before the first usage 
(schedule) of the processor instance.")
+    ->withDefaultValue<bool>(false)
+    ->isRequired(true)
+    ->build();
+
+const core::Property ConsumeJournald::TimestampFormat = 
core::PropertyBuilder::createProperty("Timestamp Format")
+    ->withDescription("Format string to use when creating the timestamp 
attribute or writing messages in the syslog format.")
+    ->withDefaultValue("%x %X %Z")
+    ->isRequired(true)
+    ->build();
+
+ConsumeJournald::ConsumeJournald(const std::string &name, const 
utils::Identifier &id, std::unique_ptr<libwrapper::LibWrapper>&& libwrapper)
+    :core::Processor{name, id}, libwrapper_{std::move(libwrapper)}
+{}
+
+void ConsumeJournald::initialize() {
+  setSupportedProperties({BatchSize, PayloadFormat, IncludeTimestamp, 
JournalType, ProcessOldMessages, TimestampFormat});
+  setSupportedRelationships({Success});
+
+  worker_ = utils::make_unique<Worker>();
+}
+
+void ConsumeJournald::notifyStop() {
+  bool running = true;
+  if (!running_.compare_exchange_strong(running, false, 
std::memory_order_acq_rel) || !journal_) return;
+  worker_->enqueue([this] {
+    journal_ = nullptr;
+  }).get();
+  worker_ = nullptr;
+}
+
+void ConsumeJournald::onSchedule(core::ProcessContext* const context, 
core::ProcessSessionFactory* const sessionFactory) {
+  gsl_Expects(context && sessionFactory && !running_ && worker_);
+  using JournalTypeEnum = systemd::JournalType;
+
+  const auto parse_payload_format = [](const std::string& property_value) -> 
utils::optional<systemd::PayloadFormat> {
+    if (property_value == PAYLOAD_FORMAT_RAW) return 
systemd::PayloadFormat::Raw;
+    if (property_value == PAYLOAD_FORMAT_SYSLOG) return 
systemd::PayloadFormat::Syslog;
+    return utils::nullopt;
+  };
+  const auto parse_journal_type = [](const std::string& property_value) -> 
utils::optional<JournalTypeEnum> {
+    if (property_value == JOURNAL_TYPE_USER) return JournalTypeEnum::User;
+    if (property_value == JOURNAL_TYPE_SYSTEM) return JournalTypeEnum::System;
+    if (property_value == JOURNAL_TYPE_BOTH) return JournalTypeEnum::Both;
+    return utils::nullopt;
+  };
+  batch_size_ = context->getProperty<size_t>(BatchSize).value();
+  payload_format_ = (context->getProperty(PayloadFormat) | 
utils::flatMap(parse_payload_format)
+      | utils::orElse([]{ throw Exception{ExceptionType::PROCESSOR_EXCEPTION, 
"invalid payload format"}; }))
+      .value();
+  include_timestamp_ = context->getProperty<bool>(IncludeTimestamp).value();
+  const auto journal_type = (context->getProperty(JournalType) | 
utils::flatMap(parse_journal_type)
+      | utils::orElse([]{ throw Exception{ExceptionType::PROCESSOR_EXCEPTION, 
"invalid journal type"}; }))
+      .value();
+  const auto process_old_messages = 
context->getProperty<bool>(ProcessOldMessages).value_or(false);
+  timestamp_format_ = [&context] {
+    auto tf_prop = (context->getProperty(TimestampFormat)
+        | utils::orElse([]{ throw 
Exception{ExceptionType::PROCESSOR_EXCEPTION, "invalid timestamp format" }; }))
+        .value();
+    if (tf_prop == "ISO" || tf_prop == "ISO 8601" || tf_prop == "ISO8601") 
return std::string{"%FT%T%Ez"};
+    return tf_prop;
+  }();
+
+  state_manager_ = context->getStateManager();
+  // All journal-related API calls are thread-agnostic, meaning they need to 
be called from the same thread. In our environment,
+  // where a processor can easily be scheduled on different threads, we ensure 
this by executing all library calls on a dedicated
+  // worker thread. This is why all such operations are dispatched to a thread 
and immediately waited for in the initiating thread.
+  journal_ = worker_->enqueue([this, journal_type]{ return 
libwrapper_->openJournal(journal_type); }).get();
+  const auto seek_default = [process_old_messages](libwrapper::Journal& 
journal) {
+    return process_old_messages ? journal.seekHead() : journal.seekTail();
+  };
+  worker_->enqueue([this, &seek_default] {
+    const auto cursor = state_manager_->get() | 
utils::map([](std::unordered_map<std::string, std::string>&& m) { return 
m.at(CURSOR_KEY); });
+    if (!cursor) {
+      seek_default(*journal_);
+    } else {
+      const auto ret = journal_->seekCursor(cursor->c_str());
+      if (ret < 0) {
+        const auto error_message = 
std::generic_category().default_error_condition(-ret).message();
+        logger_->log_warn("Failed to seek to cursor: %s. Seeking to tail or 
head (depending on Process Old Messages property) instead. cursor=\"%s\"", 
error_message, *cursor);
+        seek_default(*journal_);
+      }
+    }
+  }).get();
+  running_ = true;
+}
+
+void ConsumeJournald::onTrigger(core::ProcessContext* const context, 
core::ProcessSession* const session) {
+  gsl_Expects(context && session);
+  if (!running_.load(std::memory_order_acquire)) return;
+  auto cursor_and_messages = getCursorAndMessageBatch().get();
+  auto messages = std::move(cursor_and_messages.second);
+  if (messages.empty()) {
+    yield();
+    return;
+  }
+
+  for (auto& msg: messages) {
+    const auto flow_file = session->create();
+    if (payload_format_ == systemd::PayloadFormat::Syslog) 
session->writeBuffer(flow_file, gsl::make_span(formatSyslogMessage(msg)));
+    for (auto& field: msg.fields) {
+      if (field.name == "MESSAGE" && payload_format_ == 
systemd::PayloadFormat::Raw) {
+        session->writeBuffer(flow_file, gsl::make_span(field.value));
+      } else {
+        flow_file->setAttribute(std::move(field.name), std::move(field.value));
+      }
+    }
+    if (include_timestamp_) flow_file->setAttribute("timestamp", 
date::format(timestamp_format_, msg.timestamp));
+    session->transfer(flow_file, Success);
+  }
+  session->commit();
+  state_manager_->set({{"cursor", std::move(cursor_and_messages.first)}});
+}
+
+utils::optional<gsl::span<const char>> 
ConsumeJournald::enumerateJournalEntry(libwrapper::Journal& journal) {
+  const void* data_ptr{};
+  size_t data_length{};
+  const auto status_code = journal.enumerateData(&data_ptr, &data_length);
+  if (status_code == 0) return {};
+  if (status_code < 0) throw SystemErrorException{ 
"sd_journal_enumerate_data", 
std::generic_category().default_error_condition(-status_code) };
+  gsl_Ensures(data_ptr && "if sd_journal_enumerate_data was successful, then 
data_ptr must be set");
+  gsl_Ensures(data_length > 0 && "if sd_journal_enumerate_data was successful, 
then data_length must be greater than zero");
+  const char* const data_str_ptr = reinterpret_cast<const char*>(data_ptr);
+  return gsl::make_span(data_str_ptr, data_length);
+}
+
+utils::optional<ConsumeJournald::journal_field> 
ConsumeJournald::getNextField(libwrapper::Journal& journal) {
+  return enumerateJournalEntry(journal) | utils::map([](gsl::span<const char> 
field) {
+    const auto eq_pos = std::find(std::begin(field), std::end(field), '=');
+    gsl_Ensures(eq_pos != std::end(field) && "field string must contain an 
equals sign");
+    const auto eq_idx = gsl::narrow<size_t>(eq_pos - std::begin(field));
+    return journal_field{
+        utils::span_to<std::string>(field.subspan(0, eq_idx)),
+        utils::span_to<std::string>(field.subspan(eq_idx + 1))
+    };
+  });
+}
+
+std::future<std::pair<std::string, 
std::vector<ConsumeJournald::journal_message>>> 
ConsumeJournald::getCursorAndMessageBatch() {
+  gsl_Expects(worker_);
+  return worker_->enqueue([this] {
+    std::vector<journal_message> messages;
+    messages.reserve(batch_size_);
+    for (size_t i = 0; i < batch_size_ && journal_->next() > 0; ++i) {
+      journal_message message;
+      utils::optional<journal_field> field;
+      while ((field = getNextField(*journal_)).has_value()) {
+        message.fields.push_back(std::move(*field));
+      }
+      if (include_timestamp_ || payload_format_ == 
systemd::PayloadFormat::Syslog) {
+        message.timestamp = [this] {
+          uint64_t journal_timestamp_usec_since_epoch{};
+          journal_->getRealtimeUsec(&journal_timestamp_usec_since_epoch);
+          return 
std::chrono::system_clock::time_point{std::chrono::microseconds{journal_timestamp_usec_since_epoch}};
+        }();
+      }
+      messages.push_back(std::move(message));
+    }
+
+    return std::make_pair(getCursor(), messages);
+  });
+}
+
+std::string ConsumeJournald::formatSyslogMessage(const journal_message& msg) 
const {
+  gsl_Expects(msg.timestamp != decltype(msg.timestamp){});
+
+  const std::string* systemd_hostname = nullptr;
+  const std::string* syslog_pid = nullptr;
+  const std::string* systemd_pid = nullptr;
+  const std::string* syslog_identifier = nullptr;
+  const std::string* message = nullptr;
+
+  for (const auto& field: msg.fields) {
+    if (field.name == "_HOSTNAME") systemd_hostname = &field.value;
+    else if (field.name == "SYSLOG_PID") syslog_pid = &field.value;
+    else if (field.name == "_PID") systemd_pid = &field.value;
+    else if (field.name == "SYSLOG_IDENTIFIER") syslog_identifier = 
&field.value;
+    else if (field.name == "MESSAGE") message = &field.value;
+    else if (systemd_hostname && (syslog_pid || systemd_pid) && 
syslog_identifier && message) break;
+  }
+
+  gsl_Ensures(message && "MESSAGE is guaranteed to be present");
+
+  const auto pid_string = utils::optional_from_ptr(syslog_pid)
+      | utils::orElse([&] { return utils::optional_from_ptr(systemd_pid); })
+      | utils::map([](const std::string* const pid) { return 
fmt::format("[{}]", *pid); });
+
+  return fmt::format("{} {} {}{}: {}",
+      date::format(timestamp_format_, msg.timestamp),
+      (utils::optional_from_ptr(systemd_hostname) | 
utils::map(utils::dereference)).value_or("unknown_host"),
+      (utils::optional_from_ptr(syslog_identifier) | 
utils::map(utils::dereference)).value_or("unknown_process"),
+      pid_string.value_or(std::string{}),
+      *message);
+}
+
+std::string ConsumeJournald::getCursor() const {
+  const auto cursor = [this] {
+    gsl::owner<char*> cursor_out;
+    const auto err_code = journal_->getCursor(&cursor_out);
+    if (err_code < 0) throw SystemErrorException{"sd_journal_get_cursor", 
std::generic_category().default_error_condition(-err_code)};
+    gsl_Ensures(cursor_out);
+    return std::unique_ptr<char, utils::FreeDeleter>{cursor_out};
+  }();
+  return std::string{cursor.get()};
+}
+
+}  // namespace systemd
+}  // namespace extensions
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/systemd/ConsumeJournald.h 
b/extensions/systemd/ConsumeJournald.h
new file mode 100644
index 0000000..6099fed
--- /dev/null
+++ b/extensions/systemd/ConsumeJournald.h
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <chrono>
+#include <future>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "core/CoreComponentState.h"
+#include "core/Processor.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "libwrapper/LibWrapper.h"
+#include "utils/Deleters.h"
+#include "utils/gsl.h"
+#include "utils/OptionalUtils.h"
+#include "WorkerThread.h"
+
+namespace org { namespace apache { namespace nifi { namespace minifi { 
namespace extensions { namespace systemd {
+
+enum class PayloadFormat { Raw, Syslog };
+
+class ConsumeJournald final : public core::Processor {
+ public:
+  static constexpr const char* CURSOR_KEY = "cursor";
+  static constexpr const char* PAYLOAD_FORMAT_RAW = "Raw";
+  static constexpr const char* PAYLOAD_FORMAT_SYSLOG = "Syslog";
+  static constexpr const char* JOURNAL_TYPE_USER = "User";
+  static constexpr const char* JOURNAL_TYPE_SYSTEM = "System";
+  static constexpr const char* JOURNAL_TYPE_BOTH = "Both";
+
+  static const core::Relationship Success;
+
+  static const core::Property BatchSize;
+  static const core::Property PayloadFormat;
+  static const core::Property IncludeTimestamp;
+  static const core::Property JournalType;
+  static const core::Property ProcessOldMessages;
+  static const core::Property TimestampFormat;
+
+  explicit ConsumeJournald(const std::string& name, const utils::Identifier& 
id = {}, std::unique_ptr<libwrapper::LibWrapper>&& = 
libwrapper::createLibWrapper());
+  ConsumeJournald(const ConsumeJournald&) = delete;
+  ConsumeJournald(ConsumeJournald&&) = delete;
+  ConsumeJournald& operator=(const ConsumeJournald&) = delete;
+  ConsumeJournald& operator=(ConsumeJournald&&) = delete;
+  ~ConsumeJournald() final { notifyStop(); }
+
+  void initialize() final;
+  void notifyStop() final;
+  void onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* 
sessionFactory) final;
+  void onTrigger(core::ProcessContext* context, core::ProcessSession* session) 
final;
+
+  friend struct ConsumeJournaldTestAccessor;
+ private:
+  struct journal_field {
+    std::string name;
+    std::string value;
+  };
+
+  struct journal_message {
+    std::vector<journal_field> fields;
+    std::chrono::system_clock::time_point timestamp;
+  };
+
+  static utils::optional<gsl::span<const char>> 
enumerateJournalEntry(libwrapper::Journal&);
+  static utils::optional<journal_field> getNextField(libwrapper::Journal&);
+  std::future<std::pair<std::string, std::vector<journal_message>>> 
getCursorAndMessageBatch();
+  std::string formatSyslogMessage(const journal_message&) const;
+  std::string getCursor() const;
+
+  std::atomic<bool> running_{false};
+  std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<ConsumeJournald>::getLogger();
+  std::shared_ptr<core::CoreComponentStateManager> state_manager_;
+  std::unique_ptr<libwrapper::LibWrapper> libwrapper_;
+  std::unique_ptr<Worker> worker_;
+  std::unique_ptr<libwrapper::Journal> journal_;
+
+  std::size_t batch_size_ = 1000;
+  systemd::PayloadFormat payload_format_ = systemd::PayloadFormat::Syslog;
+  bool include_timestamp_ = true;
+  std::string timestamp_format_ = "%x %X %Z";
+};
+
+REGISTER_RESOURCE(ConsumeJournald, "Consume systemd-journald journal messages. 
Creates one flow file per message."
+    "Fields are mapped to attributes. Realtime timestamp is mapped to the 
'timestamp' attribute.");
+
+}  // namespace systemd
+}  // namespace extensions
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/utils/gsl.h b/extensions/systemd/WorkerThread.cpp
similarity index 62%
copy from libminifi/include/utils/gsl.h
copy to extensions/systemd/WorkerThread.cpp
index db175b5..4213d29 100644
--- a/libminifi/include/utils/gsl.h
+++ b/extensions/systemd/WorkerThread.cpp
@@ -14,21 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_UTILS_GSL_H_
-#define LIBMINIFI_INCLUDE_UTILS_GSL_H_
 
-#include <gsl-lite/gsl-lite.hpp>
+#include "WorkerThread.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org { namespace apache { namespace nifi { namespace minifi { 
namespace extensions { namespace systemd {
 
-namespace gsl = ::gsl_lite;
+namespace detail {
+WorkerThread::WorkerThread()
+    : thread_{&WorkerThread::run, this} {}
 
+WorkerThread::~WorkerThread() {
+  task_queue_.stop();
+  thread_.join();
+}
+
+void WorkerThread::run() noexcept {
+  while (task_queue_.isRunning()) {
+    task_queue_.consumeWait([](std::packaged_task<void()>&& f) { f(); });
+  }
+}
+}  // namespace detail
+
+}  // namespace systemd
+}  // namespace extensions
 }  // namespace minifi
 }  // namespace nifi
 }  // namespace apache
 }  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_UTILS_GSL_H_
diff --git a/extensions/systemd/WorkerThread.h 
b/extensions/systemd/WorkerThread.h
new file mode 100644
index 0000000..503fcb1
--- /dev/null
+++ b/extensions/systemd/WorkerThread.h
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <future>
+#include <thread>
+#include <utility>
+
+#include "utils/MinifiConcurrentQueue.h"
+
+namespace org { namespace apache { namespace nifi { namespace minifi { 
namespace extensions { namespace systemd {
+
+namespace detail {
+class WorkerThread final {
+ public:
+  WorkerThread();
+
+  WorkerThread(const WorkerThread&) = delete;
+  WorkerThread(WorkerThread&&) = delete;
+  WorkerThread& operator=(WorkerThread) = delete;
+
+  ~WorkerThread();
+
+  template<typename... Args>
+  void enqueue(Args&&... args) { 
task_queue_.enqueue(std::forward<Args>(args)...); }
+
+ private:
+  void run() noexcept;
+
+  utils::ConditionConcurrentQueue<std::packaged_task<void()>> task_queue_;
+  std::thread thread_;
+};
+}  // namespace detail
+
+/**
+ * A worker that executes arbitrary functions with no parameters 
asynchronously on an internal thread, returning a future to the result.
+ */
+class Worker final {
+ public:
+  template<typename Func>
+  auto enqueue(Func func) -> std::future<decltype(func())> {
+    using result_type = decltype(func());
+    std::packaged_task<result_type()> task{std::move(func)};
+    auto future = task.get_future();
+    worker_thread_.enqueue(std::move(task));
+    return future;
+  }
+ private:
+  detail::WorkerThread worker_thread_;
+};
+
+}  // namespace systemd
+}  // namespace extensions
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/systemd/libwrapper/DlopenWrapper.cpp 
b/extensions/systemd/libwrapper/DlopenWrapper.cpp
new file mode 100644
index 0000000..2c8f452
--- /dev/null
+++ b/extensions/systemd/libwrapper/DlopenWrapper.cpp
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "DlopenWrapper.h"
+
+#include <dlfcn.h>
+
+#include "Exception.h"
+#include "utils/gsl.h"
+#include "utils/StringUtils.h"
+
+struct sd_journal;
+
+namespace org { namespace apache { namespace nifi { namespace minifi { 
namespace extensions { namespace systemd { namespace libwrapper {
+
+namespace {
+struct dlclose_deleter {
+  void operator()(void* const libhandle) { if (libhandle) dlclose(libhandle); }
+};
+}  // namespace
+
+class DlopenJournal : public Journal {
+ public:
+  explicit DlopenJournal(const JournalType type) {
+    constexpr auto SD_JOURNAL_LOCAL_ONLY = 1 << 0;
+    constexpr auto SD_JOURNAL_SYSTEM = 1 << 2;
+    constexpr auto SD_JOURNAL_CURRENT_USER = 1 << 3;
+    const int flags{(type == JournalType::User ? SD_JOURNAL_CURRENT_USER : 0)
+        | (type == JournalType::System ? SD_JOURNAL_SYSTEM : 0)
+        | SD_JOURNAL_LOCAL_ONLY};
+    const int error_code = open_(&j_, flags);
+    if (error_code < 0) {
+      // sd_journal_open returns negative errno values
+      const auto err_cond = 
std::generic_category().default_error_condition(-error_code);
+      throw SystemErrorException{"sd_journal_open", err_cond};
+    }
+  }
+  DlopenJournal(const DlopenJournal&) = delete;
+  DlopenJournal(DlopenJournal&&) = delete;
+  DlopenJournal& operator=(const DlopenJournal&) = delete;
+  DlopenJournal& operator=(DlopenJournal&&) = delete;
+
+  ~DlopenJournal() override {
+    if (j_ && close_) close_(j_);
+  }
+
+  int seekHead() noexcept override { return seek_head_(j_); }
+  int seekTail() noexcept override { return seek_tail_(j_); }
+  int seekCursor(const char* const cursor) noexcept override { return 
seek_cursor_(j_, cursor); }
+
+  int getCursor(gsl::owner<char*>* const cursor_out) noexcept override { 
return get_cursor_(j_, cursor_out); }
+
+  int next() noexcept override { return next_(j_); }
+  int enumerateData(const void** const data_out, size_t* const size_out) 
noexcept override { return enumerate_data_(j_, data_out, size_out); }
+
+  int getRealtimeUsec(uint64_t* const usec_out) noexcept override { return 
get_realtime_usec_(j_, usec_out); }
+
+ private:
+  template<typename F>
+  F loadSymbol(const char* const symbol_name) {
+    // The cast below is supported by POSIX platforms. 
https://stackoverflow.com/a/1096349
+    F const symbol = (F)dlsym(libhandle_.get(), symbol_name);
+    const char* const err = dlerror();
+    if (err) throw Exception(ExceptionType::GENERAL_EXCEPTION, 
utils::StringUtils::join_pack("dlsym(", symbol_name, "): ", err));
+    return symbol;
+  }
+
+  std::unique_ptr<void, dlclose_deleter> libhandle_{[] {
+    auto* const handle = dlopen("libsystemd.so.0", RTLD_LAZY);
+    if (!handle) throw Exception(ExceptionType::GENERAL_EXCEPTION, 
utils::StringUtils::join_pack("dlopen failed: ", dlerror()));
+    return handle;
+  }()};
+
+  int (*open_)(sd_journal**, int) = 
loadSymbol<decltype(open_)>("sd_journal_open");
+  void (*close_)(sd_journal*) = 
loadSymbol<decltype(close_)>("sd_journal_close");
+  int (*seek_head_)(sd_journal*) = 
loadSymbol<decltype(seek_head_)>("sd_journal_seek_head");
+  int (*seek_tail_)(sd_journal*) = 
loadSymbol<decltype(seek_tail_)>("sd_journal_seek_tail");
+  int (*seek_cursor_)(sd_journal*, const char*) = 
loadSymbol<decltype(seek_cursor_)>("sd_journal_seek_cursor");
+  int (*get_cursor_)(sd_journal*, char**) = 
loadSymbol<decltype(get_cursor_)>("sd_journal_get_cursor");
+  int (*next_)(sd_journal*) = loadSymbol<decltype(next_)>("sd_journal_next");
+  int (*enumerate_data_)(sd_journal*, const void**, size_t*) = 
loadSymbol<decltype(enumerate_data_)>("sd_journal_enumerate_data");
+  int (*get_realtime_usec_)(sd_journal*, uint64_t*) = 
loadSymbol<decltype(get_realtime_usec_)>("sd_journal_get_realtime_usec");
+
+  gsl::owner<sd_journal*> j_ = nullptr;
+};
+
+std::unique_ptr<Journal> DlopenWrapper::openJournal(const JournalType type) {
+  return utils::make_unique<DlopenJournal>(type);
+}
+
+}  // namespace libwrapper
+}  // namespace systemd
+}  // namespace extensions
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/utils/gsl.h 
b/extensions/systemd/libwrapper/DlopenWrapper.h
similarity index 70%
copy from libminifi/include/utils/gsl.h
copy to extensions/systemd/libwrapper/DlopenWrapper.h
index db175b5..88ae690 100644
--- a/libminifi/include/utils/gsl.h
+++ b/extensions/systemd/libwrapper/DlopenWrapper.h
@@ -14,21 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_UTILS_GSL_H_
-#define LIBMINIFI_INCLUDE_UTILS_GSL_H_
+#pragma once
 
-#include <gsl-lite/gsl-lite.hpp>
+#include "LibWrapper.h"
+#include <memory>
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org { namespace apache { namespace nifi { namespace minifi { 
namespace extensions { namespace systemd { namespace libwrapper {
 
-namespace gsl = ::gsl_lite;
+struct DlopenWrapper : LibWrapper {
+  std::unique_ptr<Journal> openJournal(JournalType) override;
+};
 
+}  // namespace libwrapper
+}  // namespace systemd
+}  // namespace extensions
 }  // namespace minifi
 }  // namespace nifi
 }  // namespace apache
 }  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_UTILS_GSL_H_
diff --git a/libminifi/include/utils/gsl.h 
b/extensions/systemd/libwrapper/LibWrapper.cpp
similarity index 69%
copy from libminifi/include/utils/gsl.h
copy to extensions/systemd/libwrapper/LibWrapper.cpp
index db175b5..d58e393 100644
--- a/libminifi/include/utils/gsl.h
+++ b/extensions/systemd/libwrapper/LibWrapper.cpp
@@ -14,21 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_UTILS_GSL_H_
-#define LIBMINIFI_INCLUDE_UTILS_GSL_H_
 
-#include <gsl-lite/gsl-lite.hpp>
+#include "LibWrapper.h"
+#include "DlopenWrapper.h"
+#include "utils/GeneralUtils.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org { namespace apache { namespace nifi { namespace minifi { 
namespace extensions { namespace systemd { namespace libwrapper {
 
-namespace gsl = ::gsl_lite;
+std::unique_ptr<LibWrapper> createLibWrapper() {
+  return utils::make_unique<DlopenWrapper>();
+}
 
+}  // namespace libwrapper
+}  // namespace systemd
+}  // namespace extensions
 }  // namespace minifi
 }  // namespace nifi
 }  // namespace apache
 }  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_UTILS_GSL_H_
diff --git a/extensions/systemd/libwrapper/LibWrapper.h 
b/extensions/systemd/libwrapper/LibWrapper.h
new file mode 100644
index 0000000..d7e7b7c
--- /dev/null
+++ b/extensions/systemd/libwrapper/LibWrapper.h
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "../Common.h"
+#include "utils/gsl.h"
+
+namespace org { namespace apache { namespace nifi { namespace minifi { 
namespace extensions { namespace systemd { namespace libwrapper {
+
+struct Journal {
+  virtual int seekHead() noexcept = 0;
+  virtual int seekTail() noexcept = 0;
+  virtual int seekCursor(const char*) noexcept = 0;
+
+  virtual int getCursor(gsl::owner<char*>* cursor_out) noexcept = 0;
+
+  virtual int next() noexcept = 0;
+  virtual int enumerateData(const void** data_out, size_t* size_out) noexcept 
= 0;
+
+  virtual int getRealtimeUsec(uint64_t* usec_out) noexcept = 0;
+
+  virtual ~Journal() = default;
+};
+
+
+struct LibWrapper {
+  virtual std::unique_ptr<Journal> openJournal(JournalType) = 0;
+  virtual ~LibWrapper() = default;
+};
+
+std::unique_ptr<LibWrapper> createLibWrapper();
+
+}  // namespace libwrapper
+}  // namespace systemd
+}  // namespace extensions
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/systemd/tests/CMakeLists.txt 
b/extensions/systemd/tests/CMakeLists.txt
new file mode 100644
index 0000000..5525afa
--- /dev/null
+++ b/extensions/systemd/tests/CMakeLists.txt
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+function(add_systemd_test testfile)
+    get_filename_component(TEST_TARGET "${testfile}" NAME_WE)
+    add_executable(${TEST_TARGET} "${testfile}")
+    target_include_directories(${TEST_TARGET} PRIVATE BEFORE 
"${CMAKE_SOURCE_DIR}/extensions/systemd/")
+    target_include_directories(${TEST_TARGET} PRIVATE BEFORE 
"${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+    target_include_directories(${TEST_TARGET} PRIVATE BEFORE 
"${CMAKE_SOURCE_DIR}/libminifi/test")
+    target_wholearchive_library(${TEST_TARGET} minifi-systemd)
+    target_wholearchive_library(${TEST_TARGET} minifi-standard-processors)
+    createTests("${TEST_TARGET}")
+    add_test(NAME ${TEST_TARGET} COMMAND "${TEST_TARGET}" WORKING_DIRECTORY 
"${TEST_DIR}")
+    target_link_libraries(${TEST_TARGET} ${CATCH_MAIN_LIB})
+endfunction()
+
+add_systemd_test("ConsumeJournaldTest.cpp")
diff --git a/extensions/systemd/tests/ConsumeJournaldTest.cpp 
b/extensions/systemd/tests/ConsumeJournaldTest.cpp
new file mode 100644
index 0000000..7c13c7d
--- /dev/null
+++ b/extensions/systemd/tests/ConsumeJournaldTest.cpp
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cerrno>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "TestBase.h"
+#include "ConsumeJournald.h"
+#include "libwrapper/LibWrapper.h"
+#include "utils/GeneralUtils.h"
+#include "utils/gsl.h"
+#include "utils/StringUtils.h"
+#include "Utils.h"
+
+namespace minifi = org::apache::nifi::minifi;
+namespace utils = minifi::utils;
+namespace systemd = minifi::extensions::systemd;
+namespace libwrapper = systemd::libwrapper;
+using systemd::JournalType;
+using systemd::ConsumeJournald;
+
+namespace {
+namespace gsl = minifi::gsl;
+struct JournalEntry final {
+  JournalEntry(const char* const identifier, const char* const message, const 
int pid = 0, std::vector<std::string> extra_fields = {}, const char* const 
hostname = "test-pc")
+    :fields{std::move(extra_fields)}
+  {
+    fields.reserve(fields.size() + 4);
+    fields.push_back(utils::StringUtils::join_pack("MESSAGE=", message));
+    fields.push_back(utils::StringUtils::join_pack("SYSLOG_IDENTIFIER=", 
identifier));
+    if (pid != 0) {
+      // The intention of the long expression below is a simple pseudo-random 
to test both branches equally
+      // without having to pull in complex random logic
+      const char* const pid_key =
+          (int{message[0]} + int{identifier[0]} + 
static_cast<int>(extra_fields.size()) + pid + int{hostname[0]}) % 2 == 0 ? 
"_PID" : "SYSLOG_PID";
+      fields.push_back(utils::StringUtils::join_pack(pid_key, "=", 
std::to_string(pid)));
+    }
+    fields.push_back(utils::StringUtils::join_pack("_HOSTNAME=", hostname));
+  }
+
+  std::vector<std::string> fields;  // in KEY=VALUE format, like systemd
+};
+struct TestJournal final : libwrapper::Journal {
+  explicit TestJournal(std::vector<JournalEntry>& journal)
+      :journal{&journal}
+  { }
+
+  int seekHead() noexcept override {
+    cursor = 0;
+    consumed = -1;
+    field_id = 0;
+    return 0;
+  }
+
+  int seekTail() noexcept override {
+    cursor = journal->size();
+    consumed = gsl::narrow<ssize_t>(journal->size() - 1);
+    field_id = 0;
+    return 0;
+  }
+
+  int seekCursor(const char* const cur) noexcept override {
+    try {
+      cursor = gsl::narrow<size_t>(std::stoll(cur) + 1);
+      consumed = gsl::narrow<ssize_t>(cursor - 1);
+    } catch (const std::invalid_argument&) {
+      return -EINVAL;
+    } catch (const std::out_of_range&) {
+      return -ERANGE;
+    }
+    field_id = 0;
+    return 0;
+  }
+
+  int getCursor(gsl::owner<char*>* const cursor_out) noexcept override {
+    *cursor_out = strdup(std::to_string(consumed).c_str());
+    return *cursor_out ? 0 : -ENOMEM;
+  }
+
+  int next() noexcept override {
+    cursor = gsl::narrow<size_t>(consumed + 1);
+    field_id = 0;
+    if (cursor >= journal->size()) return 0;
+    return 1;
+  }
+
+  int enumerateData(const void** const data_out, size_t* const size_out) 
noexcept override {
+    if (cursor >= journal->size()) {
+      cursor = gsl::narrow<size_t>(consumed + 1);
+      return -EADDRNOTAVAIL;
+    }
+    if (field_id >= (*journal)[cursor].fields.size()) return 0;
+    const auto result = gsl::narrow<int>((*journal)[cursor].fields.size() - 
field_id);
+    *data_out = (*journal)[cursor].fields[field_id].c_str();
+    *size_out = (*journal)[cursor].fields[field_id].size();
+    consumed = gsl::narrow<ssize_t>(cursor);
+    ++field_id;
+    return result;
+  }
+
+  int getRealtimeUsec(uint64_t* const usec_out) noexcept override {
+    constexpr auto _20210415171703 = 1618507023000000;
+    constexpr auto usec_per_sec = 1000000;
+    *usec_out = _20210415171703 + cursor * usec_per_sec + 123456;
+    return 0;
+  }
+
+  size_t cursor = 0;
+  ssize_t consumed = -1;
+  size_t field_id = 0;
+  gsl::not_null<std::vector<JournalEntry>*> journal;
+};
+struct TestLibWrapper final : libwrapper::LibWrapper {
+  explicit TestLibWrapper(std::vector<JournalEntry> journal)
+      :journal{std::move(journal)}
+  { }
+
+  std::unique_ptr<libwrapper::Journal> openJournal(JournalType) override {
+    return utils::make_unique<TestJournal>(journal);
+  }
+
+  std::vector<JournalEntry> journal;
+};
+}  // namespace
+
+namespace org { namespace apache { namespace nifi { namespace minifi { 
namespace extensions { namespace systemd {
+struct ConsumeJournaldTestAccessor {
+  FIELD_ACCESSOR(state_manager_);
+};
+}}}}}}  // namespace org::apache::nifi::minifi::extensions::systemd
+using 
org::apache::nifi::minifi::extensions::systemd::ConsumeJournaldTestAccessor;
+
+TEST_CASE("ConsumeJournald", "[consumejournald]") {
+  TestController test_controller;
+  LogTestController::getInstance().setTrace<ConsumeJournald>();
+  const auto plan = test_controller.createPlan();
+  auto libwrapper = utils::make_unique<TestLibWrapper>(TestLibWrapper{{
+      {"kernel", "Linux version 5.10.12-gentoo-x86_64 
([email protected]) (x86_64-pc-linux-gnu-gcc (Gentoo 10.2.0-r5 p6) 
10.2.0, GNU ld (Gentoo 2.35.2 p1) 2.35.2) #1 SMP Sat Feb 20 03:13:45 CET 
2021"},  // NOLINT
+      {"kernel", "NX (Execute Disable) protection: active"},
+      {"kernel", "ACPI: Local APIC address 0xfee00000"},
+      {"kernel", "HugeTLB registered 1.00 GiB page size, pre-allocated 0 
pages"},
+      {"kernel", "SCSI subsystem initialized"},
+      {"systemd", "Starting Rule-based Manager for Device Events and 
Files...", 1},
+  }});
+  auto* const libwrapper_observer = libwrapper.get();
+  const auto consume_journald = 
plan->addProcessor(std::make_shared<ConsumeJournald>("ConsumeJournald", 
utils::Identifier{},
+      std::move(libwrapper)), "ConsumeJournald");
+  REQUIRE(consume_journald->setProperty(ConsumeJournald::TimestampFormat, 
"ISO8601"));
+  const auto get_cursor_position = [&consume_journald]() -> std::string {
+    return 
ConsumeJournaldTestAccessor::get_state_manager_(dynamic_cast<ConsumeJournald&>(*consume_journald))->get()->at("cursor");
+  };
+
+  SECTION("defaults") {
+    // first run: seeks to the end, no flow files are created. Yields. Can't 
check cursor position, because it's only set during message consumption.
+    plan->runNextProcessor();
+    REQUIRE(nullptr == plan->getFlowFileProducedByCurrentProcessor());  // 
ConsumeJournald seeks to tail by default, therefore no flow files are produced
+    REQUIRE(consume_journald->isYield());
+
+    // add a flow file, check the content
+    libwrapper_observer->journal.emplace_back("systemd", "Mounted /boot.", 1);
+    plan->runCurrentProcessor();
+    REQUIRE("6" == get_cursor_position());
+    REQUIRE("2021-04-15T17:17:09.123456000+00:00 test-pc systemd[1]: Mounted 
/boot." == plan->getContent(plan->getCurrentFlowFile()));
+
+    // add two new messages, expect two new flow files
+    libwrapper_observer->journal.emplace_back("dbus-daemon", "[system] 
Successfully activated service 'org.freedesktop.UPower'", 2200);
+    libwrapper_observer->journal.emplace_back("NetworkManager", "<info>  
[1618507047.7278] manager: (virbr0): new Bridge device 
(/org/freedesktop/NetworkManager/Devices/5)", 2201);
+    plan->runCurrentProcessor();
+    REQUIRE(2 == plan->getNumFlowFileProducedByCurrentProcessor());
+    REQUIRE("8" == get_cursor_position());
+    const auto content = plan->getContent(plan->getCurrentFlowFile());
+    REQUIRE(!content.empty());
+  }
+  SECTION("Raw format, one-by-one") {
+    REQUIRE(consume_journald->setProperty(ConsumeJournald::BatchSize, "1"));
+    REQUIRE(consume_journald->setProperty(ConsumeJournald::PayloadFormat, 
"Raw"));
+    REQUIRE(consume_journald->setProperty(ConsumeJournald::ProcessOldMessages, 
"true"));
+    {
+      plan->runNextProcessor();
+      REQUIRE("0" == get_cursor_position());
+      REQUIRE(!consume_journald->isYield());
+      const auto flowfile = plan->getCurrentFlowFile();
+      const auto content = plan->getContent(flowfile);
+      REQUIRE("Linux version 5.10.12-gentoo-x86_64 ([email protected]) 
(x86_64-pc-linux-gnu-gcc (Gentoo 10.2.0-r5 p6) 10.2.0, GNU ld (Gentoo 2.35.2 
p1) 2.35.2) #1 SMP Sat Feb 20 03:13:45 CET 2021"  // NOLINT
+          == content);
+      REQUIRE("2021-04-15T17:17:03.123456000+00:00" == 
flowfile->getAttribute("timestamp").value_or("n/a"));
+    }
+    {
+      plan->runCurrentProcessor();
+      REQUIRE("1" == get_cursor_position());
+      const auto content = plan->getContent(plan->getCurrentFlowFile());
+      REQUIRE("NX (Execute Disable) protection: active" == content);
+      REQUIRE("2021-04-15T17:17:04.123456000+00:00" == 
plan->getCurrentFlowFile()->getAttribute("timestamp").value_or("n/a"));
+    }
+
+    plan->runCurrentProcessor();
+    REQUIRE("2" == get_cursor_position());
+    REQUIRE("ACPI: Local APIC address 0xfee00000" == 
plan->getContent(plan->getCurrentFlowFile()));
+
+    plan->runCurrentProcessor();
+    REQUIRE("HugeTLB registered 1.00 GiB page size, pre-allocated 0 pages" == 
plan->getContent(plan->getCurrentFlowFile()));
+
+    plan->runCurrentProcessor();
+    REQUIRE("SCSI subsystem initialized" == 
plan->getContent(plan->getCurrentFlowFile()));
+
+    plan->runCurrentProcessor();
+    REQUIRE("Starting Rule-based Manager for Device Events and Files..." == 
plan->getContent(plan->getCurrentFlowFile()));
+    REQUIRE("5" == get_cursor_position());
+
+    plan->runCurrentProcessor();
+    REQUIRE(nullptr == plan->getCurrentFlowFile());
+    REQUIRE("5" == get_cursor_position());
+
+    {
+      // add a flow file, check the content and the timestamp
+      libwrapper_observer->journal.emplace_back("systemd", "Mounted /boot.", 
1);
+      plan->runCurrentProcessor();
+      const auto flowfile = plan->getCurrentFlowFile();
+      const auto content = plan->getContent(flowfile);
+      REQUIRE("6" == get_cursor_position());
+      REQUIRE("2021-04-15T17:17:09.123456000+00:00" == 
flowfile->getAttribute("timestamp"));
+      REQUIRE("Mounted /boot." == content);
+      REQUIRE("test-pc" == 
flowfile->getAttribute("_HOSTNAME").value_or("n/a"));
+      REQUIRE("systemd" == 
flowfile->getAttribute("SYSLOG_IDENTIFIER").value_or("n/a"));
+      const auto pid = (flowfile->getAttribute("_PID") | utils::orElse([&] { 
return flowfile->getAttribute("SYSLOG_PID"); })).value_or("n/a");
+      REQUIRE("1" == pid);
+    }
+
+    plan->runCurrentProcessor();
+    REQUIRE(nullptr == plan->getCurrentFlowFile());
+    REQUIRE("6" == get_cursor_position());
+  }
+  SECTION("Include Timestamp is honored") {
+    REQUIRE(consume_journald->setProperty(ConsumeJournald::BatchSize, "1"));
+    REQUIRE(consume_journald->setProperty(ConsumeJournald::IncludeTimestamp, 
"false"));
+    plan->runNextProcessor();  // first run: seeks to the end, no flow files 
are created. Yields.
+    REQUIRE(nullptr == plan->getCurrentFlowFile());
+
+    // add a flow file, ensure that no timestamp is added to the attributes
+    libwrapper_observer->journal.emplace_back("systemd", "Mounted /boot.", 1);
+    plan->runCurrentProcessor();
+    REQUIRE("2021-04-15T17:17:09.123456000+00:00 test-pc systemd[1]: Mounted 
/boot." == plan->getContent(plan->getCurrentFlowFile()));
+    
REQUIRE(!plan->getCurrentFlowFile()->getAttribute("timestamp").has_value());
+  }
+  SECTION("Batch Size is honored") {
+    REQUIRE(consume_journald->setProperty(ConsumeJournald::BatchSize, "3"));
+    REQUIRE(consume_journald->setProperty(ConsumeJournald::ProcessOldMessages, 
"true"));
+    libwrapper_observer->journal.emplace_back("systemd", "Mounted /boot.", 1);
+    libwrapper_observer->journal.emplace_back("dbus-daemon", "[system] 
Successfully activated service 'org.freedesktop.UPower'", 2200);
+
+    plan->runNextProcessor();
+    REQUIRE(3 == plan->getNumFlowFileProducedByCurrentProcessor());
+
+    plan->runCurrentProcessor();
+    REQUIRE(6 == plan->getNumFlowFileProducedByCurrentProcessor());
+
+    plan->runCurrentProcessor();
+    REQUIRE(8 == plan->getNumFlowFileProducedByCurrentProcessor());
+    REQUIRE(!consume_journald->isYield());
+
+    plan->runCurrentProcessor();
+    REQUIRE(8 == plan->getNumFlowFileProducedByCurrentProcessor());
+    REQUIRE(consume_journald->isYield());
+  }
+  SECTION("throw on invalid batch size") {
+    REQUIRE_THROWS(consume_journald->setProperty(ConsumeJournald::BatchSize, 
"asdf"));
+  }
+  SECTION("throw on invalid payload format") {
+    REQUIRE(consume_journald->setProperty(ConsumeJournald::PayloadFormat, 
"raw"));  // case-sensitive
+    REQUIRE_THROWS(plan->scheduleProcessor(consume_journald));
+  }
+  SECTION("throw on invalid journal type") {
+    REQUIRE(consume_journald->setProperty(ConsumeJournald::JournalType, 
"hello"));
+    REQUIRE_THROWS(plan->scheduleProcessor(consume_journald));
+  }
+  SECTION("throw on invalid journal type") {
+    REQUIRE(consume_journald->setProperty(ConsumeJournald::JournalType, 
"hello"));
+    REQUIRE_THROWS(plan->scheduleProcessor(consume_journald));
+  }
+}
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 613d26a..f0c0ded 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -17,13 +17,10 @@
 # under the License.
 #
 
-cmake_minimum_required (VERSION 2.6)
+cmake_minimum_required (VERSION 3.11)
 
-project(nifi-libcore-minifi)
+project(nifi-libcore-minifi VERSION 0.9.0)
 set(PROJECT_NAME "nifi-libcore-minifi")
-set(PROJECT_VERSION_MAJOR 0)
-set(PROJECT_VERSION_MINOR 9)
-set(PROJECT_VERSION_PATCH 0)
 
 
 #### Establish Project Configuration ####
@@ -127,7 +124,7 @@ if(NOT WIN32)
        list(APPEND LIBMINIFI_LIBRARIES OSSP::libuuid++)
 endif()
 if (NOT OPENSSL_OFF)
-       list(APPEND LIBMINIFI_LIBRARIES OpenSSL::SSL)
+       list(APPEND LIBMINIFI_LIBRARIES OpenSSL::Crypto OpenSSL::SSL)
 endif()
 target_link_libraries(core-minifi ${CMAKE_DL_LIBS} ${LIBMINIFI_LIBRARIES})
 
diff --git a/libminifi/include/Exception.h b/libminifi/include/Exception.h
index ef85076..634060d 100644
--- a/libminifi/include/Exception.h
+++ b/libminifi/include/Exception.h
@@ -27,6 +27,7 @@
 #include <sstream>
 #include <stdexcept>
 #include <string>
+#include <system_error>
 
 #include "utils/StringUtils.h"
 
@@ -55,7 +56,7 @@ inline const char *ExceptionTypeToString(ExceptionType type) {
   if (type < MAX_EXCEPTION)
     return ExceptionStr[type];
   else
-    return NULL;
+    return nullptr;
 }
 
 struct Exception : public std::runtime_error {
@@ -63,12 +64,32 @@ struct Exception : public std::runtime_error {
    * Create a new exception
    */
   Exception(ExceptionType type, const std::string& errorMsg)
-      : std::runtime_error{ 
org::apache::nifi::minifi::utils::StringUtils::join_pack(ExceptionTypeToString(type),
 ": ", errorMsg) }
+      :Exception{ utils::StringUtils::join_pack(ExceptionTypeToString(type), 
": ", errorMsg) }
   { }
 
   Exception(ExceptionType type, const char* errorMsg)
-      : std::runtime_error{ 
org::apache::nifi::minifi::utils::StringUtils::join_pack(ExceptionTypeToString(type),
 ": ", errorMsg) }
+      :Exception{ utils::StringUtils::join_pack(ExceptionTypeToString(type), 
": ", errorMsg) }
   { }
+
+ protected:
+  explicit Exception(const std::string& errmsg)
+      :std::runtime_error{ errmsg }
+  {}
+  explicit Exception(const char* errmsg)
+      :std::runtime_error{ errmsg }
+  {}
+};
+
+struct SystemErrorException : Exception {
+  explicit SystemErrorException(const char* const operation, 
std::error_condition error_condition)
+      :Exception{ utils::StringUtils::join_pack(operation, ": ", 
error_condition.message()) },
+      error_condition_{ error_condition }
+  {}
+
+  std::error_condition error_condition() { return error_condition_; }
+
+ private:
+  std::error_condition error_condition_;
 };
 
 }  // namespace minifi
diff --git a/libminifi/include/core/ConfigurableComponent.h 
b/libminifi/include/core/ConfigurableComponent.h
index 0cd8723..86d588a 100644
--- a/libminifi/include/core/ConfigurableComponent.h
+++ b/libminifi/include/core/ConfigurableComponent.h
@@ -88,7 +88,7 @@ class ConfigurableComponent {
    * @param value property value.
    * @return whether property was set or not
    */
-  bool setProperty(Property &prop, std::string value);
+  bool setProperty(const Property& prop, std::string value);
 
   /**
      * Sets the property using the provided name
@@ -96,7 +96,7 @@ class ConfigurableComponent {
      * @param value property value.
      * @return whether property was set or not
      */
-  bool setProperty(Property &prop, PropertyValue &value);
+  bool setProperty(const Property& prop, PropertyValue &value);
 
   /**
    * Sets supported properties for the ConfigurableComponent
diff --git a/libminifi/include/core/CoreComponentState.h 
b/libminifi/include/core/CoreComponentState.h
index 3174b40..2bdc04a 100644
--- a/libminifi/include/core/CoreComponentState.h
+++ b/libminifi/include/core/CoreComponentState.h
@@ -26,6 +26,8 @@
 #include <map>
 #include <string>
 
+#include "utils/OptionalUtils.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -42,6 +44,15 @@ class CoreComponentStateManager {
 
   virtual bool get(CoreComponentState& kvs) = 0;
 
+  utils::optional<std::unordered_map<std::string, std::string>> get() {
+    std::unordered_map<std::string, std::string> out;
+    if (get(out)) {
+      return out;
+    } else {
+      return utils::nullopt;
+    }
+  }
+
   virtual bool clear() = 0;
 
   virtual bool persist() = 0;
diff --git a/libminifi/include/core/FlowFile.h 
b/libminifi/include/core/FlowFile.h
index 7b97fc7..dcca83d 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -158,8 +158,11 @@ class FlowFile : public CoreComponent, public 
ReferenceContainer {
   /**
    * setAttribute, if attribute already there, update it, else, add it
    */
-  bool setAttribute(const std::string& key, const std::string& value) {
-    return attributes_.insert_or_assign(key, value).second;
+  bool setAttribute(const std::string& key, std::string value) {
+    return attributes_.insert_or_assign(key, std::move(value)).second;
+  }
+  bool setAttribute(std::string&& key, std::string value) {
+    return attributes_.insert_or_assign(std::move(key), 
std::move(value)).second;
   }
 
   /**
diff --git a/libminifi/include/core/ProcessContext.h 
b/libminifi/include/core/ProcessContext.h
index b57cc52..f76cf7e 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -40,6 +40,8 @@
 #include "core/FlowFile.h"
 #include "core/CoreComponentState.h"
 #include "utils/file/FileUtils.h"
+#include "utils/OptionalUtils.h"
+#include "utils/PropertyErrors.h"
 #include "VariableRegistry.h"
 
 namespace org {
@@ -97,6 +99,18 @@ class ProcessContext : public 
controller::ControllerServiceLookup, public core::
     return processor_node_;
   }
 
+  template<typename T = std::string>
+  typename std::enable_if<std::is_default_constructible<T>::value, 
utils::optional<T>>::type
+  getProperty(const Property& property) {
+    T value;
+    try {
+      if (!getProperty(property.getName(), value)) return utils::nullopt;
+    } catch (const utils::internal::ValueException&) {
+      return utils::nullopt;
+    }
+    return value;
+  }
+
   template<typename T>
   bool getProperty(const std::string &name, T &value) const {
     return getPropertyImp<typename std::common_type<T>::type>(name, value);
@@ -122,7 +136,7 @@ class ProcessContext : public 
controller::ControllerServiceLookup, public core::
     return processor_node_->setDynamicProperty(name, value);
   }
   // Sets the property value using the Property object
-  bool setProperty(Property prop, std::string value) {
+  bool setProperty(const Property& prop, std::string value) {
     return processor_node_->setProperty(prop, value);
   }
   // Whether the relationship is supported
diff --git a/libminifi/include/core/ProcessSession.h 
b/libminifi/include/core/ProcessSession.h
index 743adf8..302c8b1 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -37,6 +37,7 @@
 #include "FlowFile.h"
 #include "WeakReference.h"
 #include "provenance/Provenance.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -96,6 +97,8 @@ class ProcessSession : public ReferenceContainer {
   int read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback 
*callback);
   // Execute the given write callback against the content
   void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback 
*callback);
+  // Replace content with buffer
+  void writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, 
gsl::span<const char> buffer);
   // Execute the given write/append callback against the content
   void append(const std::shared_ptr<core::FlowFile> &flow, 
OutputStreamCallback *callback);
   // Penalize the flow
diff --git a/libminifi/include/core/ProcessorNode.h 
b/libminifi/include/core/ProcessorNode.h
index 6a35232..557cc23 100644
--- a/libminifi/include/core/ProcessorNode.h
+++ b/libminifi/include/core/ProcessorNode.h
@@ -155,7 +155,7 @@ class ProcessorNode : public ConfigurableComponent, public 
Connectable {
    * @param value property value.
    * @return whether property was set or not
    */
-  bool setProperty(Property &prop, std::string value) {
+  bool setProperty(const Property &prop, std::string value) {
     const std::shared_ptr<ConfigurableComponent> processor_cast = 
std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
     bool ret = ConfigurableComponent::setProperty(prop, value);
     if (nullptr != processor_cast)
diff --git a/libminifi/include/core/TypedValues.h 
b/libminifi/include/core/TypedValues.h
index f735d37..b3808ba 100644
--- a/libminifi/include/core/TypedValues.h
+++ b/libminifi/include/core/TypedValues.h
@@ -51,7 +51,7 @@ class TimePeriodValue : public TransformableValue, public 
state::response::UInt6
 
   explicit TimePeriodValue(const std::string &timeString)
       : state::response::UInt64Value(0) {
-    TimeUnit units;
+    TimeUnit units{};
     if (!StringToTime(timeString, value, units)) {
       throw utils::internal::ParseException("Couldn't parse TimePeriodValue");
     }
diff --git a/libminifi/include/utils/FlatMap.h 
b/libminifi/include/utils/FlatMap.h
index 096f904..eb278af 100644
--- a/libminifi/include/utils/FlatMap.h
+++ b/libminifi/include/utils/FlatMap.h
@@ -185,6 +185,17 @@ class FlatMap{
     return {iterator{data_.begin() + data_.size() - 1}, true};
   }
 
+  template<typename M>
+  std::pair<iterator, bool> insert_or_assign(K&& key, M&& value) {
+    auto it = find(key);
+    if (it != end()) {
+      it->second = std::forward<M>(value);
+      return {it, false};
+    }
+    data_.emplace_back(std::move(key), std::forward<M>(value));
+    return {iterator{data_.begin() + data_.size() - 1}, true};
+  }
+
   iterator find(const K& key) {
     for (auto it = data_.begin(); it != data_.end(); ++it) {
       if (it->first == key) return iterator{it};
diff --git a/libminifi/include/utils/GeneralUtils.h 
b/libminifi/include/utils/GeneralUtils.h
index 4b11c67..bd70412 100644
--- a/libminifi/include/utils/GeneralUtils.h
+++ b/libminifi/include/utils/GeneralUtils.h
@@ -174,6 +174,15 @@ auto invoke(F&& f, Args&&... args) 
MINIFICPP_UTIL_DEDUCED(detail::invoke_impl(st
 using std::invoke
 #endif /* < C++17 */
 
+namespace detail {
+struct dereference_t {
+  template<typename T>
+  T &operator()(T *ptr) const noexcept { return *ptr; }
+};
+}  // namespace detail
+
+constexpr detail::dereference_t dereference{};
+
 }  // namespace utils
 }  // namespace minifi
 }  // namespace nifi
diff --git a/libminifi/include/utils/OptionalUtils.h 
b/libminifi/include/utils/OptionalUtils.h
index 905128c..8eec781 100644
--- a/libminifi/include/utils/OptionalUtils.h
+++ b/libminifi/include/utils/OptionalUtils.h
@@ -36,7 +36,8 @@ using nonstd::make_optional;
 
 template<typename T>
 optional<typename gsl_lite::remove_cvref<T>::type> optional_from_ptr(T&& obj) {
-  return obj == nullptr ? nullopt : optional<typename 
gsl_lite::remove_cvref<T>::type>{ std::forward<T>(obj) };
+  // detail::remove_cvref_t comes from gsl.h
+  return obj == nullptr ? nullopt : optional<detail::remove_cvref_t<T>>{ 
std::forward<T>(obj) };
 }
 
 template<typename>
@@ -62,6 +63,16 @@ auto operator|(const optional<SourceType>& o, map_wrapper<F> 
f) noexcept(noexcep
   }
 }
 
+template<typename SourceType, typename F>
+auto operator|(optional<SourceType>&& o, map_wrapper<F> f) 
noexcept(noexcept(utils::invoke(std::forward<F>(f.function), std::move(*o))))
+    -> optional<typename 
std::decay<decltype(utils::invoke(std::forward<F>(f.function), 
std::move(*o)))>::type> {
+  if (o.has_value()) {
+    return make_optional(utils::invoke(std::forward<F>(f.function), 
std::move(*o)));
+  } else {
+    return nullopt;
+  }
+}
+
 template<typename T>
 struct flat_map_wrapper {
   T function;
@@ -78,7 +89,43 @@ auto operator|(const optional<SourceType>& o, 
flat_map_wrapper<F> f) noexcept(no
     return nullopt;
   }
 }
+template<typename SourceType, typename F>
+auto operator|(optional<SourceType>&& o, flat_map_wrapper<F> f) 
noexcept(noexcept(utils::invoke(std::forward<F>(f.function), std::move(*o))))
+    -> optional<typename 
std::decay<decltype(*utils::invoke(std::forward<F>(f.function), 
std::move(*o)))>::type> {
+  
static_assert(is_optional<decltype(utils::invoke(std::forward<F>(f.function), 
std::move(*o)))>::value, "flatMap expects a function returning optional");
+  if (o.has_value()) {
+    return utils::invoke(std::forward<F>(f.function), std::move(*o));
+  } else {
+    return nullopt;
+  }
+}
 
+template<typename T>
+struct or_else_wrapper {
+  T function;
+};
+
+// orElse implementation
+template<typename SourceType, typename F>
+auto operator|(optional<SourceType> o, or_else_wrapper<F> f) 
noexcept(noexcept(utils::invoke(std::forward<F>(f.function))))
+    -> typename 
std::enable_if<std::is_same<decltype(utils::invoke(std::forward<F>(f.function))),
 void>::value, optional<SourceType>>::type {
+  if (o.has_value()) {
+    return o;
+  } else {
+    utils::invoke(std::forward<F>(f.function));
+    return nullopt;
+  }
+}
+
+template<typename SourceType, typename F>
+auto operator|(optional<SourceType> o, or_else_wrapper<F> f) 
noexcept(noexcept(utils::invoke(std::forward<F>(f.function))))
+    -> typename std::enable_if<std::is_same<typename 
std::decay<decltype(utils::invoke(std::forward<F>(f.function)))>::type, 
optional<SourceType>>::value, optional<SourceType>>::type {
+  if (o.has_value()) {
+    return o;
+  } else {
+    return utils::invoke(std::forward<F>(f.function));
+  }
+}
 }  // namespace detail
 
 template<typename T>
@@ -87,6 +134,9 @@ detail::map_wrapper<T&&> map(T&& func) noexcept { return 
{std::forward<T>(func)}
 template<typename T>
 detail::flat_map_wrapper<T&&> flatMap(T&& func) noexcept { return 
{std::forward<T>(func)}; }
 
+template<typename T>
+detail::or_else_wrapper<T&&> orElse(T&& func) noexcept { return 
{std::forward<T>(func)}; }
+
 }  // namespace utils
 }  // namespace minifi
 }  // namespace nifi
diff --git a/libminifi/include/utils/gsl.h b/libminifi/include/utils/gsl.h
index db175b5..9bd4f09 100644
--- a/libminifi/include/utils/gsl.h
+++ b/libminifi/include/utils/gsl.h
@@ -17,6 +17,8 @@
 #ifndef LIBMINIFI_INCLUDE_UTILS_GSL_H_
 #define LIBMINIFI_INCLUDE_UTILS_GSL_H_
 
+#include <type_traits>
+
 #include <gsl-lite/gsl-lite.hpp>
 
 namespace org {
@@ -26,6 +28,26 @@ namespace minifi {
 
 namespace gsl = ::gsl_lite;
 
+namespace utils {
+namespace detail {
+template<typename T>
+using remove_cvref_t = typename std::remove_cv<typename 
std::remove_reference<T>::type>::type;
+}  // namespace detail
+
+template<typename Container, typename T>
+Container span_to(gsl::span<T> span) {
+  static_assert(std::is_constructible<Container, typename 
gsl::span<T>::iterator, typename gsl::span<T>::iterator>::value,
+      "The destination container must have an iterator (pointer) range 
constructor");
+  return Container(std::begin(span), std::end(span));
+}
+template<template<typename...> class Container, typename T>
+Container<detail::remove_cvref_t<T>> span_to(gsl::span<T> span) {
+  static_assert(std::is_constructible<Container<detail::remove_cvref_t<T>>, 
typename gsl::span<T>::iterator, typename gsl::span<T>::iterator>::value,
+      "The destination container must have an iterator (pointer) range 
constructor");
+  return span_to<Container<detail::remove_cvref_t<T>>>(span);
+}
+}  // namespace utils
+
 }  // namespace minifi
 }  // namespace nifi
 }  // namespace apache
diff --git a/libminifi/src/core/ConfigurableComponent.cpp 
b/libminifi/src/core/ConfigurableComponent.cpp
index f97b78f..dc3f877 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -115,7 +115,7 @@ bool ConfigurableComponent::updateProperty(const 
std::string &name, const std::s
  * @param value property value.
  * @return whether property was set or not
  */
-bool ConfigurableComponent::setProperty(Property &prop, std::string value) {
+bool ConfigurableComponent::setProperty(const Property& prop, std::string 
value) {
   std::lock_guard<std::mutex> lock(configuration_mutex_);
   auto it = properties_.find(prop.getName());
 
@@ -144,7 +144,7 @@ bool ConfigurableComponent::setProperty(Property &prop, 
std::string value) {
   }
 }
 
-bool ConfigurableComponent::setProperty(Property &prop, PropertyValue &value) {
+bool ConfigurableComponent::setProperty(const Property& prop, PropertyValue 
&value) {
   std::lock_guard<std::mutex> lock(configuration_mutex_);
   auto it = properties_.find(prop.getName());
 
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index f8e8c31..e3ce368 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -241,6 +241,18 @@ void ProcessSession::write(const 
std::shared_ptr<core::FlowFile> &flow, OutputSt
   }
 }
 
+void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>& 
flow_file, gsl::span<const char> buffer) {
+  struct BufferOutputStreamCallback : OutputStreamCallback {
+    explicit BufferOutputStreamCallback(gsl::span<const char> buffer) 
:buffer{buffer} {}
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) final {
+      return stream->write(reinterpret_cast<const uint8_t*>(buffer.data()), 
buffer.size());
+    }
+    gsl::span<const char> buffer;
+  };
+  BufferOutputStreamCallback cb{ buffer };
+  write(flow_file, &cb);
+}
+
 void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, 
OutputStreamCallback *callback) {
   std::shared_ptr<ResourceClaim> claim = flow->getResourceClaim();
   if (!claim) {
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 5a60145..4e06ebe 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -407,7 +407,7 @@ class TestController {
     return std::make_shared<TestPlan>(content_repo, flow_repo, repo, 
flow_version_, configuration, state_dir);
   }
 
-  void runSession(std::shared_ptr<TestPlan> &plan, bool runToCompletion = 
true, std::function<void(const std::shared_ptr<core::ProcessContext>&, const 
std::shared_ptr<core::ProcessSession>&)> verify =
+  void runSession(const std::shared_ptr<TestPlan> &plan, bool runToCompletion 
= true, const std::function<void(const std::shared_ptr<core::ProcessContext>&, 
const std::shared_ptr<core::ProcessSession>&)>& verify =
                       nullptr) {
     while (plan->runNextProcessor(verify) && runToCompletion) {
     }
diff --git a/libminifi/test/unit/GeneralUtilsTest.cpp 
b/libminifi/test/unit/GeneralUtilsTest.cpp
index e9b0bfb..1d01f33 100644
--- a/libminifi/test/unit/GeneralUtilsTest.cpp
+++ b/libminifi/test/unit/GeneralUtilsTest.cpp
@@ -134,3 +134,10 @@ TEST_CASE("GeneralUtils::invoke FunctionObject", "[invoke 
function object]") {
   // invoking lambda
   REQUIRE(60 == utils::invoke(int_timesn, 20));
 }
+
+TEST_CASE("GeneralUtils::dereference", "[dereference]") {
+  const int a = 42;
+  const auto* const pa = &a;
+  REQUIRE(42 == utils::dereference(pa));
+  REQUIRE(&a == &utils::dereference(pa));
+}
diff --git a/libminifi/include/utils/gsl.h b/libminifi/test/unit/GslTest.cpp
similarity index 64%
copy from libminifi/include/utils/gsl.h
copy to libminifi/test/unit/GslTest.cpp
index db175b5..f94145a 100644
--- a/libminifi/include/utils/gsl.h
+++ b/libminifi/test/unit/GslTest.cpp
@@ -1,4 +1,5 @@
 /**
+ *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,21 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_UTILS_GSL_H_
-#define LIBMINIFI_INCLUDE_UTILS_GSL_H_
-
-#include <gsl-lite/gsl-lite.hpp>
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+#include <vector>
+#include <string>
+#include "../TestBase.h"
+#include "utils/gsl.h"
 
-namespace gsl = ::gsl_lite;
+namespace utils = org::apache::nifi::minifi::utils;
 
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+TEST_CASE("span to", "[span to]") {
+  const auto test_span = gsl::make_span("test text", 9);
+  const auto string = utils::span_to<std::string>(test_span);
+  const auto vector = utils::span_to<std::vector>(test_span);
 
-#endif  // LIBMINIFI_INCLUDE_UTILS_GSL_H_
+  REQUIRE(string == "test text");
+  REQUIRE('t' == vector[0]);
+  REQUIRE(9 == vector.size());
+}
diff --git a/libminifi/test/unit/OptionalTest.cpp 
b/libminifi/test/unit/OptionalTest.cpp
index 311afec..3be3f2a 100644
--- a/libminifi/test/unit/OptionalTest.cpp
+++ b/libminifi/test/unit/OptionalTest.cpp
@@ -45,3 +45,18 @@ TEST_CASE("optional flatMap", "[optional flat map]") {
   const auto test3 = utils::make_optional(7) | 
utils::flatMap(mutable_lval_func);
   REQUIRE(!test3.has_value());
 }
+
+TEST_CASE("optional orElse", "[optional or else]") {
+  const auto opt_7 = [] { return utils::make_optional(7); };
+  const auto test1 = utils::make_optional(6) | utils::orElse(opt_7);
+  const auto test2 = utils::optional<int>{} | utils::orElse(opt_7);
+  const auto test3 = utils::make_optional(3) | utils::orElse([]{});
+  const auto test4 = utils::optional<int>{} | utils::orElse([]{});
+  struct ex : std::exception {};
+
+  REQUIRE(6 == test1.value());
+  REQUIRE(7 == test2.value());
+  REQUIRE(3 == test3.value());
+  REQUIRE(!test4);
+  REQUIRE_THROWS_AS(utils::optional<bool>{} | utils::orElse([]{ throw ex{}; 
}), const ex&);
+}
diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt
index b81e08f..8dd958b 100644
--- a/main/CMakeLists.txt
+++ b/main/CMakeLists.txt
@@ -17,11 +17,7 @@
 # under the License.
 #
 
-cmake_minimum_required(VERSION 2.6)
-
-IF(POLICY CMP0048)
-  CMAKE_POLICY(SET CMP0048 OLD)
-ENDIF(POLICY CMP0048)
+cmake_minimum_required(VERSION 3.11)
 
 include_directories(../libminifi/include)
 
@@ -95,6 +91,7 @@ foreach(EXTENSION ${extensions})
 endforeach()
 
 set_target_properties(minifiexe PROPERTIES OUTPUT_NAME minifi)
+set_target_properties(minifiexe PROPERTIES ENABLE_EXPORTS True)
 
 if (NOT WIN32)
 add_custom_command(TARGET minifiexe POST_BUILD
diff --git a/nanofi/CMakeLists.txt b/nanofi/CMakeLists.txt
index a941676..d4391e9 100644
--- a/nanofi/CMakeLists.txt
+++ b/nanofi/CMakeLists.txt
@@ -17,11 +17,7 @@
 # under the License.
 #
 
-cmake_minimum_required(VERSION 2.6)
-
-IF(POLICY CMP0048)
-  CMAKE_POLICY(SET CMP0048 OLD)
-ENDIF(POLICY CMP0048)
+cmake_minimum_required(VERSION 3.11)
 
 include_directories(include)
 include_directories(../libminifi/include)
diff --git a/nanofi/ecu/CMakeLists.txt b/nanofi/ecu/CMakeLists.txt
index 9b55cbd..5985666 100644
--- a/nanofi/ecu/CMakeLists.txt
+++ b/nanofi/ecu/CMakeLists.txt
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-cmake_minimum_required(VERSION 2.6)
+cmake_minimum_required(VERSION 3.11)
 
 
 if (NOT WIN32)
diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt
index f35381a..bc33da7 100644
--- a/nanofi/examples/CMakeLists.txt
+++ b/nanofi/examples/CMakeLists.txt
@@ -17,11 +17,7 @@
 # under the License.
 #
 
-cmake_minimum_required(VERSION 2.6)
-
-IF(POLICY CMP0048)
-  CMAKE_POLICY(SET CMP0048 OLD)
-ENDIF(POLICY CMP0048)
+cmake_minimum_required(VERSION 3.11)
 
 include_directories(/include)
 

Reply via email to