This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 490f45f MINIFICPP-1526 - Add ConsumeJournald to consume systemd
journal messages. Use libsystemd through dlopen(), which should be fine both
with LGPL and Apache License.
490f45f is described below
commit 490f45fbe39d729a9cd77dfd8f162166296bf84c
Author: Marton Szasz <[email protected]>
AuthorDate: Fri May 21 14:26:11 2021 +0200
MINIFICPP-1526 - Add ConsumeJournald to consume systemd journal messages.
Use libsystemd through dlopen(), which should be fine both with LGPL
and Apache License.
Signed-off-by: Adam Debreceni <[email protected]>
This closes #1044
---
.gitignore | 6 +
CMakeLists.txt | 14 +-
CPPLINT.cfg | 2 +-
PROCESSORS.md | 25 ++
README.md | 23 +-
bootstrap.sh | 17 +-
bstrp_functions.sh | 18 +-
cmake/Date.cmake | 5 +-
cmake/Extensions.cmake | 1 +
controller/CMakeLists.txt | 6 +-
extensions/ExtensionHeader.txt | 2 +-
.../CMakeLists.txt} | 14 +-
.../utils/gsl.h => extensions/systemd/Common.h | 16 +-
extensions/systemd/ConsumeJournald.cpp | 270 +++++++++++++++++++
extensions/systemd/ConsumeJournald.h | 114 ++++++++
.../gsl.h => extensions/systemd/WorkerThread.cpp | 29 +-
extensions/systemd/WorkerThread.h | 72 +++++
extensions/systemd/libwrapper/DlopenWrapper.cpp | 110 ++++++++
.../systemd/libwrapper/DlopenWrapper.h | 20 +-
.../systemd/libwrapper/LibWrapper.cpp | 20 +-
extensions/systemd/libwrapper/LibWrapper.h | 57 ++++
extensions/systemd/tests/CMakeLists.txt | 33 +++
extensions/systemd/tests/ConsumeJournaldTest.cpp | 296 +++++++++++++++++++++
libminifi/CMakeLists.txt | 9 +-
libminifi/include/Exception.h | 27 +-
libminifi/include/core/ConfigurableComponent.h | 4 +-
libminifi/include/core/CoreComponentState.h | 11 +
libminifi/include/core/FlowFile.h | 7 +-
libminifi/include/core/ProcessContext.h | 16 +-
libminifi/include/core/ProcessSession.h | 3 +
libminifi/include/core/ProcessorNode.h | 2 +-
libminifi/include/core/TypedValues.h | 2 +-
libminifi/include/utils/FlatMap.h | 11 +
libminifi/include/utils/GeneralUtils.h | 9 +
libminifi/include/utils/OptionalUtils.h | 52 +++-
libminifi/include/utils/gsl.h | 22 ++
libminifi/src/core/ConfigurableComponent.cpp | 4 +-
libminifi/src/core/ProcessSession.cpp | 12 +
libminifi/test/TestBase.h | 2 +-
libminifi/test/unit/GeneralUtilsTest.cpp | 7 +
.../{include/utils/gsl.h => test/unit/GslTest.cpp} | 28 +-
libminifi/test/unit/OptionalTest.cpp | 15 ++
main/CMakeLists.txt | 7 +-
nanofi/CMakeLists.txt | 6 +-
nanofi/ecu/CMakeLists.txt | 2 +-
nanofi/examples/CMakeLists.txt | 6 +-
46 files changed, 1296 insertions(+), 138 deletions(-)
diff --git a/.gitignore b/.gitignore
index cc40bfd..5926a60 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,6 +47,7 @@ cmake-build-debug
# Generated files
*flowfile_checkpoint
build
+/*build*
bt_state
bin
target
@@ -58,6 +59,11 @@ docs/generated
thirdparty/apache-rat/apache-rat*
/compile_commands.json
__pycache__/
+/corecomponentstate
+/flowfile_repository
+/content_repository
+/provenance_repository
+/logs
# Ignore source files that have been placed in the docker directory during
build
docker/minificppsource
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 136796d..f9cef3e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -19,12 +19,8 @@
cmake_minimum_required(VERSION 3.11)
cmake_policy(SET CMP0065 OLD) # default export policy, required for self-dlopen
-
-project(nifi-minifi-cpp)
+project(nifi-minifi-cpp VERSION 0.9.0)
set(PROJECT_NAME "nifi-minifi-cpp")
-set(PROJECT_VERSION_MAJOR 0)
-set(PROJECT_VERSION_MINOR 9)
-set(PROJECT_VERSION_PATCH 0)
# Optional build number for linux distribution targets' tar.gz output
set(BUILD_NUMBER "" CACHE STRING "Build number")
@@ -572,6 +568,14 @@ if (ENABLE_ALL OR ENABLE_AZURE)
createExtension(AZURE-EXTENSIONS "AZURE EXTENSIONS" "This enables Azure
support" "extensions/azure" "${TEST_DIR}/azure-tests")
endif()
+## Add the systemd extension
+## Only offered when targeting Linux. ENABLE_SYSTEMD defaults to ON, so the
+## extension is built unless it is switched off with -DENABLE_SYSTEMD=OFF.
+if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
+ option(ENABLE_SYSTEMD "Enables the systemd extension." ON)
+ if (ENABLE_SYSTEMD)
+ createExtension(SYSTEMD-EXTENSIONS "SYSTEMD EXTENSIONS" "Enabled log collection from journald" "extensions/systemd" "extensions/systemd/tests")
+ endif()
+endif()
+
## NOW WE CAN ADD LIBRARIES AND EXTENSIONS TO MAIN
add_subdirectory(main)
diff --git a/CPPLINT.cfg b/CPPLINT.cfg
index 110ae8f..a89734e 100644
--- a/CPPLINT.cfg
+++ b/CPPLINT.cfg
@@ -1,2 +1,2 @@
set noparent
-filter=-runtime/reference,-runtime/string,-build/c++11,-build/include_subdir
+filter=-runtime/reference,-runtime/string,-build/c++11,-build/include_subdir,-whitespace/forcolon
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 4820510..afe08c8 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -9,6 +9,7 @@
- [CapturePacket](#capturepacket)
- [CaptureRTSPFrame](#capturertspframe)
- [CompressContent](#compresscontent)
+- [ConsumeJournald](#consumejournald)
- [ConsumeKafka](#consumekafka)
- [ConsumeMQTT](#consumemqtt)
- [DeleteS3Object](#deletes3object)
@@ -185,6 +186,30 @@ In the list below, the names of required properties appear
in bold. Any other pr
|failure|FlowFiles will be transferred to the failure relationship if they
fail to compress/decompress|
|success|FlowFiles will be transferred to the success relationship after
successfully being compressed or decompressed|
+
+## ConsumeJournald
+### Description
+Consume systemd-journald journal messages. Available on Linux only.
+
+### Properties
+All properties are required with a default value, making them effectively
optional. None of the properties support the NiFi Expression Language.
+
+| Name | Default Value |
Allowable Values |
Description
|
+| -------------------- | --------------- |
----------------------------------------------------------------------------------
|
--------------------------------------------------------------------------------------------------------------------------------------------------------
|
+| Batch Size | 1000 |
Positive numbers | The maximum number of
entries processed in a single execution.
|
+| Payload Format | Syslog |
Raw<br>Syslog | Configures flow file content
formatting.<br>Raw: only the message.<br>Syslog: similar to syslog or
journalctl output. |
+| Include Timestamp | true |
true<br>false | Include message timestamp in
the 'timestamp' attribute.
|
+| Journal Type | System |
User<br>System<br>Both | Type of journal to
consume.
|
+| Process Old Messages | false |
true<br>false | Process events created
before the first usage (schedule) of the processor instance.
|
+| Timestamp Format | %x %X %Z | [date
format](https://howardhinnant.github.io/date/date.html#to_stream_formatting) |
Format string to use when creating the timestamp attribute or writing messages
in the syslog format. ISO/ISO 8601/ISO8601 are equivalent to "%FT%T%Ez". |
+
+### Relationships
+
+| Name | Description |
+| ------- | ------------------------------ |
+| success | Journal messages as flow files |
+
+
## ConsumeKafka
### Description
diff --git a/README.md b/README.md
index b1c97be..f158d8f 100644
--- a/README.md
+++ b/README.md
@@ -92,6 +92,7 @@ Through JNI extensions you can run NiFi processors using
NARs. The JNI extension
| Sensors | GetEnvironmentalSensors<br/>GetMovementSensors |
-DENABLE_SENSORS=ON |
| SFTP |
[FetchSFTP](PROCESSORS.md#fetchsftp)<br/>[ListSFTP](PROCESSORS.md#listsftp)<br/>[PutSFTP](PROCESSORS.md#putsftp)
| -DENABLE_SFTP=ON |
| SQL |
[ExecuteSQL](PROCESSORS.md#executesql)<br/>[PutSQL](PROCESSORS.md#putsql)<br/>[QueryDatabaseTable](PROCESSORS.md#querydatabasetable)<br/>
| -DENABLE_SQL=ON |
+| Systemd | [ConsumeJournald](PROCESSORS.md#consumejournald) |
-DENABLE_SYSTEMD=ON |
| Tensorflow |
TFApplyGraph<br/>TFConvertImageToTensor<br/>TFExtractTopLabels<br/> |
-DENABLE_TENSORFLOW=ON |
| USB Camera | [GetUSBCamera](PROCESSORS.md#getusbcamera) |
-DENABLE_USB_CAMERA=ON |
| Windows Event Log (Windows only) |
CollectorInitiatedSubscription<br/>ConsumeWindowsEventLog<br/>TailEventLog |
-DENABLE_WEL=ON |
@@ -345,8 +346,8 @@ $ # It is recommended that you install bison from source as
HomeBrew now uses an
Select MiNiFi C++ Features to toggle.
****************************************
A. Persistent Repositories .....Enabled
- B. Lib Curl Features ...........Enabled
- C. Lib Archive Features ........Enabled
+ B. libcurl features ...........Enabled
+ C. libarchive features ........Enabled
D. Execute Script support ......Enabled
E. Expression Language support .Enabled
F. Kafka support ...............Disabled
@@ -356,16 +357,16 @@ $ # It is recommended that you install bison from source
as HomeBrew now uses an
J. TensorFlow Support ..........Disabled
K. Bustache Support ............Disabled
L. MQTT Support ................Disabled
- M. SQLite Support ..............Disabled
- N. Python Support ..............Disabled
- O. COAP Support ................Enabled
- S. SFTP Support ................Disabled
- V. AWS Support .................Disabled
+ M. Python Support ..............Disabled
+ N. COAP Support ................Enabled
+ O. SFTP Support ................Disabled
+ S. AWS Support .................Disabled
T. OpenCV Support ..............Disabled
U. OPC-UA Support ..............Disabled
- W. SQL Support .................Disabled
- X. Openwsman Support ...........Disabled
- Y. Azure Support ...............Disabled
+ V. SQL Support .................Disabled
+ W. Openwsman Support ...........Disabled
+ X. Azure Support ...............Disabled
+ Y. Systemd Support .............Enabled
****************************************
Build Options.
****************************************
@@ -382,7 +383,7 @@ $ # It is recommended that you install bison from source as
HomeBrew now uses an
version of cmake or other software, or
incompatibility with other extensions
- Enter choice [ A - X or 1-7 ]
+ Enter choice [ A - Y or 1-7 ]
```
- Boostrap now saves state between runs. State will automatically be saved.
Provide -c or --clear to clear this state. The -i option provides a guided menu
install with the ability to change
diff --git a/bootstrap.sh b/bootstrap.sh
index 34fa8d1..d92d6f1 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -324,6 +324,8 @@ add_dependency OPC_ENABLED "mbedtls"
add_disabled_option AZURE_ENABLED ${FALSE} "ENABLE_AZURE"
+add_disabled_option SYSTEMD_ENABLED ${TRUE} "ENABLE_SYSTEMD"
+
USE_SHARED_LIBS=${TRUE}
ASAN_ENABLED=${FALSE}
FAIL_ON_WARNINGS=${FALSE}
@@ -352,7 +354,8 @@ if [ "${OVERRIDE_BUILD_IDENTIFIER}" !=
"${BUILD_IDENTIFIER}" ]; then
fi
if [ "$BUILD_DIR_D" != "build" ] && [ "$BUILD_DIR_D" != "$BUILD_DIR" ]; then
- read -r -p "Build dir will override stored state, $BUILD_DIR. Press any key
to continue " overwrite
+ echo -n "Build dir will override stored state, $BUILD_DIR. Press any key to
continue "
+ read -r overwrite
BUILD_DIR=$BUILD_DIR_D
fi
@@ -364,7 +367,8 @@ else
overwrite="Y"
if [ "$NO_PROMPT" = "false" ] && [ "$FEATURES_SELECTED" = "false" ]; then
echo "CMAKE Build dir (${BUILD_DIR}) exists, should we overwrite your
build directory before we begin?"
- read -r -p "If you have already bootstrapped, bootstrapping again isn't
necessary to run make [ Y/N ] " overwrite
+ echo -n "If you have already bootstrapped, bootstrapping again isn't
necessary to run make [ Y/N ] "
+ read -r overwrite
fi
if [ "$overwrite" = "N" ] || [ "$overwrite" = "n" ]; then
echo "Exiting ...."
@@ -375,8 +379,6 @@ else
fi
## change to the directory
-
-
pushd "${BUILD_DIR}" || exit 1
while [ ! "$FEATURES_SELECTED" == "true" ]
@@ -392,15 +394,13 @@ do
read_feature_options
fi
done
-### ensure we have all dependencies
+### ensure we have all dependencies
save_state
-
build_deps
## just in case
-
CMAKE_VERSION=$(${CMAKE_COMMAND} --version | head -n 1 | awk '{print $3}')
CMAKE_MAJOR=$(echo "$CMAKE_VERSION" | cut -d. -f1)
@@ -506,7 +506,8 @@ build_cmake_command(){
continue_with_plan="Y"
if [ ! "$NO_PROMPT" = "true" ]; then
- read -r -p "Command will be '${CMAKE_BUILD_COMMAND}', run this? [ Y/N ] "
continue_with_plan
+ echo -n "Command will be '${CMAKE_BUILD_COMMAND}', run this? [ Y/N ] "
+ read -r continue_with_plan
fi
if [ "$continue_with_plan" = "N" ] || [ "$continue_with_plan" = "n" ]; then
echo "Exiting ...."
diff --git a/bstrp_functions.sh b/bstrp_functions.sh
index d62e248..99e98dd 100755
--- a/bstrp_functions.sh
+++ b/bstrp_functions.sh
@@ -139,7 +139,8 @@ EnableAllFeatures(){
}
pause(){
- read -r -p "Press [Enter] key to continue..."
+ echo -n "Press [Enter] key to continue..."
+ read -r _
}
@@ -301,7 +302,8 @@ show_main_menu() {
read_main_menu_options(){
local choice
- read -r -p "Enter choice [ A-C ] " choice
+ echo -n "Enter choice [ A-C ] "
+ read -r choice
choice=$(echo "${choice}" | tr '[:upper:]' '[:lower:]')
case $choice in
a) MENU="features" ;;
@@ -325,7 +327,8 @@ show_advanced_features_menu() {
read_advanced_menu_options(){
local choice
- read -r -p "Enter choice [ A-C ] " choice
+ echo -n "Enter choice [ A-C ] "
+ read -r choice
choice=$(echo "${choice}" | tr '[:upper:]' '[:lower:]')
case $choice in
a) ToggleFeature PORTABLE_BUILD ;;
@@ -344,8 +347,8 @@ show_supported_features() {
echo " Select MiNiFi C++ Features to toggle."
echo "****************************************"
echo "A. Persistent Repositories .....$(print_feature_status
ROCKSDB_ENABLED)"
- echo "B. Lib Curl Features ...........$(print_feature_status
HTTP_CURL_ENABLED)"
- echo "C. Lib Archive Features ........$(print_feature_status
LIBARCHIVE_ENABLED)"
+ echo "B. libcurl features ............$(print_feature_status
HTTP_CURL_ENABLED)"
+ echo "C. libarchive features .........$(print_feature_status
LIBARCHIVE_ENABLED)"
echo "D. Execute Script support ......$(print_feature_status
EXECUTE_SCRIPT_ENABLED)"
echo "E. Expression Language support .$(print_feature_status
EXPRESSION_LANGAUGE_ENABLED)"
echo "F. Kafka support ...............$(print_feature_status KAFKA_ENABLED)"
@@ -364,6 +367,7 @@ show_supported_features() {
echo "V. SQL Support..................$(print_feature_status SQL_ENABLED)"
echo "W. Openwsman Support ...........$(print_feature_status
OPENWSMAN_ENABLED)"
echo "X. Azure Support ...............$(print_feature_status AZURE_ENABLED)"
+ echo "Y. Systemd Support .............$(print_feature_status
SYSTEMD_ENABLED)"
echo "****************************************"
echo " Build Options."
echo "****************************************"
@@ -386,7 +390,8 @@ show_supported_features() {
read_feature_options(){
local choice
- read -r -p "Enter choice [ A - X or 1-7 ] " choice
+ echo -n "Enter choice [ A - Y or 1-7 ] "
+ read -r choice
choice=$(echo "${choice}" | tr '[:upper:]' '[:lower:]')
case $choice in
a) ToggleFeature ROCKSDB_ENABLED ;;
@@ -415,6 +420,7 @@ read_feature_options(){
v) ToggleFeature SQL_ENABLED ;;
w) ToggleFeature OPENWSMAN_ENABLED ;;
x) ToggleFeature AZURE_ENABLED ;;
+ y) ToggleFeature SYSTEMD_ENABLED ;;
1) ToggleFeature TESTS_ENABLED ;;
2) EnableAllFeatures ;;
3) ToggleFeature JNI_ENABLED;;
diff --git a/cmake/Date.cmake b/cmake/Date.cmake
index 0ef5892..40b2690 100644
--- a/cmake/Date.cmake
+++ b/cmake/Date.cmake
@@ -55,10 +55,7 @@ FetchContent_Declare(date_src
FetchContent_GetProperties(date_src)
if (NOT date_src_POPULATED)
FetchContent_Populate(date_src)
- set(DATE_INCLUDE_DIR
- $<BUILD_INTERFACE:${date_src_SOURCE_DIR}/include>
- $<INSTALL_INTERFACE:include>
- )
+ set(DATE_INCLUDE_DIR "${date_src_SOURCE_DIR}/include" CACHE STRING ""
FORCE)
add_library(date INTERFACE)
add_library(date::date ALIAS date)
target_sources(date INTERFACE ${DATE_INCLUDE_DIR}/date/date.h)
diff --git a/cmake/Extensions.cmake b/cmake/Extensions.cmake
index 1c7422b..8a9da62 100644
--- a/cmake/Extensions.cmake
+++ b/cmake/Extensions.cmake
@@ -26,6 +26,7 @@ macro(register_extension extension-name)
get_property(extensions GLOBAL PROPERTY EXTENSION-OPTIONS)
set_property(GLOBAL APPEND PROPERTY EXTENSION-OPTIONS ${extension-name})
target_compile_definitions(${extension-name} PRIVATE
"MODULE_NAME=${extension-name}")
+ set_target_properties(${extension-name} PROPERTIES ENABLE_EXPORTS True)
endmacro()
### TESTING MACROS
diff --git a/controller/CMakeLists.txt b/controller/CMakeLists.txt
index 3e4db51..6610973 100644
--- a/controller/CMakeLists.txt
+++ b/controller/CMakeLists.txt
@@ -17,11 +17,7 @@
# under the License.
#
-cmake_minimum_required(VERSION 2.6)
-
-IF(POLICY CMP0048)
- CMAKE_POLICY(SET CMP0048 OLD)
-ENDIF(POLICY CMP0048)
+cmake_minimum_required(VERSION 3.11)
include_directories(../main/ ../libminifi/include ../libminifi/include/c2
../libminifi/include/c2/protocols/ ../libminifi/include/core/state
./libminifi/include/core/statemanagement/metrics
../libminifi/include/core/yaml ../libminifi/include/core)
diff --git a/extensions/ExtensionHeader.txt b/extensions/ExtensionHeader.txt
index 426a3e0..d7ac551 100644
--- a/extensions/ExtensionHeader.txt
+++ b/extensions/ExtensionHeader.txt
@@ -17,7 +17,7 @@
# under the License.
#
-cmake_minimum_required(VERSION 2.6)
+cmake_minimum_required(VERSION 3.11)
include_directories(../../libminifi/include ../../libminifi/include/core)
diff --git a/extensions/ExtensionHeader.txt b/extensions/systemd/CMakeLists.txt
similarity index 62%
copy from extensions/ExtensionHeader.txt
copy to extensions/systemd/CMakeLists.txt
index 426a3e0..f8cb6bc 100644
--- a/extensions/ExtensionHeader.txt
+++ b/extensions/systemd/CMakeLists.txt
@@ -17,13 +17,13 @@
# under the License.
#
-cmake_minimum_required(VERSION 2.6)
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+add_library(minifi-systemd STATIC ConsumeJournald.cpp WorkerThread.cpp
libwrapper/LibWrapper.cpp libwrapper/DlopenWrapper.cpp)
+set_property(TARGET minifi-systemd PROPERTY POSITION_INDEPENDENT_CODE ON)
-include_directories(../../libminifi/include ../../libminifi/include/core)
+target_link_libraries(minifi-systemd ${LIBMINIFI} Threads::Threads date::date)
-if(WIN32)
- include_directories(../../libminifi/opsys/win)
-else()
- include_directories(../../libminifi/opsys/posix)
-endif()
+set(SYSTEMD-EXTENSION minifi-systemd PARENT_SCOPE)
+register_extension(minifi-systemd)
+register_extension_linter(minifi-systemd-extension-linter)
diff --git a/libminifi/include/utils/gsl.h b/extensions/systemd/Common.h
similarity index 77%
copy from libminifi/include/utils/gsl.h
copy to extensions/systemd/Common.h
index db175b5..73d90a2 100644
--- a/libminifi/include/utils/gsl.h
+++ b/extensions/systemd/Common.h
@@ -1,4 +1,5 @@
/**
+ *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,21 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_UTILS_GSL_H_
-#define LIBMINIFI_INCLUDE_UTILS_GSL_H_
-#include <gsl-lite/gsl-lite.hpp>
+#pragma once
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org { namespace apache { namespace nifi { namespace minifi {
namespace extensions { namespace systemd {
-namespace gsl = ::gsl_lite;
+enum class JournalType { User, System, Both };
+} // namespace systemd
+} // namespace extensions
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org
-
-#endif // LIBMINIFI_INCLUDE_UTILS_GSL_H_
diff --git a/extensions/systemd/ConsumeJournald.cpp
b/extensions/systemd/ConsumeJournald.cpp
new file mode 100644
index 0000000..e7cd98c
--- /dev/null
+++ b/extensions/systemd/ConsumeJournald.cpp
@@ -0,0 +1,270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeJournald.h"
+
+#include <algorithm>
+
+#include "date/date.h"
+#include "spdlog/spdlog.h" // TODO(szaszm): make fmt directly available
+#include "utils/GeneralUtils.h"
+
+namespace org { namespace apache { namespace nifi { namespace minifi {
namespace extensions { namespace systemd {
+
+constexpr const char* ConsumeJournald::CURSOR_KEY;  // out-of-class definition; the value is given in the class declaration
+const core::Relationship ConsumeJournald::Success("success", "Successfully
consumed journal messages.");
+
+const core::Property ConsumeJournald::BatchSize =
core::PropertyBuilder::createProperty("Batch Size")
+ ->withDescription("The maximum number of entries processed in a single
execution.")
+ ->withDefaultValue<size_t>(1000)
+ ->isRequired(true)
+ ->build();  // read into batch_size_ by onSchedule
+
+const core::Property ConsumeJournald::PayloadFormat =
core::PropertyBuilder::createProperty("Payload Format")
+ ->withDescription("Configures flow file content formatting. Raw: only the
message. Syslog: similar to syslog or journalctl output.")
+ ->withDefaultValue<std::string>(PAYLOAD_FORMAT_SYSLOG)
+ ->withAllowableValues<std::string>({PAYLOAD_FORMAT_RAW,
PAYLOAD_FORMAT_SYSLOG})
+ ->isRequired(true)
+ ->build();  // parsed into systemd::PayloadFormat by onSchedule
+
+const core::Property ConsumeJournald::IncludeTimestamp =
core::PropertyBuilder::createProperty("Include Timestamp")
+ ->withDescription("Include message timestamp in the 'timestamp'
attribute.")
+ ->withDefaultValue<bool>(true)
+ ->isRequired(true)
+ ->build();  // read into include_timestamp_ by onSchedule
+
+const core::Property ConsumeJournald::JournalType =
core::PropertyBuilder::createProperty("Journal Type")
+ ->withDescription("Type of journal to consume.")
+ ->withDefaultValue<std::string>(JOURNAL_TYPE_SYSTEM)
+ ->withAllowableValues<std::string>({JOURNAL_TYPE_USER,
JOURNAL_TYPE_SYSTEM, JOURNAL_TYPE_BOTH})
+ ->isRequired(true)
+ ->build();  // parsed into systemd::JournalType by onSchedule
+
+const core::Property ConsumeJournald::ProcessOldMessages =
core::PropertyBuilder::createProperty("Process Old Messages")
+ ->withDescription("Process events created before the first usage
(schedule) of the processor instance.")
+ ->withDefaultValue<bool>(false)
+ ->isRequired(true)
+ ->build();  // selects seekHead (true) or seekTail (false) when no usable cursor is stored
+
+const core::Property ConsumeJournald::TimestampFormat =
core::PropertyBuilder::createProperty("Timestamp Format")
+ ->withDescription("Format string to use when creating the timestamp
attribute or writing messages in the syslog format.")
+ ->withDefaultValue("%x %X %Z")
+ ->isRequired(true)
+ ->build();  // onSchedule also maps "ISO", "ISO 8601" and "ISO8601" to "%FT%T%Ez"
+
+ConsumeJournald::ConsumeJournald(const std::string &name, const
utils::Identifier &id, std::unique_ptr<libwrapper::LibWrapper>&& libwrapper)
+ :core::Processor{name, id}, libwrapper_{std::move(libwrapper)}
+{}
+
+// Declares the processor's single relationship and its six properties, then
+// creates the worker that onSchedule/onTrigger enqueue journal operations on.
+void ConsumeJournald::initialize() {
+  setSupportedRelationships({Success});
+  setSupportedProperties({
+      BatchSize,
+      PayloadFormat,
+      IncludeTimestamp,
+      JournalType,
+      ProcessOldMessages,
+      TimestampFormat});
+
+  worker_ = utils::make_unique<Worker>();
+}
+
+// Stops the processor: flips running_ from true to false and, if a journal is
+// open, releases it on the worker before dropping the worker itself.
+// Does nothing when the processor was not running or no journal is open.
+void ConsumeJournald::notifyStop() {
+  bool expected_running = true;
+  const bool was_running = running_.compare_exchange_strong(expected_running, false, std::memory_order_acq_rel);
+  if (!was_running || !journal_) return;
+
+  // The journal is released through the worker and waited for before the worker is dropped.
+  auto journal_released = worker_->enqueue([this] {
+    journal_ = nullptr;
+  });
+  journal_released.get();
+  worker_ = nullptr;
+}
+
+void ConsumeJournald::onSchedule(core::ProcessContext* const context,
core::ProcessSessionFactory* const sessionFactory) {  // Reads the properties, opens the journal on the worker and seeks to the stored cursor, or to head/tail if none is usable.
+ gsl_Expects(context && sessionFactory && !running_ && worker_);
+ using JournalTypeEnum = systemd::JournalType;
+
+ const auto parse_payload_format = [](const std::string& property_value) ->
utils::optional<systemd::PayloadFormat> {
+ if (property_value == PAYLOAD_FORMAT_RAW) return
systemd::PayloadFormat::Raw;
+ if (property_value == PAYLOAD_FORMAT_SYSLOG) return
systemd::PayloadFormat::Syslog;
+ return utils::nullopt;
+ };  // nullopt for any value other than "Raw" / "Syslog"
+ const auto parse_journal_type = [](const std::string& property_value) ->
utils::optional<JournalTypeEnum> {
+ if (property_value == JOURNAL_TYPE_USER) return JournalTypeEnum::User;
+ if (property_value == JOURNAL_TYPE_SYSTEM) return JournalTypeEnum::System;
+ if (property_value == JOURNAL_TYPE_BOTH) return JournalTypeEnum::Both;
+ return utils::nullopt;
+ };  // nullopt for any value other than "User" / "System" / "Both"
+ batch_size_ = context->getProperty<size_t>(BatchSize).value();
+ payload_format_ = (context->getProperty(PayloadFormat) |
utils::flatMap(parse_payload_format)
+ | utils::orElse([]{ throw Exception{ExceptionType::PROCESSOR_EXCEPTION,
"invalid payload format"}; }))
+ .value();
+ include_timestamp_ = context->getProperty<bool>(IncludeTimestamp).value();
+ const auto journal_type = (context->getProperty(JournalType) |
utils::flatMap(parse_journal_type)
+ | utils::orElse([]{ throw Exception{ExceptionType::PROCESSOR_EXCEPTION,
"invalid journal type"}; }))
+ .value();
+ const auto process_old_messages =
context->getProperty<bool>(ProcessOldMessages).value_or(false);
+ timestamp_format_ = [&context] {
+ auto tf_prop = (context->getProperty(TimestampFormat)
+ | utils::orElse([]{ throw
Exception{ExceptionType::PROCESSOR_EXCEPTION, "invalid timestamp format" }; }))
+ .value();
+ if (tf_prop == "ISO" || tf_prop == "ISO 8601" || tf_prop == "ISO8601")
return std::string{"%FT%T%Ez"};
+ return tf_prop;
+ }();  // the three ISO spellings are aliases; any other value is used verbatim as the format string
+
+ state_manager_ = context->getStateManager();
+ // All journal-related API calls are thread-agnostic, meaning they need to
be called from the same thread. In our environment,
+ // where a processor can easily be scheduled on different threads, we ensure
this by executing all library calls on a dedicated
+ // worker thread. This is why all such operations are dispatched to a thread
and immediately waited for in the initiating thread.
+ journal_ = worker_->enqueue([this, journal_type]{ return
libwrapper_->openJournal(journal_type); }).get();
+ const auto seek_default = [process_old_messages](libwrapper::Journal&
journal) {
+ return process_old_messages ? journal.seekHead() : journal.seekTail();
+ };  // fallback position when there is no stored cursor or seeking to it fails
+ worker_->enqueue([this, &seek_default] {
+ const auto cursor = state_manager_->get() |
utils::map([](std::unordered_map<std::string, std::string>&& m) { return
m.at(CURSOR_KEY); });
+ if (!cursor) {
+ seek_default(*journal_);
+ } else {
+ const auto ret = journal_->seekCursor(cursor->c_str());
+ if (ret < 0) {  // negative return: -ret is translated to a message via std::generic_category below
+ const auto error_message =
std::generic_category().default_error_condition(-ret).message();
+ logger_->log_warn("Failed to seek to cursor: %s. Seeking to tail or
head (depending on Process Old Messages property) instead. cursor=\"%s\"",
error_message, *cursor);
+ seek_default(*journal_);
+ }
+ }
+ }).get();  // wait for the seek to finish; seek_default is captured by reference and must outlive the task
+ running_ = true;  // onTrigger returns immediately until this is set
+}
+
+// Fetches the next batch of journal entries and emits one flow file per entry.
+//
+// Content: with the Syslog payload format the content is the formatted syslog line;
+// with the Raw payload format it is the value of the MESSAGE field.
+// Attributes: every journal field becomes an attribute, except MESSAGE in Raw mode
+// (which is written as content instead). When Include Timestamp is set, a
+// "timestamp" attribute formatted with the configured timestamp format is added.
+//
+// Yields when the batch is empty. After the session is committed, the journal cursor
+// is stored in the state manager under CURSOR_KEY, the same key onSchedule reads back.
+void ConsumeJournald::onTrigger(core::ProcessContext* const context, core::ProcessSession* const session) {
+  gsl_Expects(context && session);
+  if (!running_.load(std::memory_order_acquire)) return;
+  auto cursor_and_messages = getCursorAndMessageBatch().get();
+  auto messages = std::move(cursor_and_messages.second);
+  if (messages.empty()) {
+    yield();
+    return;
+  }
+
+  for (auto& msg: messages) {
+    const auto flow_file = session->create();
+    // The syslog line must be built before the loop below moves the field strings out of msg.
+    if (payload_format_ == systemd::PayloadFormat::Syslog) session->writeBuffer(flow_file, gsl::make_span(formatSyslogMessage(msg)));
+    for (auto& field: msg.fields) {
+      if (field.name == "MESSAGE" && payload_format_ == systemd::PayloadFormat::Raw) {
+        session->writeBuffer(flow_file, gsl::make_span(field.value));
+      } else {
+        flow_file->setAttribute(std::move(field.name), std::move(field.value));
+      }
+    }
+    if (include_timestamp_) flow_file->setAttribute("timestamp", date::format(timestamp_format_, msg.timestamp));
+    session->transfer(flow_file, Success);
+  }
+  session->commit();
+  // Use the shared constant rather than a repeated "cursor" literal so the key written
+  // here cannot drift from the key onSchedule looks up.
+  state_manager_->set({{CURSOR_KEY, std::move(cursor_and_messages.first)}});
+}
+
+// Fetches the next raw field of the current journal entry through journal.enumerateData.
+// Returns an empty optional when the call reports 0 (nothing more to enumerate) and
+// throws SystemErrorException when it reports a negative status; otherwise returns a
+// span over the returned bytes.
+utils::optional<gsl::span<const char>> ConsumeJournald::enumerateJournalEntry(libwrapper::Journal& journal) {
+  const void* data_ptr = nullptr;
+  size_t data_length = 0;
+  const auto rc = journal.enumerateData(&data_ptr, &data_length);
+  if (rc < 0) {
+    throw SystemErrorException{ "sd_journal_enumerate_data", std::generic_category().default_error_condition(-rc) };
+  }
+  if (rc == 0) {
+    return {};
+  }
+  gsl_Ensures(data_ptr && "if sd_journal_enumerate_data was successful, then data_ptr must be set");
+  gsl_Ensures(data_length > 0 && "if sd_journal_enumerate_data was successful, then data_length must be greater than zero");
+  return gsl::make_span(reinterpret_cast<const char*>(data_ptr), data_length);
+}
+
+// Reads the next field of the current journal entry and splits it at the first '='
+// into a name (before) and a value (after). Yields an empty optional when
+// enumerateJournalEntry has nothing more to return.
+utils::optional<ConsumeJournald::journal_field> ConsumeJournald::getNextField(libwrapper::Journal& journal) {
+  return enumerateJournalEntry(journal) | utils::map([](gsl::span<const char> field) {
+    const auto eq_pos = std::find(std::begin(field), std::end(field), '=');
+    gsl_Ensures(eq_pos != std::end(field) && "field string must contain an equals sign");
+    const auto eq_idx = gsl::narrow<size_t>(eq_pos - std::begin(field));
+    const auto name_part = field.subspan(0, eq_idx);
+    const auto value_part = field.subspan(eq_idx + 1);  // skip the '=' itself
+    return journal_field{
+      utils::span_to<std::string>(name_part),
+      utils::span_to<std::string>(value_part)
+    };
+  });
+}
+
+// Enqueues the extraction of the next batch on the worker and returns a future for it.
+// The task advances the journal at most batch_size_ times; for every entry it collects
+// all fields and, when a timestamp is needed (Include Timestamp, or the Syslog payload
+// format), the entry's realtime timestamp. The future yields the journal's cursor as
+// reported after the loop, paired with the collected messages.
+std::future<std::pair<std::string, std::vector<ConsumeJournald::journal_message>>> ConsumeJournald::getCursorAndMessageBatch() {
+  gsl_Expects(worker_);
+  return worker_->enqueue([this] {
+    std::vector<journal_message> messages;
+    messages.reserve(batch_size_);
+    for (size_t i = 0; i < batch_size_ && journal_->next() > 0; ++i) {
+      journal_message message;
+      utils::optional<journal_field> field;
+      while ((field = getNextField(*journal_)).has_value()) {
+        message.fields.push_back(std::move(*field));
+      }
+      if (include_timestamp_ || payload_format_ == systemd::PayloadFormat::Syslog) {
+        message.timestamp = [this] {
+          uint64_t journal_timestamp_usec_since_epoch{};
+          journal_->getRealtimeUsec(&journal_timestamp_usec_since_epoch);
+          return std::chrono::system_clock::time_point{std::chrono::microseconds{journal_timestamp_usec_since_epoch}};
+        }();
+      }
+      messages.push_back(std::move(message));
+    }
+
+    // Move the batch into the result; copying it would duplicate every message and field string.
+    return std::make_pair(getCursor(), std::move(messages));
+  });
+}
+
+std::string ConsumeJournald::formatSyslogMessage(const journal_message& msg)
const {  // Renders one entry as "<timestamp> <hostname> <identifier>[<pid>]: <message>".
+ gsl_Expects(msg.timestamp != decltype(msg.timestamp){});  // the timestamp must have been filled in (i.e. is not default-constructed)
+
+ const std::string* systemd_hostname = nullptr;
+ const std::string* syslog_pid = nullptr;
+ const std::string* systemd_pid = nullptr;
+ const std::string* syslog_identifier = nullptr;
+ const std::string* message = nullptr;
+
+ for (const auto& field: msg.fields) {
+ if (field.name == "_HOSTNAME") systemd_hostname = &field.value;
+ else if (field.name == "SYSLOG_PID") syslog_pid = &field.value;
+ else if (field.name == "_PID") systemd_pid = &field.value;
+ else if (field.name == "SYSLOG_IDENTIFIER") syslog_identifier =
&field.value;
+ else if (field.name == "MESSAGE") message = &field.value;
+ else if (systemd_hostname && (syslog_pid || systemd_pid) &&
syslog_identifier && message) break;
+ }  // the early exit is only evaluated on fields that are none of the five names above
+
+ gsl_Ensures(message && "MESSAGE is guaranteed to be present");
+
+ const auto pid_string = utils::optional_from_ptr(syslog_pid)
+ | utils::orElse([&] { return utils::optional_from_ptr(systemd_pid); })  // SYSLOG_PID is preferred; _PID is the fallback
+ | utils::map([](const std::string* const pid) { return
fmt::format("[{}]", *pid); });
+
+ return fmt::format("{} {} {}{}: {}",
+ date::format(timestamp_format_, msg.timestamp),
+ (utils::optional_from_ptr(systemd_hostname) |
utils::map(utils::dereference)).value_or("unknown_host"),
+ (utils::optional_from_ptr(syslog_identifier) |
utils::map(utils::dereference)).value_or("unknown_process"),
+ pid_string.value_or(std::string{}),  // no "[pid]" part when neither PID field is present
+ *message);
+}
+
+// Returns the journal's current cursor as a std::string.
+// Throws SystemErrorException when journal_->getCursor reports a negative error code.
+std::string ConsumeJournald::getCursor() const {
+  const auto cursor = [this] {
+    // Start from nullptr so the postcondition below never reads an indeterminate
+    // pointer if the call reports success without setting the out-parameter.
+    gsl::owner<char*> cursor_out = nullptr;
+    const auto err_code = journal_->getCursor(&cursor_out);
+    if (err_code < 0) throw SystemErrorException{"sd_journal_get_cursor", std::generic_category().default_error_condition(-err_code)};
+    gsl_Ensures(cursor_out);
+    // The returned buffer is owned by the caller; FreeDeleter releases it once copied.
+    return std::unique_ptr<char, utils::FreeDeleter>{cursor_out};
+  }();
+  return std::string{cursor.get()};
+}
+
+} // namespace systemd
+} // namespace extensions
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/systemd/ConsumeJournald.h
b/extensions/systemd/ConsumeJournald.h
new file mode 100644
index 0000000..6099fed
--- /dev/null
+++ b/extensions/systemd/ConsumeJournald.h
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <chrono>
+#include <future>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "core/CoreComponentState.h"
+#include "core/Processor.h"
+#include "core/Resource.h"
+#include "core/logging/Logger.h"
+#include "libwrapper/LibWrapper.h"
+#include "utils/Deleters.h"
+#include "utils/gsl.h"
+#include "utils/OptionalUtils.h"
+#include "WorkerThread.h"
+
+namespace org { namespace apache { namespace nifi { namespace minifi {
namespace extensions { namespace systemd {
+
+enum class PayloadFormat { Raw, Syslog };
+
+class ConsumeJournald final : public core::Processor {
+ public:
+ static constexpr const char* CURSOR_KEY = "cursor";
+ static constexpr const char* PAYLOAD_FORMAT_RAW = "Raw";
+ static constexpr const char* PAYLOAD_FORMAT_SYSLOG = "Syslog";
+ static constexpr const char* JOURNAL_TYPE_USER = "User";
+ static constexpr const char* JOURNAL_TYPE_SYSTEM = "System";
+ static constexpr const char* JOURNAL_TYPE_BOTH = "Both";
+
+ static const core::Relationship Success;
+
+ static const core::Property BatchSize;
+ static const core::Property PayloadFormat;
+ static const core::Property IncludeTimestamp;
+ static const core::Property JournalType;
+ static const core::Property ProcessOldMessages;
+ static const core::Property TimestampFormat;
+
+ explicit ConsumeJournald(const std::string& name, const utils::Identifier&
id = {}, std::unique_ptr<libwrapper::LibWrapper>&& =
libwrapper::createLibWrapper());
+ ConsumeJournald(const ConsumeJournald&) = delete;
+ ConsumeJournald(ConsumeJournald&&) = delete;
+ ConsumeJournald& operator=(const ConsumeJournald&) = delete;
+ ConsumeJournald& operator=(ConsumeJournald&&) = delete;
+ ~ConsumeJournald() final { notifyStop(); }
+
+ void initialize() final;
+ void notifyStop() final;
+ void onSchedule(core::ProcessContext* context, core::ProcessSessionFactory*
sessionFactory) final;
+ void onTrigger(core::ProcessContext* context, core::ProcessSession* session)
final;
+
+ friend struct ConsumeJournaldTestAccessor;
+ private:
+ struct journal_field {
+ std::string name;
+ std::string value;
+ };
+
+ struct journal_message {
+ std::vector<journal_field> fields;
+ std::chrono::system_clock::time_point timestamp;
+ };
+
+ static utils::optional<gsl::span<const char>>
enumerateJournalEntry(libwrapper::Journal&);
+ static utils::optional<journal_field> getNextField(libwrapper::Journal&);
+ std::future<std::pair<std::string, std::vector<journal_message>>>
getCursorAndMessageBatch();
+ std::string formatSyslogMessage(const journal_message&) const;
+ std::string getCursor() const;
+
+ std::atomic<bool> running_{false};
+ std::shared_ptr<core::logging::Logger> logger_ =
core::logging::LoggerFactory<ConsumeJournald>::getLogger();
+ std::shared_ptr<core::CoreComponentStateManager> state_manager_;
+ std::unique_ptr<libwrapper::LibWrapper> libwrapper_;
+ std::unique_ptr<Worker> worker_;
+ std::unique_ptr<libwrapper::Journal> journal_;
+
+ std::size_t batch_size_ = 1000;
+ systemd::PayloadFormat payload_format_ = systemd::PayloadFormat::Syslog;
+ bool include_timestamp_ = true;
+ std::string timestamp_format_ = "%x %X %Z";
+};
+
+// Description passed to REGISTER_RESOURCE for ConsumeJournald. The two adjacent string literals are
+// concatenated by the compiler, so the first one must end with a space to keep the sentences apart.
+REGISTER_RESOURCE(ConsumeJournald, "Consume systemd-journald journal messages. Creates one flow file per message. "
+    "Fields are mapped to attributes. Realtime timestamp is mapped to the 'timestamp' attribute.");
+
+} // namespace systemd
+} // namespace extensions
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/libminifi/include/utils/gsl.h b/extensions/systemd/WorkerThread.cpp
similarity index 62%
copy from libminifi/include/utils/gsl.h
copy to extensions/systemd/WorkerThread.cpp
index db175b5..4213d29 100644
--- a/libminifi/include/utils/gsl.h
+++ b/extensions/systemd/WorkerThread.cpp
@@ -14,21 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_UTILS_GSL_H_
-#define LIBMINIFI_INCLUDE_UTILS_GSL_H_
-#include <gsl-lite/gsl-lite.hpp>
+#include "WorkerThread.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org { namespace apache { namespace nifi { namespace minifi {
namespace extensions { namespace systemd {
-namespace gsl = ::gsl_lite;
+namespace detail {
+// Starts the internal thread, which executes WorkerThread::run() on this object.
+WorkerThread::WorkerThread()
+ : thread_{&WorkerThread::run, this} {}
+// Stops the task queue first and then waits for the thread to exit.
+// NOTE(review): presumably stop() makes isRunning() return false and wakes a waiting consumeWait() — confirm in MinifiConcurrentQueue.h.
+WorkerThread::~WorkerThread() {
+ task_queue_.stop();
+ thread_.join();
+}
+
+// Thread body: keeps handing queued tasks to the invoking lambda for as long as the queue reports that it is running.
+void WorkerThread::run() noexcept {
+ while (task_queue_.isRunning()) {
+ // The consumer simply invokes the packaged task that the queue passes in.
+ task_queue_.consumeWait([](std::packaged_task<void()>&& f) { f(); });
+ }
+}
+} // namespace detail
+
+} // namespace systemd
+} // namespace extensions
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org
-
-#endif // LIBMINIFI_INCLUDE_UTILS_GSL_H_
diff --git a/extensions/systemd/WorkerThread.h
b/extensions/systemd/WorkerThread.h
new file mode 100644
index 0000000..503fcb1
--- /dev/null
+++ b/extensions/systemd/WorkerThread.h
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <future>
+#include <thread>
+#include <utility>
+
+#include "utils/MinifiConcurrentQueue.h"
+
+namespace org { namespace apache { namespace nifi { namespace minifi {
namespace extensions { namespace systemd {
+
+namespace detail {
+// Pairs a queue of std::packaged_task<void()> with a single std::thread.
+// Neither copyable nor movable.
+class WorkerThread final {
+ public:
+ WorkerThread();
+
+ WorkerThread(const WorkerThread&) = delete;
+ WorkerThread(WorkerThread&&) = delete;
+ // The parameter is taken by value, so deleting this one overload rules out both copy and move assignment.
+ WorkerThread& operator=(WorkerThread) = delete;
+
+ ~WorkerThread();
+
+ // Perfect-forwards its arguments to task_queue_.enqueue().
+ template<typename... Args>
+ void enqueue(Args&&... args) {
task_queue_.enqueue(std::forward<Args>(args)...); }
+
+ private:
+ void run() noexcept;
+
+ // Declared before thread_, so the queue is constructed before the thread and destroyed after it.
+ utils::ConditionConcurrentQueue<std::packaged_task<void()>> task_queue_;
+ std::thread thread_;
+};
+} // namespace detail
+
+/**
+ * Runs parameterless callables asynchronously on an internal thread.
+ * Each enqueue() call returns a std::future for the result of the callable it was given.
+ */
+class Worker final {
+ public:
+  template<typename Func>
+  auto enqueue(Func func) -> std::future<decltype(func())> {
+    // Wrap the callable in a packaged_task so that its outcome is delivered through the future.
+    std::packaged_task<decltype(func())()> job{std::move(func)};
+    std::future<decltype(func())> pending_result = job.get_future();
+    worker_thread_.enqueue(std::move(job));
+    return pending_result;
+  }
+
+ private:
+  detail::WorkerThread worker_thread_;
+};
+
+} // namespace systemd
+} // namespace extensions
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/systemd/libwrapper/DlopenWrapper.cpp
b/extensions/systemd/libwrapper/DlopenWrapper.cpp
new file mode 100644
index 0000000..2c8f452
--- /dev/null
+++ b/extensions/systemd/libwrapper/DlopenWrapper.cpp
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "DlopenWrapper.h"
+
+#include <dlfcn.h>
+
+#include "Exception.h"
+#include "utils/gsl.h"
+#include "utils/StringUtils.h"
+
+struct sd_journal;
+
+namespace org { namespace apache { namespace nifi { namespace minifi {
namespace extensions { namespace systemd { namespace libwrapper {
+
+namespace {
+struct dlclose_deleter {
+ void operator()(void* const libhandle) { if (libhandle) dlclose(libhandle); }
+};
+} // namespace
+
+class DlopenJournal : public Journal {
+ public:
+ // Opens the journal through the dynamically loaded sd_journal_open and stores the handle in j_.
+ // Throws SystemErrorException if the call reports a negative result.
+ explicit DlopenJournal(const JournalType type) {
+ // Local copies of the open flags, since no systemd header is included in this file.
+ // NOTE(review): presumably these mirror the values in systemd's sd-journal.h — verify.
+ constexpr auto SD_JOURNAL_LOCAL_ONLY = 1 << 0;
+ constexpr auto SD_JOURNAL_SYSTEM = 1 << 2;
+ constexpr auto SD_JOURNAL_CURRENT_USER = 1 << 3;
+ // LOCAL_ONLY is always set; User and System each add one flag, any other type adds neither.
+ const int flags{(type == JournalType::User ? SD_JOURNAL_CURRENT_USER : 0)
+ | (type == JournalType::System ? SD_JOURNAL_SYSTEM : 0)
+ | SD_JOURNAL_LOCAL_ONLY};
+ const int error_code = open_(&j_, flags);
+ if (error_code < 0) {
+ // sd_journal_open returns negative errno values
+ const auto err_cond =
std::generic_category().default_error_condition(-error_code);
+ throw SystemErrorException{"sd_journal_open", err_cond};
+ }
+ }
+ DlopenJournal(const DlopenJournal&) = delete;
+ DlopenJournal(DlopenJournal&&) = delete;
+ DlopenJournal& operator=(const DlopenJournal&) = delete;
+ DlopenJournal& operator=(DlopenJournal&&) = delete;
+
+ ~DlopenJournal() override {
+ if (j_ && close_) close_(j_);
+ }
+
+ int seekHead() noexcept override { return seek_head_(j_); }
+ int seekTail() noexcept override { return seek_tail_(j_); }
+ int seekCursor(const char* const cursor) noexcept override { return
seek_cursor_(j_, cursor); }
+
+ int getCursor(gsl::owner<char*>* const cursor_out) noexcept override {
return get_cursor_(j_, cursor_out); }
+
+ int next() noexcept override { return next_(j_); }
+ int enumerateData(const void** const data_out, size_t* const size_out)
noexcept override { return enumerate_data_(j_, data_out, size_out); }
+
+ int getRealtimeUsec(uint64_t* const usec_out) noexcept override { return
get_realtime_usec_(j_, usec_out); }
+
+ private:
+  /**
+   * Looks up symbol_name in the library referenced by libhandle_ and returns it cast to the function pointer type F.
+   * Throws Exception (GENERAL_EXCEPTION) if dlerror() reports a failure for the lookup.
+   */
+  template<typename F>
+  F loadSymbol(const char* const symbol_name) {
+    // Clear any pending error first: dlerror() reports the most recent failure since it was last called,
+    // so a stale, unrelated error could otherwise be mistaken for a dlsym() failure.
+    dlerror();
+    // The cast below is supported by POSIX platforms. https://stackoverflow.com/a/1096349
+    F const symbol = (F)dlsym(libhandle_.get(), symbol_name);
+    const char* const err = dlerror();
+    if (err) throw Exception(ExceptionType::GENERAL_EXCEPTION, utils::StringUtils::join_pack("dlsym(", symbol_name, "): ", err));
+    return symbol;
+  }
+
+ std::unique_ptr<void, dlclose_deleter> libhandle_{[] {
+ auto* const handle = dlopen("libsystemd.so.0", RTLD_LAZY);
+ if (!handle) throw Exception(ExceptionType::GENERAL_EXCEPTION,
utils::StringUtils::join_pack("dlopen failed: ", dlerror()));
+ return handle;
+ }()};
+
+ int (*open_)(sd_journal**, int) =
loadSymbol<decltype(open_)>("sd_journal_open");
+ void (*close_)(sd_journal*) =
loadSymbol<decltype(close_)>("sd_journal_close");
+ int (*seek_head_)(sd_journal*) =
loadSymbol<decltype(seek_head_)>("sd_journal_seek_head");
+ int (*seek_tail_)(sd_journal*) =
loadSymbol<decltype(seek_tail_)>("sd_journal_seek_tail");
+ int (*seek_cursor_)(sd_journal*, const char*) =
loadSymbol<decltype(seek_cursor_)>("sd_journal_seek_cursor");
+ int (*get_cursor_)(sd_journal*, char**) =
loadSymbol<decltype(get_cursor_)>("sd_journal_get_cursor");
+ int (*next_)(sd_journal*) = loadSymbol<decltype(next_)>("sd_journal_next");
+ int (*enumerate_data_)(sd_journal*, const void**, size_t*) =
loadSymbol<decltype(enumerate_data_)>("sd_journal_enumerate_data");
+ int (*get_realtime_usec_)(sd_journal*, uint64_t*) =
loadSymbol<decltype(get_realtime_usec_)>("sd_journal_get_realtime_usec");
+
+ gsl::owner<sd_journal*> j_ = nullptr;
+};
+
+std::unique_ptr<Journal> DlopenWrapper::openJournal(const JournalType type) {
+ return utils::make_unique<DlopenJournal>(type);
+}
+
+} // namespace libwrapper
+} // namespace systemd
+} // namespace extensions
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/libminifi/include/utils/gsl.h
b/extensions/systemd/libwrapper/DlopenWrapper.h
similarity index 70%
copy from libminifi/include/utils/gsl.h
copy to extensions/systemd/libwrapper/DlopenWrapper.h
index db175b5..88ae690 100644
--- a/libminifi/include/utils/gsl.h
+++ b/extensions/systemd/libwrapper/DlopenWrapper.h
@@ -14,21 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_UTILS_GSL_H_
-#define LIBMINIFI_INCLUDE_UTILS_GSL_H_
+#pragma once
-#include <gsl-lite/gsl-lite.hpp>
+#include "LibWrapper.h"
+#include <memory>
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org { namespace apache { namespace nifi { namespace minifi {
namespace extensions { namespace systemd { namespace libwrapper {
-namespace gsl = ::gsl_lite;
+struct DlopenWrapper : LibWrapper {
+ std::unique_ptr<Journal> openJournal(JournalType) override;
+};
+} // namespace libwrapper
+} // namespace systemd
+} // namespace extensions
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org
-
-#endif // LIBMINIFI_INCLUDE_UTILS_GSL_H_
diff --git a/libminifi/include/utils/gsl.h
b/extensions/systemd/libwrapper/LibWrapper.cpp
similarity index 69%
copy from libminifi/include/utils/gsl.h
copy to extensions/systemd/libwrapper/LibWrapper.cpp
index db175b5..d58e393 100644
--- a/libminifi/include/utils/gsl.h
+++ b/extensions/systemd/libwrapper/LibWrapper.cpp
@@ -14,21 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_UTILS_GSL_H_
-#define LIBMINIFI_INCLUDE_UTILS_GSL_H_
-#include <gsl-lite/gsl-lite.hpp>
+#include "LibWrapper.h"
+#include "DlopenWrapper.h"
+#include "utils/GeneralUtils.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org { namespace apache { namespace nifi { namespace minifi {
namespace extensions { namespace systemd { namespace libwrapper {
-namespace gsl = ::gsl_lite;
+std::unique_ptr<LibWrapper> createLibWrapper() {
+ return utils::make_unique<DlopenWrapper>();
+}
+} // namespace libwrapper
+} // namespace systemd
+} // namespace extensions
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org
-
-#endif // LIBMINIFI_INCLUDE_UTILS_GSL_H_
diff --git a/extensions/systemd/libwrapper/LibWrapper.h
b/extensions/systemd/libwrapper/LibWrapper.h
new file mode 100644
index 0000000..d7e7b7c
--- /dev/null
+++ b/extensions/systemd/libwrapper/LibWrapper.h
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "../Common.h"
+#include "utils/gsl.h"
+
+namespace org { namespace apache { namespace nifi { namespace minifi {
namespace extensions { namespace systemd { namespace libwrapper {
+
+// Abstract interface over one open journal, so that the real libsystemd-backed journal can be replaced in tests.
+// All operations return int; the callers in this extension treat negative results as negated errno values.
+struct Journal {
+ // Repositioning operations: to the head, to the tail, or to a previously obtained cursor string.
+ virtual int seekHead() noexcept = 0;
+ virtual int seekTail() noexcept = 0;
+ virtual int seekCursor(const char*) noexcept = 0;
+
+ // On success *cursor_out receives a buffer owned by the caller (note the gsl::owner).
+ virtual int getCursor(gsl::owner<char*>* cursor_out) noexcept = 0;
+
+ // NOTE(review): presumably next() advances to the following entry and enumerateData() walks the fields of the current one — confirm against sd_journal_next / sd_journal_enumerate_data.
+ virtual int next() noexcept = 0;
+ virtual int enumerateData(const void** data_out, size_t* size_out) noexcept
= 0;
+
+ virtual int getRealtimeUsec(uint64_t* usec_out) noexcept = 0;
+
+ virtual ~Journal() = default;
+};
+
+
+// Factory interface for Journal objects; implementations decide which kind of journal gets opened for a given JournalType.
+struct LibWrapper {
+ virtual std::unique_ptr<Journal> openJournal(JournalType) = 0;
+ virtual ~LibWrapper() = default;
+};
+
+std::unique_ptr<LibWrapper> createLibWrapper();
+
+} // namespace libwrapper
+} // namespace systemd
+} // namespace extensions
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/extensions/systemd/tests/CMakeLists.txt
b/extensions/systemd/tests/CMakeLists.txt
new file mode 100644
index 0000000..5525afa
--- /dev/null
+++ b/extensions/systemd/tests/CMakeLists.txt
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# add_systemd_test(<testfile>)
+# Builds one test executable from <testfile> and registers it with CTest.
+# The target is named after the file name without its extension.
+# Example: add_systemd_test("ConsumeJournaldTest.cpp")
+function(add_systemd_test testfile)
+ get_filename_component(TEST_TARGET "${testfile}" NAME_WE)
+ add_executable(${TEST_TARGET} "${testfile}")
+ # BEFORE prepends these directories to the target's include search path.
+ target_include_directories(${TEST_TARGET} PRIVATE BEFORE
"${CMAKE_SOURCE_DIR}/extensions/systemd/")
+ target_include_directories(${TEST_TARGET} PRIVATE BEFORE
"${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+ target_include_directories(${TEST_TARGET} PRIVATE BEFORE
"${CMAKE_SOURCE_DIR}/libminifi/test")
+ # NOTE(review): target_wholearchive_library and createTests are defined elsewhere in the project — confirm their linking signature before adding PRIVATE/PUBLIC keywords below.
+ target_wholearchive_library(${TEST_TARGET} minifi-systemd)
+ target_wholearchive_library(${TEST_TARGET} minifi-standard-processors)
+ createTests("${TEST_TARGET}")
+ add_test(NAME ${TEST_TARGET} COMMAND "${TEST_TARGET}" WORKING_DIRECTORY
"${TEST_DIR}")
+ target_link_libraries(${TEST_TARGET} ${CATCH_MAIN_LIB})
+endfunction()
+
+add_systemd_test("ConsumeJournaldTest.cpp")
diff --git a/extensions/systemd/tests/ConsumeJournaldTest.cpp
b/extensions/systemd/tests/ConsumeJournaldTest.cpp
new file mode 100644
index 0000000..7c13c7d
--- /dev/null
+++ b/extensions/systemd/tests/ConsumeJournaldTest.cpp
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cerrno>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "TestBase.h"
+#include "ConsumeJournald.h"
+#include "libwrapper/LibWrapper.h"
+#include "utils/GeneralUtils.h"
+#include "utils/gsl.h"
+#include "utils/StringUtils.h"
+#include "Utils.h"
+
+namespace minifi = org::apache::nifi::minifi;
+namespace utils = minifi::utils;
+namespace systemd = minifi::extensions::systemd;
+namespace libwrapper = systemd::libwrapper;
+using systemd::JournalType;
+using systemd::ConsumeJournald;
+
+namespace {
+namespace gsl = minifi::gsl;
+/**
+ * One fake journal entry for the tests, stored as KEY=VALUE strings.
+ * MESSAGE, SYSLOG_IDENTIFIER and _HOSTNAME are always appended after extra_fields;
+ * a PID field (_PID or SYSLOG_PID) is appended only when pid is non-zero.
+ */
+struct JournalEntry final {
+  JournalEntry(const char* const identifier, const char* const message, const int pid = 0, std::vector<std::string> extra_fields = {}, const char* const hostname = "test-pc")
+      :fields{std::move(extra_fields)}
+  {
+    // extra_fields was moved into `fields` above, so take the count from `fields`
+    // instead of reading the moved-from parameter later on.
+    const auto extra_field_count = fields.size();
+    fields.reserve(extra_field_count + 4);
+    fields.push_back(utils::StringUtils::join_pack("MESSAGE=", message));
+    fields.push_back(utils::StringUtils::join_pack("SYSLOG_IDENTIFIER=", identifier));
+    if (pid != 0) {
+      // The intention of the long expression below is a simple pseudo-random to test both branches equally
+      // without having to pull in complex random logic
+      const char* const pid_key =
+          (int{message[0]} + int{identifier[0]} + static_cast<int>(extra_field_count) + pid + int{hostname[0]}) % 2 == 0 ? "_PID" : "SYSLOG_PID";
+      fields.push_back(utils::StringUtils::join_pack(pid_key, "=", std::to_string(pid)));
+    }
+    fields.push_back(utils::StringUtils::join_pack("_HOSTNAME=", hostname));
+  }
+
+  std::vector<std::string> fields;  // in KEY=VALUE format, like systemd
+};
+struct TestJournal final : libwrapper::Journal {
+ explicit TestJournal(std::vector<JournalEntry>& journal)
+ :journal{&journal}
+ { }
+
+ int seekHead() noexcept override {
+ cursor = 0;
+ consumed = -1;
+ field_id = 0;
+ return 0;
+ }
+
+ int seekTail() noexcept override {
+ cursor = journal->size();
+ consumed = gsl::narrow<ssize_t>(journal->size() - 1);
+ field_id = 0;
+ return 0;
+ }
+
+  /**
+   * Positions the fake journal right after the entry whose index is encoded in `cur`.
+   * Returns 0 on success, -EINVAL if `cur` is not a number, -ERANGE if it cannot be represented.
+   */
+  int seekCursor(const char* const cur) noexcept override {
+    try {
+      // Work on the signed value: getCursor() can produce "-1" (nothing consumed yet), and deriving
+      // `consumed` from the unsigned `cursor - 1` would wrap around and make gsl::narrow throw.
+      const long long last_consumed = std::stoll(cur);
+      const auto new_cursor = gsl::narrow<size_t>(last_consumed + 1);
+      const auto new_consumed = gsl::narrow<ssize_t>(last_consumed);
+      // Assign only after both conversions succeeded, so a failure leaves the position untouched.
+      cursor = new_cursor;
+      consumed = new_consumed;
+    } catch (const std::invalid_argument&) {
+      return -EINVAL;
+    } catch (const std::out_of_range&) {
+      return -ERANGE;
+    } catch (const gsl::narrowing_error&) {
+      // This function is noexcept: letting the exception escape would terminate the test process.
+      return -ERANGE;
+    }
+    field_id = 0;
+    return 0;
+  }
+
+ int getCursor(gsl::owner<char*>* const cursor_out) noexcept override {
+ *cursor_out = strdup(std::to_string(consumed).c_str());
+ return *cursor_out ? 0 : -ENOMEM;
+ }
+
+ int next() noexcept override {
+ cursor = gsl::narrow<size_t>(consumed + 1);
+ field_id = 0;
+ if (cursor >= journal->size()) return 0;
+ return 1;
+ }
+
+ int enumerateData(const void** const data_out, size_t* const size_out)
noexcept override {
+ if (cursor >= journal->size()) {
+ cursor = gsl::narrow<size_t>(consumed + 1);
+ return -EADDRNOTAVAIL;
+ }
+ if (field_id >= (*journal)[cursor].fields.size()) return 0;
+ const auto result = gsl::narrow<int>((*journal)[cursor].fields.size() -
field_id);
+ *data_out = (*journal)[cursor].fields[field_id].c_str();
+ *size_out = (*journal)[cursor].fields[field_id].size();
+ consumed = gsl::narrow<ssize_t>(cursor);
+ ++field_id;
+ return result;
+ }
+
+ int getRealtimeUsec(uint64_t* const usec_out) noexcept override {
+ constexpr auto _20210415171703 = 1618507023000000;
+ constexpr auto usec_per_sec = 1000000;
+ *usec_out = _20210415171703 + cursor * usec_per_sec + 123456;
+ return 0;
+ }
+
+ size_t cursor = 0;
+ ssize_t consumed = -1;
+ size_t field_id = 0;
+ gsl::not_null<std::vector<JournalEntry>*> journal;
+};
+struct TestLibWrapper final : libwrapper::LibWrapper {
+ explicit TestLibWrapper(std::vector<JournalEntry> journal)
+ :journal{std::move(journal)}
+ { }
+
+ std::unique_ptr<libwrapper::Journal> openJournal(JournalType) override {
+ return utils::make_unique<TestJournal>(journal);
+ }
+
+ std::vector<JournalEntry> journal;
+};
+} // namespace
+
+namespace org { namespace apache { namespace nifi { namespace minifi {
namespace extensions { namespace systemd {
+struct ConsumeJournaldTestAccessor {
+ FIELD_ACCESSOR(state_manager_);
+};
+}}}}}} // namespace org::apache::nifi::minifi::extensions::systemd
+using
org::apache::nifi::minifi::extensions::systemd::ConsumeJournaldTestAccessor;
+
+TEST_CASE("ConsumeJournald", "[consumejournald]") {
+ TestController test_controller;
+ LogTestController::getInstance().setTrace<ConsumeJournald>();
+ const auto plan = test_controller.createPlan();
+ auto libwrapper = utils::make_unique<TestLibWrapper>(TestLibWrapper{{
+ {"kernel", "Linux version 5.10.12-gentoo-x86_64
([email protected]) (x86_64-pc-linux-gnu-gcc (Gentoo 10.2.0-r5 p6)
10.2.0, GNU ld (Gentoo 2.35.2 p1) 2.35.2) #1 SMP Sat Feb 20 03:13:45 CET
2021"}, // NOLINT
+ {"kernel", "NX (Execute Disable) protection: active"},
+ {"kernel", "ACPI: Local APIC address 0xfee00000"},
+ {"kernel", "HugeTLB registered 1.00 GiB page size, pre-allocated 0
pages"},
+ {"kernel", "SCSI subsystem initialized"},
+ {"systemd", "Starting Rule-based Manager for Device Events and
Files...", 1},
+ }});
+ auto* const libwrapper_observer = libwrapper.get();
+ const auto consume_journald =
plan->addProcessor(std::make_shared<ConsumeJournald>("ConsumeJournald",
utils::Identifier{},
+ std::move(libwrapper)), "ConsumeJournald");
+ REQUIRE(consume_journald->setProperty(ConsumeJournald::TimestampFormat,
"ISO8601"));
+ const auto get_cursor_position = [&consume_journald]() -> std::string {
+ return
ConsumeJournaldTestAccessor::get_state_manager_(dynamic_cast<ConsumeJournald&>(*consume_journald))->get()->at("cursor");
+ };
+
+ SECTION("defaults") {
+ // first run: seeks to the end, no flow files are created. Yields. Can't
check cursor position, because it's only set during message consumption.
+ plan->runNextProcessor();
+ REQUIRE(nullptr == plan->getFlowFileProducedByCurrentProcessor()); //
ConsumeJournald seeks to tail by default, therefore no flow files are produced
+ REQUIRE(consume_journald->isYield());
+
+ // add a flow file, check the content
+ libwrapper_observer->journal.emplace_back("systemd", "Mounted /boot.", 1);
+ plan->runCurrentProcessor();
+ REQUIRE("6" == get_cursor_position());
+ REQUIRE("2021-04-15T17:17:09.123456000+00:00 test-pc systemd[1]: Mounted
/boot." == plan->getContent(plan->getCurrentFlowFile()));
+
+ // add two new messages, expect two new flow files
+ libwrapper_observer->journal.emplace_back("dbus-daemon", "[system]
Successfully activated service 'org.freedesktop.UPower'", 2200);
+ libwrapper_observer->journal.emplace_back("NetworkManager", "<info>
[1618507047.7278] manager: (virbr0): new Bridge device
(/org/freedesktop/NetworkManager/Devices/5)", 2201);
+ plan->runCurrentProcessor();
+ REQUIRE(2 == plan->getNumFlowFileProducedByCurrentProcessor());
+ REQUIRE("8" == get_cursor_position());
+ const auto content = plan->getContent(plan->getCurrentFlowFile());
+ REQUIRE(!content.empty());
+ }
+ SECTION("Raw format, one-by-one") {
+ REQUIRE(consume_journald->setProperty(ConsumeJournald::BatchSize, "1"));
+ REQUIRE(consume_journald->setProperty(ConsumeJournald::PayloadFormat,
"Raw"));
+ REQUIRE(consume_journald->setProperty(ConsumeJournald::ProcessOldMessages,
"true"));
+ {
+ plan->runNextProcessor();
+ REQUIRE("0" == get_cursor_position());
+ REQUIRE(!consume_journald->isYield());
+ const auto flowfile = plan->getCurrentFlowFile();
+ const auto content = plan->getContent(flowfile);
+ REQUIRE("Linux version 5.10.12-gentoo-x86_64 ([email protected])
(x86_64-pc-linux-gnu-gcc (Gentoo 10.2.0-r5 p6) 10.2.0, GNU ld (Gentoo 2.35.2
p1) 2.35.2) #1 SMP Sat Feb 20 03:13:45 CET 2021" // NOLINT
+ == content);
+ REQUIRE("2021-04-15T17:17:03.123456000+00:00" ==
flowfile->getAttribute("timestamp").value_or("n/a"));
+ }
+ {
+ plan->runCurrentProcessor();
+ REQUIRE("1" == get_cursor_position());
+ const auto content = plan->getContent(plan->getCurrentFlowFile());
+ REQUIRE("NX (Execute Disable) protection: active" == content);
+ REQUIRE("2021-04-15T17:17:04.123456000+00:00" ==
plan->getCurrentFlowFile()->getAttribute("timestamp").value_or("n/a"));
+ }
+
+ plan->runCurrentProcessor();
+ REQUIRE("2" == get_cursor_position());
+ REQUIRE("ACPI: Local APIC address 0xfee00000" ==
plan->getContent(plan->getCurrentFlowFile()));
+
+ plan->runCurrentProcessor();
+ REQUIRE("HugeTLB registered 1.00 GiB page size, pre-allocated 0 pages" ==
plan->getContent(plan->getCurrentFlowFile()));
+
+ plan->runCurrentProcessor();
+ REQUIRE("SCSI subsystem initialized" ==
plan->getContent(plan->getCurrentFlowFile()));
+
+ plan->runCurrentProcessor();
+ REQUIRE("Starting Rule-based Manager for Device Events and Files..." ==
plan->getContent(plan->getCurrentFlowFile()));
+ REQUIRE("5" == get_cursor_position());
+
+ plan->runCurrentProcessor();
+ REQUIRE(nullptr == plan->getCurrentFlowFile());
+ REQUIRE("5" == get_cursor_position());
+
+ {
+ // add a flow file, check the content and the timestamp
+ libwrapper_observer->journal.emplace_back("systemd", "Mounted /boot.",
1);
+ plan->runCurrentProcessor();
+ const auto flowfile = plan->getCurrentFlowFile();
+ const auto content = plan->getContent(flowfile);
+ REQUIRE("6" == get_cursor_position());
+ REQUIRE("2021-04-15T17:17:09.123456000+00:00" ==
flowfile->getAttribute("timestamp"));
+ REQUIRE("Mounted /boot." == content);
+ REQUIRE("test-pc" ==
flowfile->getAttribute("_HOSTNAME").value_or("n/a"));
+ REQUIRE("systemd" ==
flowfile->getAttribute("SYSLOG_IDENTIFIER").value_or("n/a"));
+ const auto pid = (flowfile->getAttribute("_PID") | utils::orElse([&] {
return flowfile->getAttribute("SYSLOG_PID"); })).value_or("n/a");
+ REQUIRE("1" == pid);
+ }
+
+ plan->runCurrentProcessor();
+ REQUIRE(nullptr == plan->getCurrentFlowFile());
+ REQUIRE("6" == get_cursor_position());
+ }
+ SECTION("Include Timestamp is honored") {
+ REQUIRE(consume_journald->setProperty(ConsumeJournald::BatchSize, "1"));
+ REQUIRE(consume_journald->setProperty(ConsumeJournald::IncludeTimestamp,
"false"));
+ plan->runNextProcessor(); // first run: seeks to the end, no flow files
are created. Yields.
+ REQUIRE(nullptr == plan->getCurrentFlowFile());
+
+ // add a flow file, ensure that no timestamp is added to the attributes
+ libwrapper_observer->journal.emplace_back("systemd", "Mounted /boot.", 1);
+ plan->runCurrentProcessor();
+ REQUIRE("2021-04-15T17:17:09.123456000+00:00 test-pc systemd[1]: Mounted
/boot." == plan->getContent(plan->getCurrentFlowFile()));
+
REQUIRE(!plan->getCurrentFlowFile()->getAttribute("timestamp").has_value());
+ }
+ SECTION("Batch Size is honored") {
+ REQUIRE(consume_journald->setProperty(ConsumeJournald::BatchSize, "3"));
+ REQUIRE(consume_journald->setProperty(ConsumeJournald::ProcessOldMessages,
"true"));
+ libwrapper_observer->journal.emplace_back("systemd", "Mounted /boot.", 1);
+ libwrapper_observer->journal.emplace_back("dbus-daemon", "[system]
Successfully activated service 'org.freedesktop.UPower'", 2200);
+
+ plan->runNextProcessor();
+ REQUIRE(3 == plan->getNumFlowFileProducedByCurrentProcessor());
+
+ plan->runCurrentProcessor();
+ REQUIRE(6 == plan->getNumFlowFileProducedByCurrentProcessor());
+
+ plan->runCurrentProcessor();
+ REQUIRE(8 == plan->getNumFlowFileProducedByCurrentProcessor());
+ REQUIRE(!consume_journald->isYield());
+
+ plan->runCurrentProcessor();
+ REQUIRE(8 == plan->getNumFlowFileProducedByCurrentProcessor());
+ REQUIRE(consume_journald->isYield());
+ }
+ SECTION("throw on invalid batch size") {
+ REQUIRE_THROWS(consume_journald->setProperty(ConsumeJournald::BatchSize,
"asdf"));
+ }
+ SECTION("throw on invalid payload format") {
+ REQUIRE(consume_journald->setProperty(ConsumeJournald::PayloadFormat,
"raw")); // case-sensitive
+ REQUIRE_THROWS(plan->scheduleProcessor(consume_journald));
+ }
+  // An unrecognized Journal Type value is accepted by setProperty but must make scheduling fail.
+  SECTION("throw on invalid journal type") {
+    REQUIRE(consume_journald->setProperty(ConsumeJournald::JournalType, "hello"));
+    REQUIRE_THROWS(plan->scheduleProcessor(consume_journald));
+  }
+}
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 613d26a..f0c0ded 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -17,13 +17,10 @@
# under the License.
#
-cmake_minimum_required (VERSION 2.6)
+cmake_minimum_required (VERSION 3.11)
-project(nifi-libcore-minifi)
+project(nifi-libcore-minifi VERSION 0.9.0)
set(PROJECT_NAME "nifi-libcore-minifi")
-set(PROJECT_VERSION_MAJOR 0)
-set(PROJECT_VERSION_MINOR 9)
-set(PROJECT_VERSION_PATCH 0)
#### Establish Project Configuration ####
@@ -127,7 +124,7 @@ if(NOT WIN32)
list(APPEND LIBMINIFI_LIBRARIES OSSP::libuuid++)
endif()
if (NOT OPENSSL_OFF)
- list(APPEND LIBMINIFI_LIBRARIES OpenSSL::SSL)
+ list(APPEND LIBMINIFI_LIBRARIES OpenSSL::Crypto OpenSSL::SSL)
endif()
target_link_libraries(core-minifi ${CMAKE_DL_LIBS} ${LIBMINIFI_LIBRARIES})
diff --git a/libminifi/include/Exception.h b/libminifi/include/Exception.h
index ef85076..634060d 100644
--- a/libminifi/include/Exception.h
+++ b/libminifi/include/Exception.h
@@ -27,6 +27,7 @@
#include <sstream>
#include <stdexcept>
#include <string>
+#include <system_error>
#include "utils/StringUtils.h"
@@ -55,7 +56,7 @@ inline const char *ExceptionTypeToString(ExceptionType type) {
if (type < MAX_EXCEPTION)
return ExceptionStr[type];
else
- return NULL;
+ return nullptr;
}
struct Exception : public std::runtime_error {
@@ -63,12 +64,32 @@ struct Exception : public std::runtime_error {
* Create a new exception
*/
Exception(ExceptionType type, const std::string& errorMsg)
- : std::runtime_error{
org::apache::nifi::minifi::utils::StringUtils::join_pack(ExceptionTypeToString(type),
": ", errorMsg) }
+ :Exception{ utils::StringUtils::join_pack(ExceptionTypeToString(type),
": ", errorMsg) }
{ }
Exception(ExceptionType type, const char* errorMsg)
- : std::runtime_error{
org::apache::nifi::minifi::utils::StringUtils::join_pack(ExceptionTypeToString(type),
": ", errorMsg) }
+ :Exception{ utils::StringUtils::join_pack(ExceptionTypeToString(type),
": ", errorMsg) }
{ }
+
+ protected:
+ explicit Exception(const std::string& errmsg)
+ :std::runtime_error{ errmsg }
+ {}
+ explicit Exception(const char* errmsg)
+ :std::runtime_error{ errmsg }
+ {}
+};
+
+// Exception that keeps the std::error_condition of a failed operation.
+// The message is join_pack(operation, ": ", error_condition.message()).
+struct SystemErrorException : Exception {
+  explicit SystemErrorException(const char* const operation, std::error_condition error_condition)
+    :Exception{ utils::StringUtils::join_pack(operation, ": ", error_condition.message()) },
+    error_condition_{ error_condition }
+  {}
+
+  // const, so the condition can be read through a const reference (e.g. catch (const SystemErrorException&))
+  std::error_condition error_condition() const noexcept { return error_condition_; }
+
+ private:
+  std::error_condition error_condition_;
};
} // namespace minifi
diff --git a/libminifi/include/core/ConfigurableComponent.h
b/libminifi/include/core/ConfigurableComponent.h
index 0cd8723..86d588a 100644
--- a/libminifi/include/core/ConfigurableComponent.h
+++ b/libminifi/include/core/ConfigurableComponent.h
@@ -88,7 +88,7 @@ class ConfigurableComponent {
* @param value property value.
* @return whether property was set or not
*/
- bool setProperty(Property &prop, std::string value);
+ bool setProperty(const Property& prop, std::string value);
/**
* Sets the property using the provided name
@@ -96,7 +96,7 @@ class ConfigurableComponent {
* @param value property value.
* @return whether property was set or not
*/
- bool setProperty(Property &prop, PropertyValue &value);
+ bool setProperty(const Property& prop, PropertyValue &value);
/**
* Sets supported properties for the ConfigurableComponent
diff --git a/libminifi/include/core/CoreComponentState.h
b/libminifi/include/core/CoreComponentState.h
index 3174b40..2bdc04a 100644
--- a/libminifi/include/core/CoreComponentState.h
+++ b/libminifi/include/core/CoreComponentState.h
@@ -26,6 +26,8 @@
#include <map>
#include <string>
+#include "utils/OptionalUtils.h"
+
namespace org {
namespace apache {
namespace nifi {
@@ -42,6 +44,15 @@ class CoreComponentStateManager {
virtual bool get(CoreComponentState& kvs) = 0;
+  // Convenience overload: returns the state map filled by get(kvs), or nullopt when get(kvs) returns false.
+  utils::optional<std::unordered_map<std::string, std::string>> get() {
+    std::unordered_map<std::string, std::string> state;
+    if (!get(state)) {
+      return utils::nullopt;
+    }
+    return state;
+  }
+
virtual bool clear() = 0;
virtual bool persist() = 0;
diff --git a/libminifi/include/core/FlowFile.h
b/libminifi/include/core/FlowFile.h
index 7b97fc7..dcca83d 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -158,8 +158,11 @@ class FlowFile : public CoreComponent, public
ReferenceContainer {
/**
* setAttribute, if attribute already there, update it, else, add it
*/
- bool setAttribute(const std::string& key, const std::string& value) {
- return attributes_.insert_or_assign(key, value).second;
+ bool setAttribute(const std::string& key, std::string value) {
+ return attributes_.insert_or_assign(key, std::move(value)).second;
+ }
+ bool setAttribute(std::string&& key, std::string value) {
+ return attributes_.insert_or_assign(std::move(key),
std::move(value)).second;
}
/**
diff --git a/libminifi/include/core/ProcessContext.h
b/libminifi/include/core/ProcessContext.h
index b57cc52..f76cf7e 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -40,6 +40,8 @@
#include "core/FlowFile.h"
#include "core/CoreComponentState.h"
#include "utils/file/FileUtils.h"
+#include "utils/OptionalUtils.h"
+#include "utils/PropertyErrors.h"
#include "VariableRegistry.h"
namespace org {
@@ -97,6 +99,18 @@ class ProcessContext : public
controller::ControllerServiceLookup, public core::
return processor_node_;
}
+  // Looks the property up by its name and returns the value as an optional.
+  // Yields nullopt both when the name-based lookup returns false and when it throws ValueException.
+  template<typename T = std::string>
+  typename std::enable_if<std::is_default_constructible<T>::value, utils::optional<T>>::type
+  getProperty(const Property& property) {
+    T result;
+    try {
+      const bool found = getProperty(property.getName(), result);
+      if (!found) {
+        return utils::nullopt;
+      }
+    } catch (const utils::internal::ValueException&) {
+      return utils::nullopt;
+    }
+    return result;
+  }
+
template<typename T>
bool getProperty(const std::string &name, T &value) const {
return getPropertyImp<typename std::common_type<T>::type>(name, value);
@@ -122,7 +136,7 @@ class ProcessContext : public
controller::ControllerServiceLookup, public core::
return processor_node_->setDynamicProperty(name, value);
}
// Sets the property value using the Property object
- bool setProperty(Property prop, std::string value) {
+ bool setProperty(const Property& prop, std::string value) {
return processor_node_->setProperty(prop, value);
}
// Whether the relationship is supported
diff --git a/libminifi/include/core/ProcessSession.h
b/libminifi/include/core/ProcessSession.h
index 743adf8..302c8b1 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -37,6 +37,7 @@
#include "FlowFile.h"
#include "WeakReference.h"
#include "provenance/Provenance.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -96,6 +97,8 @@ class ProcessSession : public ReferenceContainer {
int read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback
*callback);
// Execute the given write callback against the content
void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback
*callback);
+ // Replace content with buffer
+ void writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file,
gsl::span<const char> buffer);
// Execute the given write/append callback against the content
void append(const std::shared_ptr<core::FlowFile> &flow,
OutputStreamCallback *callback);
// Penalize the flow
diff --git a/libminifi/include/core/ProcessorNode.h
b/libminifi/include/core/ProcessorNode.h
index 6a35232..557cc23 100644
--- a/libminifi/include/core/ProcessorNode.h
+++ b/libminifi/include/core/ProcessorNode.h
@@ -155,7 +155,7 @@ class ProcessorNode : public ConfigurableComponent, public
Connectable {
* @param value property value.
* @return whether property was set or not
*/
- bool setProperty(Property &prop, std::string value) {
+ bool setProperty(const Property &prop, std::string value) {
const std::shared_ptr<ConfigurableComponent> processor_cast =
std::dynamic_pointer_cast<ConfigurableComponent>(processor_);
bool ret = ConfigurableComponent::setProperty(prop, value);
if (nullptr != processor_cast)
diff --git a/libminifi/include/core/TypedValues.h
b/libminifi/include/core/TypedValues.h
index f735d37..b3808ba 100644
--- a/libminifi/include/core/TypedValues.h
+++ b/libminifi/include/core/TypedValues.h
@@ -51,7 +51,7 @@ class TimePeriodValue : public TransformableValue, public
state::response::UInt6
explicit TimePeriodValue(const std::string &timeString)
: state::response::UInt64Value(0) {
- TimeUnit units;
+ TimeUnit units{};
if (!StringToTime(timeString, value, units)) {
throw utils::internal::ParseException("Couldn't parse TimePeriodValue");
}
diff --git a/libminifi/include/utils/FlatMap.h
b/libminifi/include/utils/FlatMap.h
index 096f904..eb278af 100644
--- a/libminifi/include/utils/FlatMap.h
+++ b/libminifi/include/utils/FlatMap.h
@@ -185,6 +185,17 @@ class FlatMap{
return {iterator{data_.begin() + data_.size() - 1}, true};
}
+ // Rvalue-key overload of insert_or_assign.
+ // If key is already present, value is assigned to the existing entry and {iterator, false} is returned;
+ // otherwise key is moved and value forwarded into a new entry appended to data_, and {iterator, true} is returned.
+ template<typename M>
+ std::pair<iterator, bool> insert_or_assign(K&& key, M&& value) {
+ auto it = find(key);
+ if (it != end()) {
+ it->second = std::forward<M>(value);
+ return {it, false};
+ }
+ data_.emplace_back(std::move(key), std::forward<M>(value));
+ // the new entry is the last element of data_
+ return {iterator{data_.begin() + data_.size() - 1}, true};
+ }
+
iterator find(const K& key) {
for (auto it = data_.begin(); it != data_.end(); ++it) {
if (it->first == key) return iterator{it};
diff --git a/libminifi/include/utils/GeneralUtils.h
b/libminifi/include/utils/GeneralUtils.h
index 4b11c67..bd70412 100644
--- a/libminifi/include/utils/GeneralUtils.h
+++ b/libminifi/include/utils/GeneralUtils.h
@@ -174,6 +174,15 @@ auto invoke(F&& f, Args&&... args)
MINIFICPP_UTIL_DEDUCED(detail::invoke_impl(st
using std::invoke
#endif /* < C++17 */
+namespace detail {
+// Function object that dereferences a raw pointer and returns a reference to the pointee.
+// No null check is performed.
+struct dereference_t {
+ template<typename T>
+ T &operator()(T *ptr) const noexcept { return *ptr; }
+};
+} // namespace detail
+
+// Ready-to-use instance of detail::dereference_t, callable as utils::dereference(ptr)
+constexpr detail::dereference_t dereference{};
+
} // namespace utils
} // namespace minifi
} // namespace nifi
diff --git a/libminifi/include/utils/OptionalUtils.h
b/libminifi/include/utils/OptionalUtils.h
index 905128c..8eec781 100644
--- a/libminifi/include/utils/OptionalUtils.h
+++ b/libminifi/include/utils/OptionalUtils.h
@@ -36,7 +36,8 @@ using nonstd::make_optional;
template<typename T>
optional<typename gsl_lite::remove_cvref<T>::type> optional_from_ptr(T&& obj) {
- return obj == nullptr ? nullopt : optional<typename
gsl_lite::remove_cvref<T>::type>{ std::forward<T>(obj) };
+ // detail::remove_cvref_t comes from gsl.h
+ return obj == nullptr ? nullopt : optional<detail::remove_cvref_t<T>>{
std::forward<T>(obj) };
}
template<typename>
@@ -62,6 +63,16 @@ auto operator|(const optional<SourceType>& o, map_wrapper<F>
f) noexcept(noexcep
}
}
+// map implementation for rvalue optionals: the contained value is moved into f.function.
+// Returns an optional holding the (decayed) result, or nullopt when o is empty.
+template<typename SourceType, typename F>
+auto operator|(optional<SourceType>&& o, map_wrapper<F> f)
noexcept(noexcept(utils::invoke(std::forward<F>(f.function), std::move(*o))))
+ -> optional<typename
std::decay<decltype(utils::invoke(std::forward<F>(f.function),
std::move(*o)))>::type> {
+ if (o.has_value()) {
+ return make_optional(utils::invoke(std::forward<F>(f.function),
std::move(*o)));
+ } else {
+ return nullopt;
+ }
+}
+
template<typename T>
struct flat_map_wrapper {
T function;
@@ -78,7 +89,43 @@ auto operator|(const optional<SourceType>& o,
flat_map_wrapper<F> f) noexcept(no
return nullopt;
}
}
+// flatMap implementation for rvalue optionals: the contained value is moved into f.function,
+// which must itself return an optional (enforced by the static_assert); that optional is returned as-is.
+// Returns nullopt when o is empty.
+template<typename SourceType, typename F>
+auto operator|(optional<SourceType>&& o, flat_map_wrapper<F> f)
noexcept(noexcept(utils::invoke(std::forward<F>(f.function), std::move(*o))))
+ -> optional<typename
std::decay<decltype(*utils::invoke(std::forward<F>(f.function),
std::move(*o)))>::type> {
+
static_assert(is_optional<decltype(utils::invoke(std::forward<F>(f.function),
std::move(*o)))>::value, "flatMap expects a function returning optional");
+ if (o.has_value()) {
+ return utils::invoke(std::forward<F>(f.function), std::move(*o));
+ } else {
+ return nullopt;
+ }
+}
+// Tag type holding the fallback function passed to orElse
+template<typename T>
+struct or_else_wrapper {
+ T function;
+};
+
+// orElse implementation
+// Overload selected when the fallback returns void: a non-empty o is returned unchanged;
+// for an empty o the fallback is invoked for its side effects and nullopt is returned.
+template<typename SourceType, typename F>
+auto operator|(optional<SourceType> o, or_else_wrapper<F> f)
noexcept(noexcept(utils::invoke(std::forward<F>(f.function))))
+ -> typename
std::enable_if<std::is_same<decltype(utils::invoke(std::forward<F>(f.function))),
void>::value, optional<SourceType>>::type {
+ if (o.has_value()) {
+ return o;
+ } else {
+ utils::invoke(std::forward<F>(f.function));
+ return nullopt;
+ }
+}
+
+// Overload selected when the fallback returns optional<SourceType>: a non-empty o is returned unchanged;
+// for an empty o the fallback's result is returned instead.
+template<typename SourceType, typename F>
+auto operator|(optional<SourceType> o, or_else_wrapper<F> f)
noexcept(noexcept(utils::invoke(std::forward<F>(f.function))))
+ -> typename std::enable_if<std::is_same<typename
std::decay<decltype(utils::invoke(std::forward<F>(f.function)))>::type,
optional<SourceType>>::value, optional<SourceType>>::type {
+ if (o.has_value()) {
+ return o;
+ } else {
+ return utils::invoke(std::forward<F>(f.function));
+ }
+}
} // namespace detail
template<typename T>
@@ -87,6 +134,9 @@ detail::map_wrapper<T&&> map(T&& func) noexcept { return
{std::forward<T>(func)}
template<typename T>
detail::flat_map_wrapper<T&&> flatMap(T&& func) noexcept { return
{std::forward<T>(func)}; }
+// Wraps func in a detail::or_else_wrapper so it can be applied with operator|,
+// e.g. some_optional | utils::orElse(func). The wrapper member has type T&&, i.e. func is held by reference.
+template<typename T>
+detail::or_else_wrapper<T&&> orElse(T&& func) noexcept { return
{std::forward<T>(func)}; }
+
} // namespace utils
} // namespace minifi
} // namespace nifi
diff --git a/libminifi/include/utils/gsl.h b/libminifi/include/utils/gsl.h
index db175b5..9bd4f09 100644
--- a/libminifi/include/utils/gsl.h
+++ b/libminifi/include/utils/gsl.h
@@ -17,6 +17,8 @@
#ifndef LIBMINIFI_INCLUDE_UTILS_GSL_H_
#define LIBMINIFI_INCLUDE_UTILS_GSL_H_
+#include <type_traits>
+
#include <gsl-lite/gsl-lite.hpp>
namespace org {
@@ -26,6 +28,26 @@ namespace minifi {
namespace gsl = ::gsl_lite;
+namespace utils {
+namespace detail {
+// T with references and top-level cv-qualifiers removed
+template<typename T>
+using remove_cvref_t = typename std::remove_cv<typename
std::remove_reference<T>::type>::type;
+} // namespace detail
+
+// Copies the elements of span into a new Container using Container's iterator-range constructor.
+// The destination type is given explicitly, e.g. span_to<std::string>(span).
+template<typename Container, typename T>
+Container span_to(gsl::span<T> span) {
+ static_assert(std::is_constructible<Container, typename
gsl::span<T>::iterator, typename gsl::span<T>::iterator>::value,
+ "The destination container must have an iterator (pointer) range
constructor");
+ return Container(std::begin(span), std::end(span));
+}
+// Overload taking a container template, e.g. span_to<std::vector>(span): the element type is
+// deduced as remove_cvref_t<T> and the work is delegated to the overload that takes a concrete Container.
+template<template<typename...> class Container, typename T>
+Container<detail::remove_cvref_t<T>> span_to(gsl::span<T> span) {
+ static_assert(std::is_constructible<Container<detail::remove_cvref_t<T>>,
typename gsl::span<T>::iterator, typename gsl::span<T>::iterator>::value,
+ "The destination container must have an iterator (pointer) range
constructor");
+ return span_to<Container<detail::remove_cvref_t<T>>>(span);
+}
+} // namespace utils
+
} // namespace minifi
} // namespace nifi
} // namespace apache
diff --git a/libminifi/src/core/ConfigurableComponent.cpp
b/libminifi/src/core/ConfigurableComponent.cpp
index f97b78f..dc3f877 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -115,7 +115,7 @@ bool ConfigurableComponent::updateProperty(const
std::string &name, const std::s
* @param value property value.
* @return whether property was set or not
*/
-bool ConfigurableComponent::setProperty(Property &prop, std::string value) {
+bool ConfigurableComponent::setProperty(const Property& prop, std::string
value) {
std::lock_guard<std::mutex> lock(configuration_mutex_);
auto it = properties_.find(prop.getName());
@@ -144,7 +144,7 @@ bool ConfigurableComponent::setProperty(Property &prop,
std::string value) {
}
}
-bool ConfigurableComponent::setProperty(Property &prop, PropertyValue &value) {
+bool ConfigurableComponent::setProperty(const Property& prop, PropertyValue
&value) {
std::lock_guard<std::mutex> lock(configuration_mutex_);
auto it = properties_.find(prop.getName());
diff --git a/libminifi/src/core/ProcessSession.cpp
b/libminifi/src/core/ProcessSession.cpp
index f8e8c31..e3ce368 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -241,6 +241,18 @@ void ProcessSession::write(const
std::shared_ptr<core::FlowFile> &flow, OutputSt
}
}
+// Writes the bytes of buffer to flow_file by running a one-shot OutputStreamCallback through write().
+void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) {
+  // Local callback that hands the whole span to the stream in a single write call
+  class SpanWriter : public OutputStreamCallback {
+   public:
+    explicit SpanWriter(gsl::span<const char> source) :source_{source} {}
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) final {
+      return stream->write(reinterpret_cast<const uint8_t*>(source_.data()), source_.size());
+    }
+
+   private:
+    gsl::span<const char> source_;
+  };
+  SpanWriter writer{ buffer };
+  write(flow_file, &writer);
+}
+
+
void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow,
OutputStreamCallback *callback) {
std::shared_ptr<ResourceClaim> claim = flow->getResourceClaim();
if (!claim) {
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 5a60145..4e06ebe 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -407,7 +407,7 @@ class TestController {
return std::make_shared<TestPlan>(content_repo, flow_repo, repo,
flow_version_, configuration, state_dir);
}
- void runSession(std::shared_ptr<TestPlan> &plan, bool runToCompletion =
true, std::function<void(const std::shared_ptr<core::ProcessContext>&, const
std::shared_ptr<core::ProcessSession>&)> verify =
+ void runSession(const std::shared_ptr<TestPlan> &plan, bool runToCompletion
= true, const std::function<void(const std::shared_ptr<core::ProcessContext>&,
const std::shared_ptr<core::ProcessSession>&)>& verify =
nullptr) {
while (plan->runNextProcessor(verify) && runToCompletion) {
}
diff --git a/libminifi/test/unit/GeneralUtilsTest.cpp
b/libminifi/test/unit/GeneralUtilsTest.cpp
index e9b0bfb..1d01f33 100644
--- a/libminifi/test/unit/GeneralUtilsTest.cpp
+++ b/libminifi/test/unit/GeneralUtilsTest.cpp
@@ -134,3 +134,10 @@ TEST_CASE("GeneralUtils::invoke FunctionObject", "[invoke
function object]") {
// invoking lambda
REQUIRE(60 == utils::invoke(int_timesn, 20));
}
+
+// utils::dereference must yield the pointee itself (same value, same address), not a copy
+TEST_CASE("GeneralUtils::dereference", "[dereference]") {
+  const int value = 42;
+  const int* const ptr = &value;
+  REQUIRE(42 == utils::dereference(ptr));
+  REQUIRE(&value == &utils::dereference(ptr));
+}
diff --git a/libminifi/include/utils/gsl.h b/libminifi/test/unit/GslTest.cpp
similarity index 64%
copy from libminifi/include/utils/gsl.h
copy to libminifi/test/unit/GslTest.cpp
index db175b5..f94145a 100644
--- a/libminifi/include/utils/gsl.h
+++ b/libminifi/test/unit/GslTest.cpp
@@ -1,4 +1,5 @@
/**
+ *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,21 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_UTILS_GSL_H_
-#define LIBMINIFI_INCLUDE_UTILS_GSL_H_
-
-#include <gsl-lite/gsl-lite.hpp>
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+#include <vector>
+#include <string>
+#include "../TestBase.h"
+#include "utils/gsl.h"
-namespace gsl = ::gsl_lite;
+namespace utils = org::apache::nifi::minifi::utils;
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+TEST_CASE("span to", "[span to]") {
+ const auto test_span = gsl::make_span("test text", 9);
+ const auto string = utils::span_to<std::string>(test_span);
+ const auto vector = utils::span_to<std::vector>(test_span);
-#endif // LIBMINIFI_INCLUDE_UTILS_GSL_H_
+ REQUIRE(string == "test text");
+ REQUIRE('t' == vector[0]);
+ REQUIRE(9 == vector.size());
+}
diff --git a/libminifi/test/unit/OptionalTest.cpp
b/libminifi/test/unit/OptionalTest.cpp
index 311afec..3be3f2a 100644
--- a/libminifi/test/unit/OptionalTest.cpp
+++ b/libminifi/test/unit/OptionalTest.cpp
@@ -45,3 +45,18 @@ TEST_CASE("optional flatMap", "[optional flat map]") {
const auto test3 = utils::make_optional(7) |
utils::flatMap(mutable_lval_func);
REQUIRE(!test3.has_value());
}
+
+TEST_CASE("optional orElse", "[optional or else]") {
+  struct ex : std::exception {};
+  const auto make_seven = [] { return utils::make_optional(7); };
+
+  // a populated optional passes through untouched, whichever kind of fallback is supplied
+  const auto kept_with_optional_fallback = utils::make_optional(6) | utils::orElse(make_seven);
+  REQUIRE(6 == kept_with_optional_fallback.value());
+  const auto kept_with_void_fallback = utils::make_optional(3) | utils::orElse([]{});
+  REQUIRE(3 == kept_with_void_fallback.value());
+
+  // an empty optional takes the fallback's optional, or stays empty when the fallback returns void
+  const auto replaced = utils::optional<int>{} | utils::orElse(make_seven);
+  REQUIRE(7 == replaced.value());
+  const auto still_empty = utils::optional<int>{} | utils::orElse([]{});
+  REQUIRE(!still_empty);
+
+  // an exception thrown by the fallback reaches the caller
+  REQUIRE_THROWS_AS(utils::optional<bool>{} | utils::orElse([]{ throw ex{}; }), const ex&);
+}
diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt
index b81e08f..8dd958b 100644
--- a/main/CMakeLists.txt
+++ b/main/CMakeLists.txt
@@ -17,11 +17,7 @@
# under the License.
#
-cmake_minimum_required(VERSION 2.6)
-
-IF(POLICY CMP0048)
- CMAKE_POLICY(SET CMP0048 OLD)
-ENDIF(POLICY CMP0048)
+cmake_minimum_required(VERSION 3.11)
include_directories(../libminifi/include)
@@ -95,6 +91,7 @@ foreach(EXTENSION ${extensions})
endforeach()
set_target_properties(minifiexe PROPERTIES OUTPUT_NAME minifi)
+set_target_properties(minifiexe PROPERTIES ENABLE_EXPORTS True)
if (NOT WIN32)
add_custom_command(TARGET minifiexe POST_BUILD
diff --git a/nanofi/CMakeLists.txt b/nanofi/CMakeLists.txt
index a941676..d4391e9 100644
--- a/nanofi/CMakeLists.txt
+++ b/nanofi/CMakeLists.txt
@@ -17,11 +17,7 @@
# under the License.
#
-cmake_minimum_required(VERSION 2.6)
-
-IF(POLICY CMP0048)
- CMAKE_POLICY(SET CMP0048 OLD)
-ENDIF(POLICY CMP0048)
+cmake_minimum_required(VERSION 3.11)
include_directories(include)
include_directories(../libminifi/include)
diff --git a/nanofi/ecu/CMakeLists.txt b/nanofi/ecu/CMakeLists.txt
index 9b55cbd..5985666 100644
--- a/nanofi/ecu/CMakeLists.txt
+++ b/nanofi/ecu/CMakeLists.txt
@@ -17,7 +17,7 @@
# under the License.
#
-cmake_minimum_required(VERSION 2.6)
+cmake_minimum_required(VERSION 3.11)
if (NOT WIN32)
diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt
index f35381a..bc33da7 100644
--- a/nanofi/examples/CMakeLists.txt
+++ b/nanofi/examples/CMakeLists.txt
@@ -17,11 +17,7 @@
# under the License.
#
-cmake_minimum_required(VERSION 2.6)
-
-IF(POLICY CMP0048)
- CMAKE_POLICY(SET CMP0048 OLD)
-ENDIF(POLICY CMP0048)
+cmake_minimum_required(VERSION 3.11)
include_directories(/include)