Repository: nifi-minifi-cpp Updated Branches: refs/heads/master bca0a0661 -> 44704b363
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/test/unit/TimeUtilsTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/TimeUtilsTest.cpp b/libminifi/test/unit/TimeUtilsTest.cpp new file mode 100644 index 0000000..9470338 --- /dev/null +++ b/libminifi/test/unit/TimeUtilsTest.cpp @@ -0,0 +1,21 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/main/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt index 126411b..f7bd6e3 100644 --- a/main/CMakeLists.txt +++ b/main/CMakeLists.txt @@ -23,7 +23,7 @@ IF(POLICY CMP0048) CMAKE_POLICY(SET CMP0048 OLD) ENDIF(POLICY CMP0048) -include_directories(../include ../libminifi/include ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/leveldb-1.18/include ../thirdparty/) +include_directories(../include ../libminifi/include ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/leveldb-1.18/include ../thirdparty/) find_package(Boost REQUIRED) include_directories(${Boost_INCLUDE_DIRS}) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/main/MiNiFiMain.cpp ---------------------------------------------------------------------- diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index 3e5202d..6bfd9c9 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -28,12 +28,16 @@ #include <yaml-cpp/yaml.h> #include <iostream> -#include "../libminifi/include/BaseLogger.h" -#include "../libminifi/include/LogAppenders.h" -#include "spdlog/spdlog.h" +#include "core/core.h" -#include "Logger.h" -#include "Configure.h" +#include "core/logging/BaseLogger.h" +#include "core/logging/LogAppenders.h" +#include "spdlog/spdlog.h" +#include "core/FlowConfiguration.h" +#include "core/ConfigurationFactory.h" +#include "core/RepositoryFactory.h" +#include "core/logging/Logger.h" +#include "properties/Configure.h" #include "FlowController.h" //! Main thread sleep interval 1 second @@ -56,7 +60,7 @@ // Variables that allow us to avoid a timed wait. sem_t *running; //! Flow Controller -static FlowController *controller = NULL; +static std::unique_ptr<minifi::FlowController> controller = nullptr; /** * Removed the stop command from the signal handler so that we could trigger @@ -69,104 +73,131 @@ static FlowController *controller = NULL; */ void sigHandler(int signal) { - if (signal == SIGINT || signal == SIGTERM) { - // avoid stopping the controller here. - sem_post(running); - } + if (signal == SIGINT || signal == SIGTERM) { + // avoid stopping the controller here. + sem_post(running); + } } int main(int argc, char **argv) { - std::shared_ptr<Logger> logger = Logger::getLogger(); - - logger->setLogLevel(info); - - uint16_t stop_wait_time = STOP_WAIT_TIME_MS; - - std::string graceful_shutdown_seconds = ""; - - running = sem_open("MiNiFiMain", O_CREAT, 0644, 0); - if (running == SEM_FAILED || running == 0) { - - logger->log_error("could not initialize semaphore"); - perror("initialization failure"); - } - // assumes POSIX compliant environment - std::string minifiHome; - if (const char* env_p = std::getenv(MINIFI_HOME_ENV_KEY)) { - minifiHome = env_p; - } else { - logger->log_info( - "MINIFI_HOME was not found, determining based on executable path."); - char *path = NULL; - char full_path[PATH_MAX]; - path = realpath(argv[0], full_path); - std::string minifiHomePath(path); - minifiHomePath = minifiHomePath.substr(0, - minifiHomePath.find_last_of("/\\")); //Remove /minifi from path - minifiHome = minifiHomePath.substr(0, - minifiHomePath.find_last_of("/\\")); //Remove /bin from path - } - - if (signal(SIGINT, sigHandler) == SIG_ERR - || signal(SIGTERM, sigHandler) == SIG_ERR - || signal(SIGPIPE, SIG_IGN) == SIG_ERR) { - logger->log_error("Can not install signal handler"); - return -1; - } - - Configure *configure = Configure::getConfigure(); - configure->setHome(minifiHome); - configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE); - - if (configure->get(Configure::nifi_graceful_shutdown_seconds, - graceful_shutdown_seconds)) { - try { - stop_wait_time = std::stoi(graceful_shutdown_seconds); - } catch (const std::out_of_range &e) { - logger->log_error("%s is out of range. %s", - Configure::nifi_graceful_shutdown_seconds, e.what()); - } catch (const std::invalid_argument &e) { - logger->log_error("%s contains an invalid argument set. %s", - Configure::nifi_graceful_shutdown_seconds, e.what()); - } - } else { - logger->log_debug("%s not set, defaulting to %d", - Configure::nifi_graceful_shutdown_seconds, STOP_WAIT_TIME_MS); - } - - // set the log configuration. - std::unique_ptr<BaseLogger> configured_logger = LogInstance::getConfiguredLogger( - configure); - - logger->updateLogger(std::move(configured_logger)); - - controller = FlowControllerFactory::getFlowController(); - - // Load flow from specified configuration file - controller->load(); - // Start Processing the flow - - controller->start(); - logger->log_info("MiNiFi started"); - - /** - * Sem wait provides us the ability to have a controlled - * yield without the need for a more complex construct and - * a spin lock - */ - if (sem_wait(running) != -1) - perror("sem_wait"); - - sem_unlink("MiNiFiMain"); - - /** - * Trigger unload -- wait stop_wait_time - */ - controller->waitUnload(stop_wait_time); - - delete controller; - - logger->log_info("MiNiFi exit"); - - return 0; + std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); + + logger->setLogLevel(logging::info); + + uint16_t stop_wait_time = STOP_WAIT_TIME_MS; + + std::string graceful_shutdown_seconds = ""; + std::string prov_repo_class = "provenancerepository"; + std::string flow_repo_class = "flowfilerepository"; + std::string nifi_configuration_class_name = "yamlconfiguration"; + + running = sem_open("MiNiFiMain", O_CREAT, 0644, 0); + if (running == SEM_FAILED || running == 0) { + + logger->log_error("could not initialize semaphore"); + perror("initialization failure"); + } + // assumes POSIX compliant environment + std::string minifiHome; + if (const char* env_p = std::getenv(MINIFI_HOME_ENV_KEY)) { + minifiHome = env_p; + } else { + logger->log_info( + "MINIFI_HOME was not found, determining based on executable path."); + char *path = NULL; + char full_path[PATH_MAX]; + path = realpath(argv[0], full_path); + std::string minifiHomePath(path); + minifiHomePath = minifiHomePath.substr(0, + minifiHomePath.find_last_of("/\\")); //Remove /minifi from path + minifiHome = minifiHomePath.substr(0, minifiHomePath.find_last_of("/\\")); //Remove /bin from path + } + + if (signal(SIGINT, sigHandler) == SIG_ERR + || signal(SIGTERM, sigHandler) == SIG_ERR + || signal(SIGPIPE, SIG_IGN) == SIG_ERR) { + logger->log_error("Can not install signal handler"); + return -1; + } + + minifi::Configure *configure = minifi::Configure::getConfigure(); + configure->setHome(minifiHome); + configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE); + + if (configure->get(minifi::Configure::nifi_graceful_shutdown_seconds, + graceful_shutdown_seconds)) { + try { + stop_wait_time = std::stoi(graceful_shutdown_seconds); + } catch (const std::out_of_range &e) { + logger->log_error("%s is out of range. %s", + minifi::Configure::nifi_graceful_shutdown_seconds, + e.what()); + } catch (const std::invalid_argument &e) { + logger->log_error("%s contains an invalid argument set. %s", + minifi::Configure::nifi_graceful_shutdown_seconds, + e.what()); + } + } else { + logger->log_debug("%s not set, defaulting to %d", + minifi::Configure::nifi_graceful_shutdown_seconds, + STOP_WAIT_TIME_MS); + } + + // set the log configuration. + std::unique_ptr<logging::BaseLogger> configured_logger = + logging::LogInstance::getConfiguredLogger(configure); + + logger->updateLogger(std::move(configured_logger)); + + configure->get(minifi::Configure::nifi_provenance_repository_class_name, + prov_repo_class); + // Create repos for flow record and provenance + std::shared_ptr<core::Repository> prov_repo = core::createRepository( + prov_repo_class, true); + prov_repo->initialize(); + + configure->get(minifi::Configure::nifi_flow_repository_class_name, + flow_repo_class); + + std::shared_ptr<core::Repository> flow_repo = core::createRepository( + flow_repo_class, true); + + flow_repo->initialize(); + + configure->get(minifi::Configure::nifi_configuration_class_name, + nifi_configuration_class_name); + + std::unique_ptr<core::FlowConfiguration> flow_configuration = std::move( + core::createFlowConfiguration(prov_repo, flow_repo, + nifi_configuration_class_name)); + + controller = std::unique_ptr<minifi::FlowController>( + new minifi::FlowController(prov_repo, flow_repo, + std::move(flow_configuration))); + + // Load flow from specified configuration file + controller->load(); + // Start Processing the flow + + controller->start(); + logger->log_info("MiNiFi started"); + + /** + * Sem wait provides us the ability to have a controlled + * yield without the need for a more complex construct and + * a spin lock + */ + if (sem_wait(running) != -1) + perror("sem_wait"); + + sem_unlink("MiNiFiMain"); + + /** + * Trigger unload -- wait stop_wait_time + */ + controller->waitUnload(stop_wait_time); + + logger->log_info("MiNiFi exit"); + + return 0; }
