http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/cmake/BuildTests.cmake ---------------------------------------------------------------------- diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake index 4f4bed0..e76802d 100644 --- a/cmake/BuildTests.cmake +++ b/cmake/BuildTests.cmake @@ -31,13 +31,11 @@ ENDMACRO() function(createTests testName) message ("-- Adding test: ${testName}") - target_include_directories(${testName} PRIVATE BEFORE ${UUID_INCLUDE_DIRS}) target_include_directories(${testName} PRIVATE BEFORE "thirdparty/catch") target_include_directories(${testName} PRIVATE BEFORE "thirdparty/spdlog-20170710/include") target_include_directories(${testName} PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include") target_include_directories(${testName} PRIVATE BEFORE "thirdparty/jsoncpp/include") target_include_directories(${testName} PRIVATE BEFORE "thirdparty/civetweb-1.9.1/include") - target_include_directories(${testName} PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS}) target_include_directories(${testName} PRIVATE BEFORE "include") target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/") target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/c2/protocols") @@ -52,8 +50,8 @@ function(createTests testName) target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/utils") target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/processors") target_include_directories(${testName} PRIVATE BEFORE "libminifi/include/provenance") - target_link_libraries(${testName} ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB}) - if (HTTP-CURL) + target_link_libraries(${testName} ${CMAKE_THREAD_LIBS_INIT} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB}) + if (CURL_FOUND) target_include_directories(${testName} PRIVATE BEFORE ${CURL_INCLUDE_DIRS}) 
target_include_directories(${testName} PRIVATE BEFORE "extensions/http-curl/") target_include_directories(${testName} PRIVATE BEFORE "extensions/http-curl/client/") @@ -61,7 +59,7 @@ function(createTests testName) target_include_directories(${testName} PRIVATE BEFORE "extensions/http-curl/protocols/") target_link_libraries(${testName} ${CURL_LIBRARIES} ) if (APPLE) - target_link_libraries (${testName} -Wl,-all_load ${HTTP-CURL}) + target_link_libraries (${testName} -Wl,-all_load ${HTTP-CURL}) else () target_link_libraries (${testName} -Wl,--whole-archive ${HTTP-CURL} -Wl,--no-whole-archive) endif () @@ -77,6 +75,7 @@ SET(TEST_RESOURCES ${TEST_DIR}/resources) GETSOURCEFILES(UNIT_TESTS "${TEST_DIR}/unit/") GETSOURCEFILES(INTEGRATION_TESTS "${TEST_DIR}/integration/") GETSOURCEFILES(CURL_INTEGRATION_TESTS "${TEST_DIR}/curl-tests/") +GETSOURCEFILES(ROCKSDB_INTEGRATION_TESTS "${TEST_DIR}/rocksdb-tests/") SET(UNIT_TEST_COUNT 0) FOREACH(testfile ${UNIT_TESTS}) @@ -93,11 +92,28 @@ FOREACH(testfile ${INTEGRATION_TESTS}) get_filename_component(testfilename "${testfile}" NAME_WE) add_executable("${testfilename}" "${TEST_DIR}/integration/${testfile}" ${SPD_SOURCES} "${TEST_DIR}/TestBase.cpp") createTests("${testfilename}") - #message("Adding ${testfilename} from ${testfile}") MATH(EXPR INT_TEST_COUNT "${INT_TEST_COUNT}+1") ENDFOREACH() message("-- Finished building ${INT_TEST_COUNT} integration test file(s)...") +if (ROCKSDB-REPOS) +SET(ROCKSDB_TEST_COUNT 0) +FOREACH(testfile ${ROCKSDB_INTEGRATION_TESTS}) + get_filename_component(testfilename "${testfile}" NAME_WE) + add_executable("${testfilename}" "${TEST_DIR}/rocksdb-tests/${testfile}" ${SPD_SOURCES} "${TEST_DIR}/TestBase.cpp") + target_include_directories(${testfilename} PRIVATE BEFORE "extensions/rocksdb-repos/") + target_include_directories(${testfilename} PRIVATE BEFORE "thirdparty/rocksdb/include") + if (APPLE) + target_link_libraries (${testfilename} -Wl,-all_load ${ROCKSDB-REPOS}) + else () + target_link_libraries 
(${testfilename} -Wl,--whole-archive ${ROCKSDB-REPOS} -Wl,--no-whole-archive) + endif () + createTests("${testfilename}") + MATH(EXPR ROCKSDB_TEST_COUNT "${ROCKSDB_TEST_COUNT}+1") +ENDFOREACH() +message("-- Finished building ${ROCKSDB_TEST_COUNT} RocksDB related test file(s)...") +endif(ROCKSDB-REPOS) + if (HTTP-CURL) SET(CURL_INT_TEST_COUNT 0) @@ -124,8 +140,6 @@ add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${ add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") -add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") - add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8082/nifi-api/controller")
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/cmake/FindLeveldb.cmake ---------------------------------------------------------------------- diff --git a/cmake/FindLeveldb.cmake b/cmake/FindLeveldb.cmake deleted file mode 100644 index a6d94c0..0000000 --- a/cmake/FindLeveldb.cmake +++ /dev/null @@ -1,50 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -# Find module for Leveldb library and includes -# LEVELDB_FOUND - if system found LEVELDB library -# LEVELDB_INCLUDE_DIRS - The LEVELDB include directories -# LEVELDB_LIBRARIES - The libraries needed to use LEVELDB -# LEVELDB_DEFINITIONS - Compiler switches required for using LEVELDB - -# For OS X do not attempt to use the OS X application frameworks or bundles. 
-set (CMAKE_FIND_FRAMEWORK NEVER) -set (CMAKE_FIND_APPBUNDLE NEVER) - -find_path(LEVELDB_INCLUDE_DIR - NAMES leveldb/db.h - PATHS /usr/local/include /usr/include - DOC "LevelDB include header" -) - -find_library(LEVELDB_LIBRARY - NAMES libleveldb.dylib libleveldb.so - PATHS /usr/local/lib /usr/lib/x86_64-linux-gnu - DOC "LevelDB library" -) - -include(FindPackageHandleStandardArgs) -find_package_handle_standard_args(LEVELDB DEFAULT_MSG LEVELDB_INCLUDE_DIR LEVELDB_LIBRARY) - -if (LEVELDB_FOUND) - set(LEVELDB_LIBRARIES ${LEVELDB_LIBRARY} ) - set(LEVELDB_INCLUDE_DIRS ${LEVELDB_INCLUDE_DIR} ) - set(LEVELDB_DEFINITIONS ) -endif() - -mark_as_advanced(LEVELDB_ROOT_DIR LEVELDB_INCLUDE_DIR LEVELDB_LIBRARY)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/cmake/FindRocksDB.cmake
----------------------------------------------------------------------
diff --git a/cmake/FindRocksDB.cmake b/cmake/FindRocksDB.cmake
new file mode 100644
index 0000000..db9c2d1
--- /dev/null
+++ b/cmake/FindRocksDB.cmake
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Find module for the RocksDB library and headers.
+#
+# ROCKSDB_FOUND               System has RocksDB library/headers.
+# ROCKSDB_LIBRARIES           The RocksDB library.
+# ROCKSDB_INCLUDE_DIR         The location of RocksDB headers.
+# ROCKSDB_ROOT_DIR            Prefix containing include/rocksdb/db.h; used as a search hint below.
+
+find_path(ROCKSDB_ROOT_DIR
+    NAMES include/rocksdb/db.h
+)
+
+find_library(ROCKSDB_LIBRARIES
+    NAMES rocksdb
+    HINTS ${ROCKSDB_ROOT_DIR}/lib
+)
+
+find_path(ROCKSDB_INCLUDE_DIR
+    NAMES rocksdb/db.h
+    HINTS ${ROCKSDB_ROOT_DIR}/include
+)
+
+include(FindPackageHandleStandardArgs)
+# Sets ROCKSDB_FOUND when both required variables hold valid paths, so it is not derived again by hand.
+find_package_handle_standard_args(RocksDB DEFAULT_MSG
+    ROCKSDB_LIBRARIES
+    ROCKSDB_INCLUDE_DIR
+)
+
+mark_as_advanced(
+    ROCKSDB_ROOT_DIR
+    ROCKSDB_LIBRARIES
+    ROCKSDB_INCLUDE_DIR
+)
+
+if(ROCKSDB_FOUND)
+    message(STATUS "Found RocksDB...${ROCKSDB_LIBRARIES}")
+endif()
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/docker/Dockerfile ---------------------------------------------------------------------- diff --git a/docker/Dockerfile b/docker/Dockerfile index 7688439..641f0e2 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -17,7 +17,7 @@ # # First stage: the build environment -# Edge required for leveldb +# Edge required for rocksdb FROM alpine:edge AS builder MAINTAINER Apache NiFi <[email protected]> @@ -27,7 +27,7 @@ ARG MINIFI_VERSION ARG MINIFI_SOURCE_CODE # Install the system dependencies needed for a build -# Add testing repo for leveldb +# Add testing repo for rocksdb RUN echo 'http://dl-cdn.alpinelinux.org/alpine/edge/testing' >> /etc/apk/repositories RUN apk --update --no-cache upgrade && apk --update --no-cache add gcc \ g++ \ @@ -38,7 +38,7 @@ RUN apk --update --no-cache upgrade && apk --update --no-cache add gcc \ boost-dev \ vim \ util-linux-dev \ - leveldb-dev \ + rocksdb-dev \ curl-dev \ cmake \ git \ @@ -70,7 +70,7 @@ RUN cd $MINIFI_BASE_DIR \ && tar -xzvf $MINIFI_BASE_DIR/build/nifi-minifi-cpp-$MINIFI_VERSION-bin.tar.gz -C $MINIFI_BASE_DIR # Second stage: the runtime image -# Edge required for leveldb +# Edge required for rocksdb FROM alpine:edge ARG UID @@ -78,12 +78,12 @@ ARG GID ARG MINIFI_VERSION ARG MINIFI_SOURCE_CODE -#
Add testing repo for rocksdb RUN echo 'http://dl-cdn.alpinelinux.org/alpine/edge/testing' >> /etc/apk/repositories RUN apk --update --no-cache upgrade && apk add --update --no-cache \ util-linux \ - leveldb \ + rocksdb \ curl \ unzip \ gpsd \ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/http-curl/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/http-curl/CMakeLists.txt b/extensions/http-curl/CMakeLists.txt index cb91683..b8b0629 100644 --- a/extensions/http-curl/CMakeLists.txt +++ b/extensions/http-curl/CMakeLists.txt @@ -24,7 +24,7 @@ find_package(CURL REQUIRED) set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") -include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/leveldb-1.18/include ../../thirdparty/) +include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/) find_package(Boost REQUIRED) include_directories(${Boost_INCLUDE_DIRS}) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/CMakeLists.txt 
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/CMakeLists.txt b/extensions/rocksdb-repos/CMakeLists.txt
new file mode 100644
index 0000000..82df6db
--- /dev/null
+++ b/extensions/rocksdb-repos/CMakeLists.txt
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Builds the RocksDB repository extension as the static library minifi-rocksdb-repos
+# and exports that target name to the parent scope as ROCKSDB-REPOS.
+
+cmake_minimum_required(VERSION 2.6)
+
+find_package(CURL REQUIRED)
+
+set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols")
+set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols")
+
+include_directories(../../libminifi/include ../../libminifi/include/c2 ../../libminifi/include/c2/protocols/ ../../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../../libminifi/include/core/yaml ../../libminifi/include/core ../../thirdparty/spdlog-20170710/include ../../thirdparty/concurrentqueue ../../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../../thirdparty/civetweb-1.9.1/include ../../thirdparty/jsoncpp/include ../../thirdparty/rocksdb/include ../../thirdparty/)
+
+find_package(Boost REQUIRED)
+find_package(RocksDB)
+
+SET_PACKAGE_INFO("ROCKSDB-REPOS" "RocksDB repository implementations" )
+
+include_directories(${Boost_INCLUDE_DIRS})
+
+file(GLOB SOURCES "*.cpp")
+
+add_library(minifi-rocksdb-repos STATIC ${SOURCES})
+set_property(TARGET minifi-rocksdb-repos PROPERTY POSITION_INDEPENDENT_CODE ON)
+if(THREADS_HAVE_PTHREAD_ARG)
+  # The target name is the first argument; the scope keyword comes after it.
+  target_compile_options(minifi-rocksdb-repos PUBLIC "-pthread")
+endif()
+if(CMAKE_THREAD_LIBS_INIT)
+  target_link_libraries(minifi-rocksdb-repos "${CMAKE_THREAD_LIBS_INIT}")
+endif()
+
+if (CURL_FOUND)
+  include_directories(${CURL_INCLUDE_DIRS})
+  target_link_libraries (minifi-rocksdb-repos ${CURL_LIBRARIES})
+endif()
+
+# Include UUID
+find_package(UUID REQUIRED)
+target_link_libraries(minifi-rocksdb-repos ${LIBMINIFI} ${UUID_LIBRARIES} ${JSONCPP_LIB})
+add_dependencies(minifi-rocksdb-repos jsoncpp_project)
+find_package(OpenSSL REQUIRED)
+include_directories(${OPENSSL_INCLUDE_DIR})
+target_link_libraries(minifi-rocksdb-repos ${CMAKE_DL_LIBS} )
+if (ROCKSDB_FOUND)
+  target_link_libraries(minifi-rocksdb-repos ${ROCKSDB_LIBRARIES} )
+else()
+  target_link_libraries(minifi-rocksdb-repos rocksdb )
+endif()
+find_package(ZLIB REQUIRED)
+include_directories(${ZLIB_INCLUDE_DIRS})
+target_link_libraries (minifi-rocksdb-repos ${ZLIB_LIBRARIES})
+if (WIN32)
+    set_target_properties(minifi-rocksdb-repos PROPERTIES
+        LINK_FLAGS "/WHOLEARCHIVE"
+    )
+elseif (APPLE)
+    set_target_properties(minifi-rocksdb-repos PROPERTIES
+        LINK_FLAGS "-Wl,-all_load"
+    )
+else ()
+    set_target_properties(minifi-rocksdb-repos PROPERTIES
+        LINK_FLAGS "-Wl,--whole-archive"
+    )
+endif ()
+
+
+SET (ROCKSDB-REPOS minifi-rocksdb-repos PARENT_SCOPE)
+
+
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/DatabaseContentRepository.cpp ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp new file mode 100644 index 0000000..50f007f --- /dev/null +++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp @@ -0,0 +1,112 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include "DatabaseContentRepository.h" +#include <memory> +#include <string> +#include "RocksDbStream.h" +#include "rocksdb/merge_operator.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + +bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configure> &configuration) { + std::string value; + if (configuration->get(Configure::nifi_dbcontent_repository_directory_default, value)) { + directory_ = value; + } else { + directory_ = "dbcontentrepository"; + } + rocksdb::Options options; + options.create_if_missing = true; + options.use_direct_io_for_flush_and_compaction = true; + options.use_direct_reads = true; + options.merge_operator = std::make_shared<StringAppender>(); + options.error_if_exists = false; + options.max_successive_merges = 0; + rocksdb::Status status = rocksdb::DB::Open(options, directory_.c_str(), &db_); + if (status.ok()) { + logger_->log_debug("NiFi Content DB Repository database open %s success", directory_.c_str()); + is_valid_ = true; + } else { + logger_->log_error("NiFi Content DB Repository database open %s fail", directory_.c_str()); + is_valid_ = false; + } + return is_valid_; +} +void DatabaseContentRepository::stop() { + if (db_) { + db_->FlushWAL(true); + delete db_; + db_ = nullptr; + } +} + +std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const std::shared_ptr<minifi::ResourceClaim> &claim) { + // the traditional approach with these has been to return -1 from the stream; however, since we have the ability here + // we can simply return a nullptr, which is also valid from the API when this stream is not valid. 
+ if (nullptr == claim || !is_valid_ || !db_) + return nullptr; + return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), db_, true); +} + +std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) { + // the traditional approach with these has been to return -1 from the stream; however, since we have the ability here + // we can simply return a nullptr, which is also valid from the API when this stream is not valid. + if (nullptr == claim || !is_valid_ || !db_) + return nullptr; + return std::make_shared<io::RocksDbStream>(claim->getContentFullPath(), db_, false); +} + +bool DatabaseContentRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &streamId) { + std::string value; + rocksdb::Status status; + status = db_->Get(rocksdb::ReadOptions(), streamId->getContentFullPath(), &value); + if (status.ok()) { + logger_->log_debug("%s exists", streamId->getContentFullPath()); + return true; + } else { + logger_->log_debug("%s does not exist", streamId->getContentFullPath()); + return false; + } +} + +bool DatabaseContentRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) { + if (nullptr == claim || !is_valid_ || !db_) + return false; + rocksdb::Status status; + status = db_->Delete(rocksdb::WriteOptions(), claim->getContentFullPath()); + if (status.ok()) { + logger_->log_debug("Deleted %s", claim->getContentFullPath()); + return true; + } else { + logger_->log_debug("Attempted, but could not delete %s", claim->getContentFullPath()); + return false; + } +} + +} /* namespace repository */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/DatabaseContentRepository.h ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h 
b/extensions/rocksdb-repos/DatabaseContentRepository.h new file mode 100644 index 0000000..e43ff35 --- /dev/null +++ b/extensions/rocksdb-repos/DatabaseContentRepository.h @@ -0,0 +1,132 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_DatabaseContentRepository_H_ +#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_DatabaseContentRepository_H_ + +#include "rocksdb/db.h" +#include "rocksdb/merge_operator.h" +#include "core/Core.h" +#include "core/Connectable.h" +#include "../ContentRepository.h" +#include "properties/Configure.h" +#include "core/logging/LoggerConfiguration.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + +class StringAppender : public rocksdb::AssociativeMergeOperator { + public: + // Constructor: specify delimiter + explicit StringAppender() { + + } + + virtual bool Merge(const rocksdb::Slice& key, const rocksdb::Slice* existing_value, const rocksdb::Slice& value, std::string* new_value, rocksdb::Logger* logger) const { + // Clear the *new_value for writing. + assert(new_value); + new_value->clear(); + + if (!existing_value) { + // No existing_value. 
Set *new_value = value + new_value->assign(value.data(), value.size()); + } else { + new_value->reserve(existing_value->size() + value.size()); + new_value->assign(existing_value->data(), existing_value->size()); + new_value->append(value.data(), value.size()); + } + + return true; + } + + virtual const char* Name() const { + return "StringAppender"; + } + + private: + +}; + +/** + * DatabaseContentRepository is a content repository that stores data onto the local file system. + */ +class DatabaseContentRepository : public core::ContentRepository, public core::Connectable { + public: + + DatabaseContentRepository(std::string name = getClassName<DatabaseContentRepository>(), uuid_t uuid = 0) + : core::Connectable(name, uuid), + logger_(logging::LoggerFactory<DatabaseContentRepository>::getLogger()), + is_valid_(false), + db_(nullptr) { + } + virtual ~DatabaseContentRepository() { + stop(); + } + + virtual bool initialize(const std::shared_ptr<minifi::Configure> &configuration); + + virtual void stop(); + + virtual std::shared_ptr<io::BaseStream> write(const std::shared_ptr<minifi::ResourceClaim> &claim); + + virtual std::shared_ptr<io::BaseStream> read(const std::shared_ptr<minifi::ResourceClaim> &claim); + + virtual bool close(const std::shared_ptr<minifi::ResourceClaim> &claim) { + return remove(claim); + } + + virtual bool remove(const std::shared_ptr<minifi::ResourceClaim> &claim); + + virtual bool exists(const std::shared_ptr<minifi::ResourceClaim> &streamId); + + virtual void yield() { + + } + + /** + * Determines if we are connected and operating + */ + virtual bool isRunning() { + return true; + } + + /** + * Determines if work is available by this connectable + * @return boolean if work is available. 
+ */ + virtual bool isWorkAvailable() { + return true; + } + + private: + bool is_valid_; + std::string directory_; + rocksdb::DB* db_; + std::shared_ptr<logging::Logger> logger_; +}; + +} /* namespace repository */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_DatabaseContentRepository_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/FlowFileRepository.cpp ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp new file mode 100644 index 0000000..df2be6e --- /dev/null +++ b/extensions/rocksdb-repos/FlowFileRepository.cpp @@ -0,0 +1,140 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "FlowFileRepository.h" +#include "rocksdb/write_batch.h" +#include <memory> +#include <string> +#include <utility> +#include <vector> +#include "FlowFileRecord.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + +void FlowFileRepository::flush() { + rocksdb::WriteBatch batch; + std::string key; + std::string value; + rocksdb::ReadOptions options; + + std::vector<std::shared_ptr<FlowFileRecord>> purgeList; + + uint64_t decrement_total = 0; + while (keys_to_delete.size_approx() > 0) { + if (keys_to_delete.try_dequeue(key)) { + db_->Get(options, key, &value); + decrement_total += value.size(); + std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_); + if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(value.data()), value.size())) { + purgeList.push_back(eventRead); + } + logger_->log_info("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath()); + batch.Delete(key); + } + } + if (db_->Write(rocksdb::WriteOptions(), &batch).ok()) { + logger_->log_info("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load()); + if (decrement_total > repo_size_.load()) { + repo_size_ = 0; + } else { + repo_size_ -= decrement_total; + } + } + + if (nullptr != content_repo_) { + for (const auto &ffr : purgeList) { + auto claim = ffr->getResourceClaim(); + if (claim != nullptr) { + content_repo_->removeIfOrphaned(claim); + } + } + } +} + +void FlowFileRepository::run() { + // threshold for purge + uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4; + + while (running_) { + std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); + uint64_t curTime = getTimeMillis(); + + flush(); + + uint64_t size = getRepoSize(); + + if (size > max_partition_bytes_) + repo_full_ = true; + else + repo_full_ = false; + } +} + +void 
FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) { + content_repo_ = content_repo; + std::vector<std::pair<std::string, uint64_t>> purgeList; + rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions()); + + repo_size_ = 0; + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_); + std::string key = it->key().ToString(); + repo_size_ += it->value().size(); + if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) { + logger_->log_info("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath()); + auto search = connectionMap.find(eventRead->getConnectionUuid()); + if (search != connectionMap.end()) { + // we find the connection for the persistent flowfile, create the flowfile and enqueue that + std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead); + eventRead->setStoredToRepository(true); + search->second->put(eventRead); + } else { + logger_->log_info("Could not find connectinon for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath()); + if (eventRead->getContentFullPath().length() > 0) { + if (nullptr != eventRead->getResourceClaim()) { + content_repo_->remove(eventRead->getResourceClaim()); + } + } + purgeList.push_back(std::make_pair(key, it->value().size())); + } + } else { + purgeList.push_back(std::make_pair(key, it->value().size())); + } + } + + delete it; + for (auto eventId : purgeList) { + logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.first.c_str()); + if (Delete(eventId.first)) { + repo_size_ -= eventId.second; + } + } + + return; +} + +} /* namespace repository */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/FlowFileRepository.h ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h new file mode 100644 index 0000000..f473415 --- /dev/null +++ b/extensions/rocksdb-repos/FlowFileRepository.h @@ -0,0 +1,168 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_ +#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_ + +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "core/Repository.h" +#include "core/Core.h" +#include "Connection.h" +#include "core/logging/LoggerConfiguration.h" +#include "concurrentqueue.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace repository { + +#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository" +#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M +#define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute +#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2000) // 2000 msec + +/** + * Flow File repository + * Design: Extends Repository and implements the run function, using rocksdb as the primary substrate. + */ +class FlowFileRepository : public core::Repository, public std::enable_shared_from_this<FlowFileRepository> { + public: + // Constructor + + FlowFileRepository(std::string name, uuid_t uuid) + : FlowFileRepository(name){ + + } + + FlowFileRepository(const std::string repo_name = "", std::string directory = FLOWFILE_REPOSITORY_DIRECTORY, int64_t maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, + int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, uint64_t purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD) + : core::SerializableComponent(repo_name,0), Repository(repo_name.length() > 0 ? 
repo_name : core::getClassName<FlowFileRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod), + logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()), + content_repo_(nullptr) { + db_ = NULL; + } + + // Destructor + ~FlowFileRepository() { + if (db_) + delete db_; + } + + virtual void flush(); + + // initialize + virtual bool initialize(const std::shared_ptr<Configure> &configure) { + std::string value; + + if (configure->get(Configure::nifi_flowfile_repository_directory_default, value)) { + directory_ = value; + } + logger_->log_info("NiFi FlowFile Repository Directory %s", directory_.c_str()); + if (configure->get(Configure::nifi_flowfile_repository_max_storage_size, value)) { + Property::StringToInt(value, max_partition_bytes_); + } + logger_->log_info("NiFi FlowFile Max Partition Bytes %d", max_partition_bytes_); + if (configure->get(Configure::nifi_flowfile_repository_max_storage_time, value)) { + TimeUnit unit; + if (Property::StringToTime(value, max_partition_millis_, unit) && Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_)) { + } + } + logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms", max_partition_millis_); + rocksdb::Options options; + options.create_if_missing = true; + options.use_direct_io_for_flush_and_compaction = true; + options.use_direct_reads = true; + rocksdb::Status status = rocksdb::DB::Open(options, directory_.c_str(), &db_); + if (status.ok()) { + logger_->log_info("NiFi FlowFile Repository database open %s success", directory_.c_str()); + } else { + logger_->log_error("NiFi FlowFile Repository database open %s fail", directory_.c_str()); + return false; + } + return true; + } + + virtual void run(); + + virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) { + // persistent to the DB + rocksdb::Slice value((const char *) buf, bufLen); + rocksdb::Status status; + repo_size_ += bufLen; + status = db_->Put(rocksdb::WriteOptions(), key, value); + if 
(status.ok()) + return true; + else + return false; + } + /** + * + * Deletes the key + * @return status of the delete operation + */ + virtual bool Delete(std::string key) { + keys_to_delete.enqueue(key); + return true; + } + /** + * Sets the value from the provided key + * @return status of the get operation. + */ + virtual bool Get(const std::string &key, std::string &value) { + if (db_ == nullptr) + return false; + rocksdb::Status status; + status = db_->Get(rocksdb::ReadOptions(), key, &value); + if (status.ok()) + return true; + else + return false; + } + + virtual void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo); + + void start() { + if (this->purge_period_ <= 0) { + return; + } + if (running_) { + return; + } + running_ = true; + thread_ = std::thread(&FlowFileRepository::run, shared_from_this()); + logger_->log_info("%s Repository Monitor Thread Start", getName()); + } + + private: + moodycamel::ConcurrentQueue<std::string> keys_to_delete; + std::shared_ptr<core::ContentRepository> content_repo_; + rocksdb::DB* db_; + std::shared_ptr<logging::Logger> logger_; +}; + +} /* namespace repository */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/ProvenanceRepository.cpp ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp new file mode 100644 index 0000000..4540a4a --- /dev/null +++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp @@ -0,0 +1,93 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ProvenanceRepository.h" +#include "rocksdb/write_batch.h" +#include <string> +#include <vector> +#include "rocksdb/options.h" +#include "provenance/Provenance.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace provenance { + +void ProvenanceRepository::flush() { + rocksdb::WriteBatch batch; + std::string key; + std::string value; + rocksdb::ReadOptions options; + uint64_t decrement_total = 0; + while (keys_to_delete.size_approx() > 0) { + if (keys_to_delete.try_dequeue(key)) { + db_->Get(options, key, &value); + decrement_total += value.size(); + batch.Delete(key); + logger_->log_info("Removing %s", key); + } + } + if (db_->Write(rocksdb::WriteOptions(), &batch).ok()) { + logger_->log_info("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load()); + if (decrement_total > repo_size_.load()) { + repo_size_ = 0; + } else { + repo_size_ -= decrement_total; + } + } +} + +void ProvenanceRepository::run() { + while (running_) { + std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); + uint64_t curTime = getTimeMillis(); + // threshold for purge + uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4; + + uint64_t size = getRepoSize(); + + if (size >= purgeThreshold) { + rocksdb::Iterator* it = 
db_->NewIterator(rocksdb::ReadOptions()); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + ProvenanceEventRecord eventRead; + std::string key = it->key().ToString(); + uint64_t eventTime = eventRead.getEventTime(reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), it->value().size()); + if (eventTime > 0) { + if ((curTime - eventTime) > max_partition_millis_) + Delete(key); + } else { + logger_->log_debug("NiFi Provenance retrieve event %s fail", key.c_str()); + Delete(key); + } + } + delete it; + } + flush(); + size = getRepoSize(); + if (size > max_partition_bytes_) + repo_full_ = true; + else + repo_full_ = false; + } +} +} /* namespace provenance */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/ProvenanceRepository.h ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h new file mode 100644 index 0000000..67072eb --- /dev/null +++ b/extensions/rocksdb-repos/ProvenanceRepository.h @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_ +#define LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_ + +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "core/Repository.h" +#include "core/Core.h" +#include "provenance/Provenance.h" +#include "core/logging/LoggerConfiguration.h" +#include "concurrentqueue.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace provenance { + +#define PROVENANCE_DIRECTORY "./provenance_repository" +#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024) // 10M +#define MAX_PROVENANCE_ENTRY_LIFE_TIME (60000) // 1 minute +#define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec + +class ProvenanceRepository : public core::Repository, public std::enable_shared_from_this<ProvenanceRepository> { + public: + + ProvenanceRepository(std::string name, uuid_t uuid) + : ProvenanceRepository(name){ + + } + // Constructor + /*! + * Create a new provenance repository + */ + ProvenanceRepository(const std::string repo_name = "", std::string directory = PROVENANCE_DIRECTORY, int64_t maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME, int64_t maxPartitionBytes = + MAX_PROVENANCE_STORAGE_SIZE, + uint64_t purgePeriod = PROVENANCE_PURGE_PERIOD) + : core::SerializableComponent(repo_name, 0), + Repository(repo_name.length() > 0 ? 
repo_name : core::getClassName<ProvenanceRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod), + logger_(logging::LoggerFactory<ProvenanceRepository>::getLogger()) { + db_ = NULL; + } + + // Destructor + virtual ~ProvenanceRepository() { + if (db_) + delete db_; + } + + virtual void flush(); + + void start() { + if (this->purge_period_ <= 0) + return; + if (running_) + return; + running_ = true; + thread_ = std::thread(&ProvenanceRepository::run, shared_from_this()); + logger_->log_info("%s Repository Monitor Thread Start", name_); + } + + // initialize + virtual bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &config) { + std::string value; + if (config->get(Configure::nifi_provenance_repository_directory_default, value)) { + directory_ = value; + } + logger_->log_info("NiFi Provenance Repository Directory %s", directory_.c_str()); + if (config->get(Configure::nifi_provenance_repository_max_storage_size, value)) { + core::Property::StringToInt(value, max_partition_bytes_); + } + logger_->log_info("NiFi Provenance Max Partition Bytes %d", max_partition_bytes_); + if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) { + core::TimeUnit unit; + if (core::Property::StringToTime(value, max_partition_millis_, unit) && core::Property::ConvertTimeUnitToMS(max_partition_millis_, unit, max_partition_millis_)) { + } + } + logger_->log_info("NiFi Provenance Max Storage Time: [%d] ms", max_partition_millis_); + rocksdb::Options options; + options.create_if_missing = true; + options.use_direct_io_for_flush_and_compaction = true; + options.use_direct_reads = true; + rocksdb::Status status = rocksdb::DB::Open(options, directory_.c_str(), &db_); + if (status.ok()) { + logger_->log_info("NiFi Provenance Repository database open %s success", directory_.c_str()); + } else { + logger_->log_error("NiFi Provenance Repository database open %s fail", directory_.c_str()); + return false; + } + + return true; + } 
+ // Put + virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) { + + if (repo_full_) { + return false; + } + + // persist to the DB + rocksdb::Slice value((const char *) buf, bufLen); + rocksdb::Status status; + status = db_->Put(rocksdb::WriteOptions(), key, value); + if (status.ok()) + return true; + else + return false; + } + // Delete + virtual bool Delete(std::string key) { + keys_to_delete.enqueue(key); + return true; + } + // Get + virtual bool Get(const std::string &key, std::string &value) { + rocksdb::Status status; + status = db_->Get(rocksdb::ReadOptions(), key, &value); + if (status.ok()) + return true; + else + return false; + } + + // Remove event + void removeEvent(ProvenanceEventRecord *event) { + Delete(event->getEventId()); + } + + virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) { + return Put(key, buffer, bufferSize); + } + + virtual bool get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size) { + rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions()); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>(); + std::string key = it->key().ToString(); + if (store.size() >= max_size) + break; + if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) { + store.push_back(std::dynamic_pointer_cast<core::CoreComponent>(eventRead)); + } + } + delete it; + return true; + } + + virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) { + rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions()); + size_t requested_batch = max_size; + max_size = 0; + for (it->SeekToFirst(); it->Valid(); it->Next()) { + + if (max_size >= requested_batch) + break; + std::shared_ptr<core::SerializableComponent> eventRead = lambda(); 
+ std::string key = it->key().ToString(); + if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) { + max_size++; + records.push_back(eventRead); + } + + } + delete it; + + if (max_size > 0) { + return true; + } else { + return false; + } + } + //! get record + void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize) { + rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions()); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>(); + std::string key = it->key().ToString(); + if (records.size() >= maxSize) + break; + if (eventRead->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) { + records.push_back(eventRead); + } + } + delete it; + } + + virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) { + rocksdb::Iterator* it = db_->NewIterator(rocksdb::ReadOptions()); + max_size = 0; + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>(); + std::string key = it->key().ToString(); + + if (store.at(max_size)->DeSerialize((uint8_t *) it->value().data(), (int) it->value().size())) { + max_size++; + } + if (store.size() >= max_size) + break; + } + delete it; + if (max_size > 0) { + return true; + } else { + return false; + } + } + //! 
purge record + void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records) { + for (auto record : records) { + Delete(record->getEventId()); + } + flush(); + } + // destroy + void destroy() { + if (db_) { + delete db_; + db_ = NULL; + } + } + // Run function for the thread + void run(); + + // Prevent default copy constructor and assignment operation + // Only support pass by reference or pointer + ProvenanceRepository(const ProvenanceRepository &parent) = delete; + ProvenanceRepository &operator=(const ProvenanceRepository &parent) = delete; + + private: + moodycamel::ConcurrentQueue<std::string> keys_to_delete; + rocksdb::DB* db_; + std::shared_ptr<logging::Logger> logger_; +}; + +} /* namespace provenance */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif /* LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCEREPOSITORY_H_ */ + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/RocksDBLoader.cpp ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/RocksDBLoader.cpp b/extensions/rocksdb-repos/RocksDBLoader.cpp new file mode 100644 index 0000000..444c1db --- /dev/null +++ b/extensions/rocksdb-repos/RocksDBLoader.cpp @@ -0,0 +1,29 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "core/FlowConfiguration.h" +#include "RocksDBLoader.h" + +bool RocksDBFactory::added = core::FlowConfiguration::add_static_func("createRocksDBFactory"); + +extern "C" { + +void *createRocksDBFactory(void) { + return new RocksDBFactory(); +} + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/RocksDBLoader.h ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/RocksDBLoader.h b/extensions/rocksdb-repos/RocksDBLoader.h new file mode 100644 index 0000000..4b14def --- /dev/null +++ b/extensions/rocksdb-repos/RocksDBLoader.h @@ -0,0 +1,80 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef EXTENSIONS_ROCKSDBLOADER_H +#define EXTENSIONS_ROCKSDBLOADER_H + +#include "DatabaseContentRepository.h" +#include "FlowFileRepository.h" +#include "ProvenanceRepository.h" +#include "RocksDbStream.h" +#include "core/ClassLoader.h" + +class __attribute__((visibility("default"))) RocksDBFactory : public core::ObjectFactory { + public: + RocksDBFactory() { + + } + + /** + * Gets the name of the object. + * @return class name of processor + */ + virtual std::string getName() { + return "RocksDBFactory"; + } + + virtual std::string getClassName() { + return "RocksDBFactory"; + } + /** + * Gets the class name for the object + * @return class name for the processor. + */ + virtual std::vector<std::string> getClassNames() { + std::vector<std::string> class_names; + class_names.push_back("DatabaseContentRepository"); + class_names.push_back("FlowFileRepository"); + class_names.push_back("ProvenanceRepository"); + class_names.push_back("databasecontentrepository"); + class_names.push_back("flowfilerepository"); + class_names.push_back("provenancerepository"); + return class_names; + } + + virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) { + std::string name = class_name; + std::transform(name.begin(), name.end(), name.begin(), ::tolower); + if (name == "databasecontentrepository") { + return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<core::repository::DatabaseContentRepository>()); + } else if (name == "flowfilerepository") { + return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<core::repository::FlowFileRepository>()); + } else if (name == "provenancerepository") { + return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::provenance::ProvenanceRepository>()); + } else { + return nullptr; + } + } + + static bool added; + +}; + +extern "C" { +void *createRocksDBFactory(void); +} +#endif /* EXTENSIONS_ROCKSDBLOADER_H */ 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/RocksDbStream.cpp ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp new file mode 100644 index 0000000..73f691e --- /dev/null +++ b/extensions/rocksdb-repos/RocksDbStream.cpp @@ -0,0 +1,124 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "RocksDbStream.h" +#include <fstream> +#include <vector> +#include <memory> +#include <string> +#include "io/validation.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +RocksDbStream::RocksDbStream(const std::string &path, rocksdb::DB *db, bool write_enable) + : BaseStream(), + logger_(logging::LoggerFactory<RocksDbStream>::getLogger()), + db_(db), + path_(path), + write_enable_(write_enable) { + rocksdb::Status status; + status = db_->Get(rocksdb::ReadOptions(), path_, &value_); + if (status.ok()) { + exists_ = true; + } else { + exists_ = false; + } + offset_ = 0; + size_ = value_.size(); +} + +void RocksDbStream::closeStream() { +} + +void RocksDbStream::seek(uint64_t offset) { + // noop +} + +int RocksDbStream::writeData(std::vector<uint8_t> &buf, int buflen) { + if (buf.capacity() < buflen) { + return -1; + } + return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen); +} + +// data stream overrides + +int RocksDbStream::writeData(uint8_t *value, int size) { + if (!IsNullOrEmpty(value) && write_enable_) { + rocksdb::Slice slice_value((const char *) value, size); + rocksdb::Status status; + size_ += size; + rocksdb::WriteOptions opts; + opts.sync = true; + db_->Merge(opts, path_, slice_value); + if (status.ok()) { + return 0; + } else { + return -1; + } + } else { + return -1; + } +} + +template<typename T> +inline std::vector<uint8_t> RocksDbStream::readBuffer(const T& t) { + std::vector<uint8_t> buf; + buf.resize(sizeof t); + readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t)); + return buf; +} + +int RocksDbStream::readData(std::vector<uint8_t> &buf, int buflen) { + if (buf.capacity() < buflen) { + buf.resize(buflen); + } + int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen); + + if (ret < buflen) { + buf.resize(ret); + } + return ret; +} + +int RocksDbStream::readData(uint8_t *buf, int buflen) { + if (!IsNullOrEmpty(buf) && exists_) { + int amtToRead = buflen; + if 
(offset_ >= value_.size()) { + return 0; + } + if (buflen > value_.size() - offset_) { + amtToRead = value_.size() - offset_; + } + std::memcpy(buf, value_.data() + offset_, amtToRead); + offset_ += amtToRead; + return amtToRead; + } else { + return -1; + } +} + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/extensions/rocksdb-repos/RocksDbStream.h ---------------------------------------------------------------------- diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h new file mode 100644 index 0000000..da08899 --- /dev/null +++ b/extensions/rocksdb-repos/RocksDbStream.h @@ -0,0 +1,185 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+#ifndef LIBMINIFI_INCLUDE_IO_TLS_RocksDbStream_H_
+#define LIBMINIFI_INCLUDE_IO_TLS_RocksDbStream_H_
+
+#include "rocksdb/db.h"
+#include <iostream>
+#include <cstdint>
+#include <string>
+#include "io/EndianCheck.h"
+#include "io/BaseStream.h"
+#include "io/Serializable.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Purpose: BaseStream extension backed by a single RocksDB entry. The key is the
+ * path supplied at construction; reads are served from the value loaded for that
+ * key and writes are sent to the database through writeData.
+ *
+ * Design: Simply extends BaseStream and overrides readData/writeData so that the
+ * rocksdb::DB handle, rather than a file, is the source and sink.
+ * NOTE(review): no locking is visible in this class -- confirm before relying on
+ * it from several threads.
+ */
+class RocksDbStream : public io::BaseStream {
+ public:
+  /**
+   * RocksDB stream constructor.
+   * @param path key under which the stream's content is stored
+   * @param db database handle used for reads and writes; the stream does not release it
+   * @param write_enable when false (the default) writeData rejects every write
+   */
+  explicit RocksDbStream(const std::string &path, rocksdb::DB *db, bool write_enable = false);
+
+  /**
+   * Constructor taking only the path.
+   * NOTE(review): no definition of this overload appears next to the other
+   * member definitions -- confirm it is implemented before using it.
+   */
+  explicit RocksDbStream(const std::string &path);
+
+  virtual ~RocksDbStream() {
+    closeStream();
+  }
+
+  virtual void closeStream();
+  /**
+   * Skip to the specified offset.
+   * @param offset offset to which we will skip
+   */
+  void seek(uint64_t offset);
+
+  const uint32_t getSize() const {
+    return size_;
+  }
+
+  /**
+   * Reads two bytes from the stream and assembles them into value.
+   * @param value receives the assembled 16-bit integer
+   * @param is_little_endian when true the first byte read becomes the high-order
+   *        byte, otherwise the low-order byte
+   * @return 2 on success, -1 if readData reports an error
+   */
+  virtual int read(uint16_t &value, bool is_little_endian) {
+    // Zero-filled so a short read never leaves indeterminate bytes in the result.
+    uint8_t buf[2] = { 0 };
+    if (readData(&buf[0], 2) < 0)
+      return -1;
+    if (is_little_endian) {
+      value = (buf[0] << 8) | buf[1];
+    } else {
+      value = buf[0] | buf[1] << 8;
+    }
+    return 2;
+  }
+
+  /**
+   * Reads four bytes from the stream and assembles them into value.
+   * @param value receives the assembled 32-bit integer
+   * @param is_little_endian when true the first byte read becomes the high-order
+   *        byte, otherwise the low-order byte
+   * @return 4 on success, -1 if readData reports an error
+   */
+  virtual int read(uint32_t &value, bool is_little_endian) {
+    uint8_t buf[4] = { 0 };
+    if (readData(&buf[0], 4) < 0)
+      return -1;
+
+    // Widen to uint32_t before shifting: a uint8_t promotes to int, and moving
+    // a set bit into the sign position of an int is not well defined.
+    if (is_little_endian) {
+      value = ((uint32_t) buf[0] << 24) | ((uint32_t) buf[1] << 16) | ((uint32_t) buf[2] << 8) | (uint32_t) buf[3];
+    } else {
+      value = (uint32_t) buf[0] | ((uint32_t) buf[1] << 8) | ((uint32_t) buf[2] << 16) | ((uint32_t) buf[3] << 24);
+    }
+
+    return 4;
+  }
+  /**
+   * Reads eight bytes from the stream and assembles them into value.
+   * @param value receives the assembled 64-bit integer
+   * @param is_little_endian when true the first byte read becomes the high-order
+   *        byte, otherwise the low-order byte
+   * @return 8 on success, -1 if readData reports an error
+   */
+  virtual int read(uint64_t &value, bool is_little_endian) {
+    // Must hold all eight bytes requested from readData below.
+    uint8_t buf[8] = { 0 };
+    if (readData(&buf[0], 8) < 0)
+      return -1;
+    if (is_little_endian) {
+      value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24)
+          | ((uint64_t) (buf[5] & 255) << 16) | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
+    } else {
+      value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) | ((uint64_t) (buf[4] & 255) << 32)
+          | ((uint64_t) (buf[5] & 255) << 40) | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56);
+    }
+    return 8;
+  }
+
+  // data stream extensions
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen number of bytes requested
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen);
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen number of bytes requested
+   */
+  virtual int readData(uint8_t *buf, int buflen);
+
+  /**
+   * Write value to the stream using std::vector
+   * @param buf incoming buffer
+   * @param buflen number of bytes of buf to write
+   *
+   */
+  virtual int
writeData(std::vector<uint8_t> &buf, int buflen); + + /** + * writes value to stream + * @param value value to write + * @param size size of value + */ + virtual int writeData(uint8_t *value, int size); + + /** + * Returns the underlying buffer + * @return vector's array + **/ + const uint8_t *getBuffer() const { + throw std::runtime_error("Stream does not support this operation"); + } + + protected: + + /** + * Creates a vector and returns the vector using the provided + * type name. + * @param t incoming object + * @returns vector. + */ + template<typename T> + std::vector<uint8_t> readBuffer(const T&); + + std::string path_; + + bool write_enable_; + + bool exists_; + + int64_t offset_; + + std::string value_; + + rocksdb::DB *db_; + + size_t size_; + + private: + + std::shared_ptr<logging::Logger> logger_; + +}; + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_IO_TLS_RocksDbStream_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index 22046e3..9bdab2a 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -39,8 +39,8 @@ IF (IOS) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fobjc-abi-version=2 -fobjc-arc -std=gnu++11 -stdlib=libc++ -isysroot ${CMAKE_OSX_SYSROOT} -DIOS") set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fobjc-abi-version=2 -fobjc-arc -isysroot ${CMAKE_OSX_SYSROOT} -DIOS") ELSE () -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DLEVELDB_SUPPORT -DOPENSSL_SUPPORT -DYAML_SUPPORT") -set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DLEVELDB_SUPPORT -DOPENSSL_SUPPORT -DYAML_SUPPORT") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DOPENSSL_SUPPORT -DYAML_SUPPORT") +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DOPENSSL_SUPPORT -DYAML_SUPPORT") ENDIF() @@ -61,6 +61,7 @@ 
include_directories(../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include) include_directories(../thirdparty/civetweb-1.9.1/include) include_directories(../thirdparty/jsoncpp/include) include_directories(../thirdparty/concurrentqueue/) +include_directories(../thirdparty/rocksdb/include) include_directories(include) file(GLOB SOURCES "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/c2/protocols/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/processors/*.cpp" "src/utils/*.cpp" "src/*.cpp") @@ -90,14 +91,10 @@ find_package(Boost COMPONENTS system filesystem REQUIRED) target_link_libraries(minifi ${Boost_SYSTEM_LIBRARY}) target_link_libraries(minifi ${Boost_FILESYSTEM_LIBRARY}) -# Include LevelDB -find_package (Leveldb REQUIRED) -if (LEVELDB_FOUND) - include_directories(${LEVELDB_INCLUDE_DIRS}) - target_link_libraries (minifi ${LEVELDB_LIBRARIES}) -else () - message( FATAL_ERROR "LevelDB was not found. Please install LevelDB" ) -endif (LEVELDB_FOUND) +if (CURL_FOUND) + include_directories(${CURL_INCLUDE_DIRS}) + target_link_libraries (minifi ${CURL_LIBRARIES}) +endif(CURL_FOUND) # Include OpenSSL find_package (OpenSSL REQUIRED) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/cmake/FindLeveldb.cmake ---------------------------------------------------------------------- diff --git a/libminifi/cmake/FindLeveldb.cmake b/libminifi/cmake/FindLeveldb.cmake deleted file mode 100644 index 32adafa..0000000 --- a/libminifi/cmake/FindLeveldb.cmake +++ /dev/null @@ -1,50 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. 
The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -# Find module for Leveldb library and includes -# LEVELDB_FOUND - if system found LEVELDB library -# LEVELDB_INCLUDE_DIRS - The LEVELDB include directories -# LEVELDB_LIBRARIES - The libraries needed to use LEVELDB -# LEVELDB_DEFINITIONS - Compiler switches required for using LEVELDB - -# For OS X do not attempt to use the OS X application frameworks or bundles. -set (CMAKE_FIND_FRAMEWORK NEVER) -set (CMAKE_FIND_APPBUNDLE NEVER) - -find_path(LEVELDB_INCLUDE_DIR - NAMES leveldb/db.h - PATHS /usr/local/include /usr/include - DOC "LevelDB include header" -) - -find_library(LEVELDB_LIBRARY - NAMES libleveldb.dylib libleveldb.so - PATHS /usr/local/lib /usr/lib/x86_64-linux-gnu - DOC "LevelDB library" -) - -include(FindPackageHandleStandardArgs) -find_package_handle_standard_args(LEVELDB DEFAULT_MSG LEVELDB_INCLUDE_DIR LEVELDB_LIBRARY) - -if (LEVELDB_FOUND) - set(LEVELDB_LIBRARIES ${LEVELDB_LIBRARY} ) - set(LEVELDB_INCLUDE_DIRS ${LEVELDB_INCLUDE_DIR} ) - set(LEVELDB_DEFINITIONS ) -endif() - -mark_as_advanced(LEVELDB_ROOT_DIR LEVELDB_INCLUDE_DIR LEVELDB_LIBRARY) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/cmake/FindRocksDB.cmake ---------------------------------------------------------------------- diff --git a/libminifi/cmake/FindRocksDB.cmake b/libminifi/cmake/FindRocksDB.cmake new file mode 100644 index 0000000..db9c2d1 --- /dev/null 
+++ b/libminifi/cmake/FindRocksDB.cmake @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# ROCKSDB_FOUND System has RocksDB library/headers. +# ROCKSDB_LIBRARIES The RocksDB library. +# ROCKSDB_INCLUDE_DIR The location of RocksDB headers. 
+ +find_path(ROCKSDB_ROOT_DIR + NAMES include/rocksdb/db.h +) + +find_library(ROCKSDB_LIBRARIES + NAMES rocksdb + HINTS ${ROCKSDB_ROOT_DIR}/lib +) + +find_path(ROCKSDB_INCLUDE_DIR + NAMES rocksdb/db.h + HINTS ${ROCKSDB_ROOT_DIR}/include +) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(RocksDB DEFAULT_MSG + ROCKSDB_LIBRARIES + ROCKSDB_INCLUDE_DIR +) + +mark_as_advanced( + ROCKSDB_ROOT_DIR + ROCKSDB_LIBRARIES + ROCKSDB_INCLUDE_DIR +) + +if(ROCKSDB_INCLUDE_DIR AND ROCKSDB_LIBRARIES) + set(ROCKSDB_FOUND "YES") + message(STATUS "Found RocksDB...${ROCKSDB_LIBRARIES}") +endif() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/Connection.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h index c92a626..e88b071 100644 --- a/libminifi/include/Connection.h +++ b/libminifi/include/Connection.h @@ -48,7 +48,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this * Create a new processor */ explicit Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, uuid_t uuid = NULL, uuid_t srcUUID = - NULL, + NULL, uuid_t destUUID = NULL); // Destructor virtual ~Connection() { @@ -132,6 +132,12 @@ class Connection : public core::Connectable, public std::enable_shared_from_this uint64_t getQueueDataSize() { return queued_data_size_; } + void put(std::shared_ptr<core::Connectable> flow) { + std::shared_ptr<core::FlowFile> ff = std::static_pointer_cast<core::FlowFile>(flow); + if (nullptr != ff) { + put(ff); + } + } // Put the flow file into queue void put(std::shared_ptr<core::FlowFile> flow); // Poll the flow file from queue, the expired flow file record also being returned 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/SchedulingAgent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 569c4ee..3fc72c6 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -38,7 +38,6 @@ #include "core/ProcessContext.h" #include "core/controller/ControllerServiceProvider.h" #include "core/controller/ControllerServiceNode.h" -#include "provenance/ProvenanceRepository.h" namespace org { namespace apache { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/ClassLoader.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ClassLoader.h b/libminifi/include/core/ClassLoader.h index c44cb9a..d16a39b 100644 --- a/libminifi/include/core/ClassLoader.h +++ b/libminifi/include/core/ClassLoader.h @@ -21,7 +21,6 @@ #include <mutex> #include <vector> #include <map> -#include "Connectable.h" #include "utils/StringUtils.h" #include <dlfcn.h> #include "core/Core.h" @@ -55,28 +54,28 @@ class ObjectFactory { /** * Create a shared pointer to a new processor. */ - virtual std::shared_ptr<Connectable> create(const std::string &name) { + virtual std::shared_ptr<CoreComponent> create(const std::string &name) { return nullptr; } /** * Create a shared pointer to a new processor. */ - virtual Connectable *createRaw(const std::string &name) { + virtual CoreComponent *createRaw(const std::string &name) { return nullptr; } /** * Create a shared pointer to a new processor. */ - virtual std::shared_ptr<Connectable> create(const std::string &name, uuid_t uuid) { + virtual std::shared_ptr<CoreComponent> create(const std::string &name, uuid_t uuid) { return nullptr; } /** * Create a shared pointer to a new processor. 
*/ - virtual Connectable* createRaw(const std::string &name, uuid_t uuid) { + virtual CoreComponent* createRaw(const std::string &name, uuid_t uuid) { return nullptr; } @@ -119,33 +118,33 @@ class DefautObjectFactory : public ObjectFactory { /** * Create a shared pointer to a new processor. */ - virtual std::shared_ptr<Connectable> create(const std::string &name) { + virtual std::shared_ptr<CoreComponent> create(const std::string &name) { std::shared_ptr<T> ptr = std::make_shared<T>(name); - return std::static_pointer_cast<Connectable>(ptr); + return std::static_pointer_cast<CoreComponent>(ptr); } /** * Create a shared pointer to a new processor. */ - virtual std::shared_ptr<Connectable> create(const std::string &name, uuid_t uuid) { + virtual std::shared_ptr<CoreComponent> create(const std::string &name, uuid_t uuid) { std::shared_ptr<T> ptr = std::make_shared<T>(name, uuid); - return std::static_pointer_cast<Connectable>(ptr); + return std::static_pointer_cast<CoreComponent>(ptr); } /** * Create a shared pointer to a new processor. */ - virtual Connectable* createRaw(const std::string &name) { + virtual CoreComponent* createRaw(const std::string &name) { T *ptr = new T(name); - return dynamic_cast<Connectable*>(ptr); + return dynamic_cast<CoreComponent*>(ptr); } /** * Create a shared pointer to a new processor. */ - virtual Connectable* createRaw(const std::string &name, uuid_t uuid) { + virtual CoreComponent* createRaw(const std::string &name, uuid_t uuid) { T *ptr = new T(name, uuid); - return dynamic_cast<Connectable*>(ptr); + return dynamic_cast<CoreComponent*>(ptr); } /** @@ -234,7 +233,7 @@ class ClassLoader { * @param uuid uuid of object * @return nullptr or object created from class_name definition. 
*/ - template<class T = Connectable> + template<class T = CoreComponent> std::shared_ptr<T> instantiate(const std::string &class_name, const std::string &name); /** @@ -243,7 +242,7 @@ class ClassLoader { * @param uuid uuid of object * @return nullptr or object created from class_name definition. */ - template<class T = Connectable> + template<class T = CoreComponent> std::shared_ptr<T> instantiate(const std::string &class_name, uuid_t uuid); /** @@ -252,7 +251,7 @@ class ClassLoader { * @param uuid uuid of object * @return nullptr or object created from class_name definition. */ - template<class T = Connectable> + template<class T = CoreComponent> T *instantiateRaw(const std::string &class_name, const std::string &name); /** @@ -261,7 +260,7 @@ class ClassLoader { * @param uuid uuid of object * @return nullptr or object created from class_name definition. */ - template<class T = Connectable> + template<class T = CoreComponent> T *instantiateRaw(const std::string &class_name, uuid_t uuid); protected: @@ -282,7 +281,7 @@ std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name, const auto factory_entry = loaded_factories_.find(class_name); if (factory_entry != loaded_factories_.end()) { auto obj = factory_entry->second->create(name); - return std::static_pointer_cast<T>(obj); + return std::dynamic_pointer_cast<T>(obj); } else { return nullptr; } @@ -294,7 +293,7 @@ std::shared_ptr<T> ClassLoader::instantiate(const std::string &class_name, uuid_ auto factory_entry = loaded_factories_.find(class_name); if (factory_entry != loaded_factories_.end()) { auto obj = factory_entry->second->create(class_name, uuid); - return std::static_pointer_cast<T>(obj); + return std::dynamic_pointer_cast<T>(obj); } else { return nullptr; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/Connectable.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Connectable.h 
b/libminifi/include/core/Connectable.h index 5d27901..588b67a 100644 --- a/libminifi/include/core/Connectable.h +++ b/libminifi/include/core/Connectable.h @@ -70,6 +70,10 @@ class __attribute__((visibility("default"))) Connectable : public CoreComponent */ std::set<std::shared_ptr<Connectable>> getOutGoingConnections(std::string relationship); + void put(std::shared_ptr<Connectable> flow) { + + } + /** * Get next incoming connection * @return next incoming connection http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/ContentRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h index 5558b93..54ec8d3 100644 --- a/libminifi/include/core/ContentRepository.h +++ b/libminifi/include/core/ContentRepository.h @@ -23,6 +23,7 @@ #include "io/DataStream.h" #include "io/BaseStream.h" #include "StreamManager.h" +#include "core/Connectable.h" namespace org { namespace apache { @@ -35,6 +36,7 @@ namespace core { */ class ContentRepository : public StreamManager<minifi::ResourceClaim> { public: + virtual ~ContentRepository() { } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/Core.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h index 1dc79e7..80b1ca4 100644 --- a/libminifi/include/core/Core.h +++ b/libminifi/include/core/Core.h @@ -118,8 +118,12 @@ class CoreComponent { uuid_copy(uuid_, other.uuid_); } + virtual ~CoreComponent() { + + } + // Get component name Name - std::string getName(); + virtual std::string getName(); /** * Set name. 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/FlowConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index 344a188..6c50772 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -69,9 +69,8 @@ class FlowConfiguration : public CoreComponent { logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) { controller_services_ = std::make_shared<core::controller::ControllerServiceMap>(); service_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration); - for(auto sl_func : statics_sl_funcs_){ - registerResource("",sl_func); - } + // it is okay if this has already been called + initialize_static_functions(); } virtual ~FlowConfiguration(); @@ -120,11 +119,19 @@ class FlowConfiguration : public CoreComponent { return service_provider_; } - static bool add_static_func(std::string functor){ + static bool add_static_func(std::string functor) { statics_sl_funcs_.push_back(functor); return true; } + static void initialize_static_functions() { + std::lock_guard<std::mutex> lock(atomic_initialization_); + for (auto sl_func : statics_sl_funcs_) { + core::ClassLoader::getDefaultClassLoader().registerResource("", sl_func); + } + statics_sl_funcs_.clear(); + } + protected: void registerResource(const std::string &resource_function) { @@ -151,6 +158,7 @@ class FlowConfiguration : public CoreComponent { private: std::shared_ptr<logging::Logger> logger_; + static std::mutex atomic_initialization_; static std::vector<std::string> statics_sl_funcs_; }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/FlowFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowFile.h 
b/libminifi/include/core/FlowFile.h index 95b901b..8417ac9 100644 --- a/libminifi/include/core/FlowFile.h +++ b/libminifi/include/core/FlowFile.h @@ -28,7 +28,7 @@ namespace nifi { namespace minifi { namespace core { -class FlowFile { +class FlowFile : public core::Connectable { public: FlowFile(); ~FlowFile(); @@ -179,11 +179,6 @@ class FlowFile { */ uint64_t getOffset(); - // Get the UUID as string - std::string getUUIDStr() { - return uuid_str_; - } - bool getUUID(uuid_t other) { uuid_copy(other, uuid_); return true; @@ -195,6 +190,27 @@ class FlowFile { } /** + * Yield + */ + virtual void yield(){ + + } + /** + * Determines if we are connected and operating + */ + virtual bool isRunning(){ + return true; + } + + /** + * Determines if work is available by this connectable + * @return boolean if work is available. + */ + virtual bool isWorkAvailable(){ + return true; + } + + /** * Sets the original connection with a shared pointer. * @param connection shared connection. */ @@ -245,7 +261,6 @@ class FlowFile { // Size in bytes of the data corresponding to this flow file uint64_t size_; // A global unique identifier - uuid_t uuid_; // A local unique identifier uint64_t id_; // Offset to the content @@ -257,7 +272,7 @@ class FlowFile { // Pointer to the associated content resource claim std::shared_ptr<ResourceClaim> claim_; // UUID string - std::string uuid_str_; + //std::string uuid_str_; // UUID string for all parents std::set<std::string> lineage_Identifiers_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/ProcessGroup.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index 907fdfc..93caab9 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -167,6 +167,8 @@ class ProcessGroup { void getConnections(std::map<std::string, std::shared_ptr<Connection>> 
&connectionMap); + void getConnections(std::map<std::string, std::shared_ptr<Connectable>> &connectionMap); + protected: // A global unique identifier uuid_t uuid_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/Repository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h index cdc81c5..427a46d 100644 --- a/libminifi/include/core/Repository.h +++ b/libminifi/include/core/Repository.h @@ -41,6 +41,7 @@ #include "utils/TimeUtil.h" #include "utils/StringUtils.h" #include "Core.h" +#include "core/Connectable.h" namespace org { namespace apache { @@ -53,7 +54,7 @@ namespace core { #define MAX_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute #define REPOSITORY_PURGE_PERIOD (2500) // 2500 msec -class Repository : public core::SerializableComponent { +class Repository : public virtual core::SerializableComponent { public: /* * Constructor for the repository @@ -100,6 +101,12 @@ class Repository : public core::SerializableComponent { } return found; } + + void setConnectionMap(std::map<std::string, std::shared_ptr<core::Connectable>> &connectionMap) { + this->connectionMap = connectionMap; + } + + virtual bool Get(const std::string &key, std::string &value) { return false; } @@ -204,6 +211,7 @@ class Repository : public core::SerializableComponent { Repository &operator=(const Repository &parent) = delete; protected: + std::map<std::string, std::shared_ptr<core::Connectable>> connectionMap; // Mutex for protection std::mutex mutex_; // repository directory http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/libminifi/include/core/SerializableComponent.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/SerializableComponent.h b/libminifi/include/core/SerializableComponent.h index f7f9feb..2cf13b5 100644 --- 
a/libminifi/include/core/SerializableComponent.h +++ b/libminifi/include/core/SerializableComponent.h @@ -19,6 +19,7 @@ #define LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_ #include "io/Serializable.h" +#include "core/Connectable.h" #include "core/Core.h" namespace org { @@ -30,12 +31,12 @@ namespace core { /** * Represents a component that is serializable and an extension point of core Component */ -class SerializableComponent : public core::CoreComponent, public minifi::io::Serializable { +class SerializableComponent : public core::Connectable, public minifi::io::Serializable { public: SerializableComponent(const std::string name, uuid_t uuid = nullptr) - : core::CoreComponent(name, uuid) { + : core::Connectable(name, uuid) { } @@ -76,6 +77,25 @@ class SerializableComponent : public core::CoreComponent, public minifi::io::Ser return false; } + virtual void yield() { + + } + + /** + * Determines if we are connected and operating + */ + virtual bool isRunning() { + return true; + } + + /** + * Determines if work is available by this connectable + * @return boolean if work is available. + */ + virtual bool isWorkAvailable() { + return true; + } + }; } /* namespace core */
