MINIFI-227: Initial Site to Site Provenance Reporting Task implementation. This closes #74.
Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/f3f8f531 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/f3f8f531 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/f3f8f531 Branch: refs/heads/master Commit: f3f8f5319037f56b267927e2e43ca9bee8040505 Parents: a625422 Author: Bin Qiu <[email protected]> Authored: Fri Mar 31 16:04:56 2017 -0700 Committer: Aldrin Piri <[email protected]> Committed: Tue Apr 18 14:45:58 2017 -0400 ---------------------------------------------------------------------- CMakeLists.txt | 4 +- LICENSE | 53 +- README.md | 13 +- libminifi/CMakeLists.txt | 3 +- libminifi/include/RemoteProcessorGroupPort.h | 1 + libminifi/include/Site2SiteClientProtocol.h | 8 +- libminifi/include/core/FlowConfiguration.h | 3 + libminifi/include/core/Processor.h | 13 +- .../SiteToSiteProvenanceReportingTask.h | 122 + libminifi/include/core/yaml/YamlConfiguration.h | 4 + libminifi/include/provenance/Provenance.h | 1 + .../include/provenance/ProvenanceRepository.h | 31 + libminifi/src/RemoteProcessorGroupPort.cpp | 62 +- libminifi/src/Site2SiteClientProtocol.cpp | 236 +- libminifi/src/core/FlowConfiguration.cpp | 11 + libminifi/src/core/Processor.cpp | 51 + .../SiteToSiteProvenanceReportingTask.cpp | 161 ++ libminifi/src/core/yaml/YamlConfiguration.cpp | 67 + libminifi/src/provenance/Provenance.cpp | 5 + libminifi/test/unit/ProcessorTests.cpp | 25 +- libminifi/test/unit/ProvenanceTestHelper.h | 22 +- main/CMakeLists.txt | 6 +- thirdparty/jsoncpp/AUTHORS | 1 + thirdparty/jsoncpp/CMakeLists.txt | 156 ++ thirdparty/jsoncpp/LICENSE | 55 + thirdparty/jsoncpp/NEWS.txt | 175 ++ thirdparty/jsoncpp/README.md | 225 ++ thirdparty/jsoncpp/SConstruct | 248 ++ thirdparty/jsoncpp/amalgamate.py | 155 ++ thirdparty/jsoncpp/appveyor.yml | 35 + thirdparty/jsoncpp/dev.makefile | 35 + thirdparty/jsoncpp/doc/doxyfile.in 
| 2301 ++++++++++++++++ thirdparty/jsoncpp/doc/footer.html | 3 + thirdparty/jsoncpp/doc/header.html | 24 + thirdparty/jsoncpp/doc/jsoncpp.dox | 164 ++ thirdparty/jsoncpp/doc/readme.txt | 1 + thirdparty/jsoncpp/doc/roadmap.dox | 3 + thirdparty/jsoncpp/doc/web_doxyfile.in | 2301 ++++++++++++++++ thirdparty/jsoncpp/doxybuild.py | 189 ++ thirdparty/jsoncpp/include/CMakeLists.txt | 2 + thirdparty/jsoncpp/include/json/allocator.h | 98 + thirdparty/jsoncpp/include/json/assertions.h | 54 + thirdparty/jsoncpp/include/json/autolink.h | 25 + thirdparty/jsoncpp/include/json/config.h | 184 ++ thirdparty/jsoncpp/include/json/features.h | 61 + thirdparty/jsoncpp/include/json/forwards.h | 37 + thirdparty/jsoncpp/include/json/json.h | 15 + thirdparty/jsoncpp/include/json/reader.h | 408 +++ thirdparty/jsoncpp/include/json/value.h | 870 ++++++ thirdparty/jsoncpp/include/json/version.h | 20 + thirdparty/jsoncpp/include/json/writer.h | 335 +++ .../jsoncpp/makefiles/msvc2010/jsoncpp.sln | 42 + .../jsoncpp/makefiles/msvc2010/jsontest.vcxproj | 96 + .../makefiles/msvc2010/jsontest.vcxproj.filters | 13 + .../jsoncpp/makefiles/msvc2010/lib_json.vcxproj | 143 + .../makefiles/msvc2010/lib_json.vcxproj.filters | 33 + .../makefiles/msvc2010/test_lib_json.vcxproj | 109 + .../msvc2010/test_lib_json.vcxproj.filters | 24 + thirdparty/jsoncpp/makefiles/vs71/jsoncpp.sln | 46 + .../jsoncpp/makefiles/vs71/jsontest.vcproj | 119 + .../jsoncpp/makefiles/vs71/lib_json.vcproj | 205 ++ .../jsoncpp/makefiles/vs71/test_lib_json.vcproj | 130 + thirdparty/jsoncpp/makerelease.py | 390 +++ thirdparty/jsoncpp/pkg-config/jsoncpp.pc.in | 9 + thirdparty/jsoncpp/scons-tools/globtool.py | 58 + thirdparty/jsoncpp/scons-tools/srcdist.py | 183 ++ thirdparty/jsoncpp/scons-tools/substinfile.py | 85 + thirdparty/jsoncpp/scons-tools/targz.py | 87 + thirdparty/jsoncpp/src/CMakeLists.txt | 5 + .../jsoncpp/src/jsontestrunner/CMakeLists.txt | 25 + thirdparty/jsoncpp/src/jsontestrunner/main.cpp | 326 +++ 
.../jsoncpp/src/jsontestrunner/sconscript | 9 + thirdparty/jsoncpp/src/lib_json/CMakeLists.txt | 113 + thirdparty/jsoncpp/src/lib_json/json_reader.cpp | 2036 ++++++++++++++ thirdparty/jsoncpp/src/lib_json/json_tool.h | 117 + thirdparty/jsoncpp/src/lib_json/json_value.cpp | 1617 +++++++++++ .../jsoncpp/src/lib_json/json_valueiterator.inl | 167 ++ thirdparty/jsoncpp/src/lib_json/json_writer.cpp | 1224 +++++++++ thirdparty/jsoncpp/src/lib_json/sconscript | 8 + thirdparty/jsoncpp/src/lib_json/version.h.in | 20 + .../jsoncpp/src/test_lib_json/CMakeLists.txt | 38 + .../jsoncpp/src/test_lib_json/jsontest.cpp | 457 ++++ thirdparty/jsoncpp/src/test_lib_json/jsontest.h | 286 ++ thirdparty/jsoncpp/src/test_lib_json/main.cpp | 2589 ++++++++++++++++++ thirdparty/jsoncpp/src/test_lib_json/sconscript | 10 + thirdparty/jsoncpp/travis.sh | 31 + thirdparty/jsoncpp/version | 1 + thirdparty/jsoncpp/version.in | 1 + 88 files changed, 19816 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 40472cb..197ea98 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -81,6 +81,7 @@ find_package(UUID REQUIRED) file(GLOB SPD_SOURCES "include/spdlog/*") add_subdirectory(thirdparty/yaml-cpp-yaml-cpp-0.5.3) +add_subdirectory(thirdparty/jsoncpp) set(CIVETWEB_ENABLE_CXX ON CACHE BOOL "Enable civet C++ library") add_subdirectory(thirdparty/civetweb-1.9.1) add_subdirectory(libminifi) @@ -125,6 +126,7 @@ enable_testing(test) add_executable(tests ${LIBMINIFI_TEST_SOURCES} ${SPD_SOURCES}) target_include_directories(tests PRIVATE BEFORE "thirdparty/catch") target_include_directories(tests PRIVATE BEFORE "thirdparty/yaml-cpp-yaml-cpp-0.5.3/include") + target_include_directories(tests PRIVATE BEFORE "thirdparty/jsoncpp/include") 
target_include_directories(tests PRIVATE BEFORE ${LEVELDB_INCLUDE_DIRS}) target_include_directories(tests PRIVATE BEFORE "include") target_include_directories(tests PRIVATE BEFORE "libminifi/include/") @@ -134,7 +136,7 @@ enable_testing(test) target_include_directories(tests PRIVATE BEFORE "libminifi/include/utils") target_include_directories(tests PRIVATE BEFORE "libminifi/include/processors") target_include_directories(tests PRIVATE BEFORE "libminifi/include/provenance") - target_link_libraries(tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp) + target_link_libraries(tests ${CMAKE_THREAD_LIBS_INIT} ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES} minifi yaml-cpp c-library civetweb-cpp jsoncpp_lib_static) add_test(NAME LibMinifiTests COMMAND tests) file(GLOB LIBMINIFI_TEST_EXECUTE_PROCESS "libminifi/test/TestExecuteProcess.cpp") http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/LICENSE ---------------------------------------------------------------------- diff --git a/LICENSE b/LICENSE index 8a0f32e..0439ead 100644 --- a/LICENSE +++ b/LICENSE @@ -508,4 +508,55 @@ The source is available under a 3-Clause BSD License. LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF NOT ADVISED OF THE POSSIBILITY OF SUCH - DAMAGE. \ No newline at end of file + DAMAGE. + +This product bundles 'JsonCpp' which is available under an MIT license. + +The JsonCpp library's source code, including accompanying documentation, +tests and demonstration applications, are licensed under the following +conditions... + +The author (Baptiste Lepilleur) explicitly disclaims copyright in all +jurisdictions which recognize such a disclaimer. In such jurisdictions, +this software is released into the Public Domain. + +In jurisdictions which do not recognize Public Domain property (e.g.
Germany as of +2010), this software is Copyright (c) 2007-2010 by Baptiste Lepilleur, and is +released under the terms of the MIT License (see below). + +In jurisdictions which recognize Public Domain property, the user of this +software may choose to accept it either as 1) Public Domain, 2) under the +conditions of the MIT License (see below), or 3) under the terms of dual +Public Domain/MIT License conditions described here, as they choose. + +The MIT License is about as close to Public Domain as a license can get, and is +described in clear, concise terms at: + + http://en.wikipedia.org/wiki/MIT_License + +The full text of the MIT License follows: + +======================================================================== +Copyright (c) 2007-2010 Baptiste Lepilleur + +Permission is hereby granted, free of charge, to any person +obtaining a copy of this software and associated documentation +files (the "Software"), to deal in the Software without +restriction, including without limitation the rights to use, copy, +modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
+======================================================================== +(END LICENSE TEXT) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 484c97a..be2a400 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ Perspectives of the role of MiNiFi should be from the perspective of the agent a #### Utilities * CMake - * 2.8 or greater + * 3.1 or greater * gcc * 4.8.4 or greater * g++ @@ -286,6 +286,17 @@ Additionally, users can utilize the MiNiFi Toolkit Converter (version 0.0.1 - sc if you do not want to enable client certificate base authorization nifi.security.need.ClientAuth=false +### Provenance Report + + Add Provenance Reporting to config.yml + Provenance Reporting: + scheduling strategy: TIMER_DRIVEN + scheduling period: 1 sec + port: 10001 + host: localhost + port uuid: 471deef6-2a6e-4a7d-912a-81cc17e3a204 + batch size: 100 + ### Running After completing a [build](#building), the application can be run by issuing the following from : http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index 4c71cc1..5de0a87 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -58,9 +58,10 @@ endif() include_directories(../include) include_directories(../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include) include_directories(../thirdparty/civetweb-1.9.1/include) +include_directories(../thirdparty/jsoncpp/include) include_directories(include) -file(GLOB SOURCES "src/core/logging/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/provenance/*.cpp" "src/processors/*.cpp" "src/*.cpp") +file(GLOB SOURCES "src/core/logging/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/*.cpp" 
"src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/processors/*.cpp" "src/*.cpp") file(GLOB SPD_SOURCES "../include/spdlog/*") # Workaround the limitations of having a http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/include/RemoteProcessorGroupPort.h ---------------------------------------------------------------------- diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h index 8667519..f8aac38 100644 --- a/libminifi/include/RemoteProcessorGroupPort.h +++ b/libminifi/include/RemoteProcessorGroupPort.h @@ -55,6 +55,7 @@ class RemoteProcessorGroupPort : public core::Processor { // Supported Properties static core::Property hostName; static core::Property port; + static core::Property portUUID; // Supported Relationships static core::Relationship relation; public: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/include/Site2SiteClientProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/Site2SiteClientProtocol.h b/libminifi/include/Site2SiteClientProtocol.h index 78673d8..b59c885 100644 --- a/libminifi/include/Site2SiteClientProtocol.h +++ b/libminifi/include/Site2SiteClientProtocol.h @@ -376,7 +376,8 @@ class Transaction { class DataPacket { public: DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction, - std::map<std::string, std::string> attributes) { + std::map<std::string, std::string> attributes, std::string &payload) : + payload_ (payload) { _protocol = protocol; _size = 0; _transaction = transaction; @@ -386,6 +387,8 @@ class DataPacket { uint64_t _size; Site2SiteClientProtocol *_protocol; Transaction *_transaction; + std::string & payload_; + }; // Site2SiteClientProtocol Class @@ -534,6 +537,9 @@ class Site2SiteClientProtocol { // Transfer flow files for the process session void transferFlowFiles(core::ProcessContext 
*context, core::ProcessSession *session); + //! Transfer string for the process session + void transferString(core::ProcessContext *context, core::ProcessSession *session, std::string &payload, + std::map<std::string, std::string> attributes); // deleteTransaction void deleteTransaction(std::string transactionID); // Nest Callback Class for write stream http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/include/core/FlowConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index de8ceb4..c8cb7eb 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -22,6 +22,7 @@ #include "Connection.h" #include "RemoteProcessorGroupPort.h" #include "provenance/Provenance.h" +#include "core/reporting/SiteToSiteProvenanceReportingTask.h" #include "processors/GetFile.h" #include "processors/PutFile.h" #include "processors/TailFile.h" @@ -77,6 +78,8 @@ class FlowConfiguration : public CoreComponent { // Create Connection std::shared_ptr<minifi::Connection> createConnection(std::string name, uuid_t uuid); + // Create Provenance Report Task + std::shared_ptr<core::Processor> createProvenanceReportTask(void); /** * Returns the configuration path string http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/include/core/Processor.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index 2b540ec..c00d4ce 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -42,6 +42,9 @@ #include "ProcessSessionFactory.h" #include "Scheduling.h" +#include <stack> +#include "Site2SiteClientProtocol.h" + namespace org { namespace apache { namespace nifi { @@ -239,13 +242,21 @@ class Processor : public Connectable, public 
ConfigurableComponent, // Trigger the Processor even if the incoming connection is empty std::atomic<bool> _triggerWhenEmpty; - private: +//! obtainSite2SiteProtocol for use + std::shared_ptr<Site2SiteClientProtocol> obtainSite2SiteProtocol(std::string host, uint16_t sport, uuid_t portId); + //! returnSite2SiteProtocol after use + void returnSite2SiteProtocol(std::shared_ptr<Site2SiteClientProtocol> protocol);private: // Mutex for protection std::mutex mutex_; // Yield Expiration std::atomic<uint64_t> yield_expiration_; + // Site2Site Protocols + std::stack<std::shared_ptr<Site2SiteClientProtocol>> available_protocols_; + std::atomic<bool> protocols_created_; + + // Check all incoming connections for work bool isWorkAvailable(); // Prevent default copy constructor and assignment operation http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h new file mode 100644 index 0000000..927a8ac --- /dev/null +++ b/libminifi/include/core/reporting/SiteToSiteProvenanceReportingTask.h @@ -0,0 +1,122 @@ +/** + * @file SiteToSiteProvenanceReportingTask.h + * SiteToSiteProvenanceReportingTask class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __SITE_TO_SITE_PROVENANCE_REPORTING_TASK_H__ +#define __SITE_TO_SITE_PROVENANCE_REPORTING_TASK_H__ + +#include <mutex> +#include <memory> +#include <stack> +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "Site2SiteClientProtocol.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace reporting { + +//! SiteToSiteProvenanceReportingTask Class +class SiteToSiteProvenanceReportingTask: public core::Processor { +public: + //! Constructor + /*! + * Create a new processor + */ + SiteToSiteProvenanceReportingTask() : + core::Processor(ReportTaskName) { + logger_ = logging::Logger::getLogger(); + this->setTriggerWhenEmpty(true); + port_ = 0; + batch_size_ = 100; + } + //! Destructor + virtual ~SiteToSiteProvenanceReportingTask() { + + } + //! Report Task Name + static constexpr char const* ReportTaskName = "SiteToSiteProvenanceReportingTask"; + static const char *ProvenanceAppStr; + +public: + //! Get provenance json report + void getJsonReport(core::ProcessContext *context, + core::ProcessSession *session, std::vector < std::shared_ptr < provenance::ProvenanceEventRecord >> &records, + std::string &report); + //! OnTrigger method, implemented by NiFi SiteToSiteProvenanceReportingTask + virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session); + //! Initialize, over write by NiFi SiteToSiteProvenanceReportingTask + virtual void initialize(void); + //! 
Set Port UUID + void setPortUUID(uuid_t port_uuid) { + uuid_copy(port_uuid_, port_uuid); + } + //! Set Host + void setHost(std::string host) { + host_ = host; + } + //! Set Port + void setPort(uint16_t port) { + port_ = port; + } + //! Set Batch Size + void setBatchSize(int size) { + batch_size_ = size; + } + //! Get Host + std::string getHost(void) { + return (host_); + } + //! Get Port + uint16_t getPort(void) { + return (port_); + } + //! Get Batch Size + int getBatchSize(void) { + return (batch_size_); + } + //! Get Port UUID + void getPortUUID(uuid_t port_uuid) { + uuid_copy(port_uuid, port_uuid_); + } + +protected: + +private: + uuid_t port_uuid_; + std::string host_; + uint16_t port_; + int batch_size_; + //! Logger + std::shared_ptr<logging::Logger> logger_; +}; + +// SiteToSiteProvenanceReportingTask + +} /* namespace reporting */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/include/core/yaml/YamlConfiguration.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h index 0ca9190..319f33a 100644 --- a/libminifi/include/core/yaml/YamlConfiguration.h +++ b/libminifi/include/core/yaml/YamlConfiguration.h @@ -58,12 +58,14 @@ class YamlConfiguration : public FlowConfiguration { YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY]; YAML::Node connectionsNode = flow["Connections"]; YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"]; + YAML::Node provenanceReportNode = flow["Provenance Reporting"]; // Create the root process group core::ProcessGroup * root = parseRootProcessGroupYaml(flowControllerNode); parseProcessorNodeYaml(processorsNode, root); parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, root); 
parseConnectionYaml(&connectionsNode, root); + parseProvenanceReportingYaml(&provenanceReportNode, root); return std::unique_ptr<core::ProcessGroup>(root); @@ -85,6 +87,8 @@ class YamlConfiguration : public FlowConfiguration { // Process Remote Process Group YAML void parseRemoteProcessGroupYaml(YAML::Node *node, core::ProcessGroup * parent); + // Process Provenance Report YAML + void parseProvenanceReportingYaml(YAML::Node *reportNode, core::ProcessGroup * parentGroup); // Parse Properties Node YAML for a processor void parsePropertiesNodeYaml(YAML::Node *propertiesNode, std::shared_ptr<core::Processor> processor); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/include/provenance/Provenance.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h index 82754c4..2977f28 100644 --- a/libminifi/include/provenance/Provenance.h +++ b/libminifi/include/provenance/Provenance.h @@ -155,6 +155,7 @@ class ProvenanceEventRecord : */ REPLAY }; + static const char *ProvenanceEventTypeStr[REPLAY+1]; public: // Constructor /*! 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/include/provenance/ProvenanceRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/ProvenanceRepository.h b/libminifi/include/provenance/ProvenanceRepository.h index 2b71fd9..b96021c 100644 --- a/libminifi/include/provenance/ProvenanceRepository.h +++ b/libminifi/include/provenance/ProvenanceRepository.h @@ -113,6 +113,9 @@ class ProvenanceRepository : public core::Repository, // Put virtual bool Put(std::string key, uint8_t *buf, int bufLen) { + if (repo_full_) + return false; + // persistent to the DB leveldb::Slice value((const char *) buf, bufLen); leveldb::Status status; @@ -149,6 +152,34 @@ class ProvenanceRepository : public core::Repository, void removeEvent(ProvenanceEventRecord *event) { Delete(event->getEventId()); } + //! get record + void getProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records, int maxSize) + { + std::lock_guard<std::mutex> lock(mutex_); + leveldb::Iterator* it = db_->NewIterator( + leveldb::ReadOptions()); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>(); + std::string key = it->key().ToString(); + if (records.size() >= maxSize) + break; + if (eventRead->DeSerialize((uint8_t *) it->value().data(), + (int) it->value().size())) + { + records.push_back(eventRead); + } + } + delete it; + } + //! 
purge record + void purgeProvenanceRecord(std::vector<std::shared_ptr<ProvenanceEventRecord>> &records) + { + std::lock_guard<std::mutex> lock(mutex_); + for (auto record : records) + { + Delete(record->getEventId()); + } + } // destroy void destroy() { if (db_) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 33f0cb2..6263359 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -47,6 +47,8 @@ core::Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name.", "localhost"); core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999"); +core::Property RemoteProcessorGroupPort::portUUID("Port UUID", + "Specifies remote NiFi Port UUID.", ""); core::Relationship RemoteProcessorGroupPort::relation; std::unique_ptr<Site2SiteClientProtocol> RemoteProcessorGroupPort::getNextProtocol() { @@ -71,6 +73,7 @@ void RemoteProcessorGroupPort::initialize() { std::set<core::Property> properties; properties.insert(hostName); properties.insert(port); + properties.insert(portUUID); setSupportedProperties(properties); // Set the supported relationships std::set<core::Relationship> relationships; @@ -79,50 +82,45 @@ void RemoteProcessorGroupPort::initialize() { } void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, - core::ProcessSession *session) { - std::string value; - + core::ProcessSession *session) { if (!transmitting_) return; - std::unique_ptr<Site2SiteClientProtocol> protocol_ = getNextProtocol(); + std::string value; - // Peer Connection - if (protocol_ == nullptr) { - protocol_ = std::unique_ptr<Site2SiteClientProtocol>( - new Site2SiteClientProtocol(0)); - protocol_->setPortId(protocol_uuid_); - protocol_->setTimeOut(timeout_); + 
int64_t lvalue; std::string host = ""; uint16_t sport = 0; - int64_t lvalue; - - if (context->getProperty(hostName.getName(), value)) { - host = value; - } - if (context->getProperty(port.getName(), value) - && core::Property::StringToInt(value, lvalue)) { - sport = (uint16_t) lvalue; - } - std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = - std::unique_ptr<org::apache::nifi::minifi::io::DataStream>( - org::apache::nifi::minifi::io::StreamFactory::getInstance() - ->createSocket(host, sport)); - - std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>( - new Site2SitePeer(std::move(str), host, sport)); - - protocol_->setPeer(std::move(peer_)); + + + if (context->getProperty(hostName.getName(), value)) { + host = value; + } + if (context->getProperty(port.getName(), value) + && core::Property::StringToInt(value, lvalue)) { + sport = (uint16_t) lvalue; + } + if (context->getProperty(portUUID.getName(), value)) { + uuid_parse(value.c_str(), protocol_uuid_); + } + + std::shared_ptr<Site2SiteClientProtocol> protocol_ = + this->obtainSite2SiteProtocol(host, sport, protocol_uuid_); + + if (!protocol_) { + context->yield(); + return; } if (!protocol_->bootstrap()) { // bootstrap the client protocol if needeed context->yield(); - std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>( - context->getProcessorNode().getProcessor()); + std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor + > (context->getProcessorNode().getProcessor()); logger_->log_error("Site2Site bootstrap failed yield period %d peer ", - processor->getYieldPeriodMsec()); + processor->getYieldPeriodMsec()); + returnSite2SiteProtocol(protocol_); return; } @@ -131,7 +129,7 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, else protocol_->transferFlowFiles(context, session); - returnProtocol(std::move(protocol_)); + returnSite2SiteProtocol(protocol_); return; } 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/src/Site2SiteClientProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp index 52a0a02..475d49d 100644 --- a/libminifi/src/Site2SiteClientProtocol.cpp +++ b/libminifi/src/Site2SiteClientProtocol.cpp @@ -106,39 +106,39 @@ bool Site2SiteClientProtocol::initiateResourceNegotiation() { } logger_->log_info("status code is %i", statusCode); switch (statusCode) { - case RESOURCE_OK: - logger_->log_info("Site2Site Protocol Negotiate protocol version OK"); - return true; - case DIFFERENT_RESOURCE_VERSION: - uint32_t serverVersion; - ret = peer_->read(serverVersion); - if (ret <= 0) { - // tearDown(); - return false; - } - logger_->log_info( - "Site2Site Server Response asked for a different protocol version %d", - serverVersion); - for (unsigned int i = (_currentVersionIndex + 1); - i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) { - if (serverVersion >= _supportedVersion[i]) { - _currentVersion = _supportedVersion[i]; - _currentVersionIndex = i; - return initiateResourceNegotiation(); - } - } - ret = -1; - // tearDown(); - return false; - case NEGOTIATED_ABORT: - logger_->log_info("Site2Site Negotiate protocol response ABORT"); - ret = -1; + case RESOURCE_OK: + logger_->log_info("Site2Site Protocol Negotiate protocol version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = peer_->read(serverVersion); + if (ret <= 0) { // tearDown(); return false; - default: - logger_->log_info("Negotiate protocol response unknown code %d", - statusCode); - return true; + } + logger_->log_info( + "Site2Site Server Response asked for a different protocol version %d", + serverVersion); + for (unsigned int i = (_currentVersionIndex + 1); + i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) { + if (serverVersion >= _supportedVersion[i]) { 
+ _currentVersion = _supportedVersion[i]; + _currentVersionIndex = i; + return initiateResourceNegotiation(); + } + } + ret = -1; + // tearDown(); + return false; + case NEGOTIATED_ABORT: + logger_->log_info("Site2Site Negotiate protocol response ABORT"); + ret = -1; + // tearDown(); + return false; + default: + logger_->log_info("Negotiate protocol response unknown code %d", + statusCode); + return true; } return true; @@ -181,38 +181,38 @@ bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() { } switch (statusCode) { - case RESOURCE_OK: - logger_->log_info("Site2Site Codec Negotiate version OK"); - return true; - case DIFFERENT_RESOURCE_VERSION: - uint32_t serverVersion; - ret = peer_->read(serverVersion); - if (ret <= 0) { - // tearDown(); - return false; - } - logger_->log_info( - "Site2Site Server Response asked for a different codec version %d", - serverVersion); - for (unsigned int i = (_currentCodecVersionIndex + 1); - i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) { - if (serverVersion >= _supportedCodecVersion[i]) { - _currentCodecVersion = _supportedCodecVersion[i]; - _currentCodecVersionIndex = i; - return initiateCodecResourceNegotiation(); - } - } - ret = -1; - // tearDown(); - return false; - case NEGOTIATED_ABORT: - logger_->log_info("Site2Site Codec Negotiate response ABORT"); - ret = -1; + case RESOURCE_OK: + logger_->log_info("Site2Site Codec Negotiate version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = peer_->read(serverVersion); + if (ret <= 0) { // tearDown(); return false; - default: - logger_->log_info("Negotiate Codec response unknown code %d", statusCode); - return true; + } + logger_->log_info( + "Site2Site Server Response asked for a different codec version %d", + serverVersion); + for (unsigned int i = (_currentCodecVersionIndex + 1); + i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) { + if (serverVersion >= _supportedCodecVersion[i]) { + 
_currentCodecVersion = _supportedCodecVersion[i]; + _currentCodecVersionIndex = i; + return initiateCodecResourceNegotiation(); + } + } + ret = -1; + // tearDown(); + return false; + case NEGOTIATED_ABORT: + logger_->log_info("Site2Site Codec Negotiate response ABORT"); + ret = -1; + // tearDown(); + return false; + default: + logger_->log_info("Negotiate Codec response unknown code %d", statusCode); + return true; } return true; @@ -755,18 +755,34 @@ bool Site2SiteClientProtocol::send(std::string transactionID, itAttribute->second.c_str()); } - uint64_t len = flowFile->getSize(); - ret = transaction->getStream().write(len); - if (ret != 8) { - return false; - } + uint64_t len = 0; + if (flowFile) { + len = flowFile->getSize(); + ret = transaction->getStream().write(len); + if (ret != 8) { + return false; + } + if (flowFile->getSize()) { + Site2SiteClientProtocol::ReadCallback callback(packet); + session->read(flowFile, &callback); + if (flowFile->getSize() != packet->_size) { + return false; + } + } + } else if (packet->payload_.length() > 0) { + len = packet->payload_.length(); - if (flowFile->getSize()) { - Site2SiteClientProtocol::ReadCallback callback(packet); - session->read(flowFile, &callback); - if (flowFile->getSize() != packet->_size) { + ret = transaction->getStream().write(len); + if (ret != 8) { return false; } + + ret = transaction->getStream().writeData( + reinterpret_cast<uint8_t *> (const_cast<char*> (packet->payload_.c_str())), len); + if (ret != len) { + return false; + } + packet->_size += len; } transaction->_transfers++; @@ -812,7 +828,8 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, while (true) { std::map<std::string, std::string> empty; uint64_t startTime = getTimeMillis(); - DataPacket packet(this, transaction, empty); + std::string payload; + DataPacket packet(this, transaction, empty, payload); bool eof = false; if (!receive(transactionID, &packet, eof)) { @@ -1190,7 +1207,8 @@ void 
Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, try { while (continueTransaction) { uint64_t startTime = getTimeMillis(); - DataPacket packet(this, transaction, flow->getAttributes()); + std::string payload; + DataPacket packet(this, transaction, flow->getAttributes(), payload); if (!send(transactionID, &packet, flow, session)) { throw Exception(SITE2SITE_EXCEPTION, "Send Failed"); @@ -1250,6 +1268,80 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, return; } +void Site2SiteClientProtocol::transferString(core::ProcessContext *context, + core::ProcessSession *session, std::string &payload, + std::map<std::string, std::string> attributes) { + Transaction *transaction = NULL; + + if (payload.length() <= 0) + return; + + if (_peerState != READY) { + bootstrap(); + } + + if (_peerState != READY) { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, + "Can not establish handshake with peer"); + return; + } + + // Create the transaction + std::string transactionID; + transaction = createTransaction(transactionID, SEND); + + if (transaction == NULL) { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); + return; + } + + try { + DataPacket packet(this, transaction, attributes, payload); + + if (!send(transactionID, &packet, nullptr, session)) { + throw Exception(SITE2SITE_EXCEPTION, "Send Failed"); + return; + } + logger_->log_info("Site2Site transaction %s send bytes length %d", + transactionID.c_str(), payload.length()); + + if (!confirm(transactionID)) { + throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed"); + return; + } + if (!complete(transactionID)) { + throw Exception(SITE2SITE_EXCEPTION, "Complete Failed"); + return; + } + logger_->log_info( + "Site2Site transaction %s successfully send flow record %d, content bytes %d", + transactionID.c_str(), transaction->_transfers, transaction->_bytes); + } catch (std::exception &exception) { + 
if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + logger_->log_debug( + "Caught Exception during Site2SiteClientProtocol::transferBytes"); + throw; + } + + deleteTransaction(transactionID); + + return; +} + } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/src/core/FlowConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 68aaf5c..d2df002 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -82,6 +82,17 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProcessor( return processor; } +std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask() { + std::shared_ptr<core::Processor> processor = nullptr; + + processor = std::make_shared< + org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(); + // initialize the processor + processor->initialize(); + + return processor; +} + std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup( std::string name, uuid_t uuid) { return std::unique_ptr<core::ProcessGroup>( http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/src/core/Processor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index 9a0898a..4b14775 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -29,11 +29,13 @@ #include <thread> #include <memory> #include <functional> +#include <utility> #include "Connection.h" #include 
"core/Connectable.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" #include "core/ProcessSessionFactory.h" +#include "../include/io/StreamFactory.h" namespace org { namespace apache { @@ -50,6 +52,7 @@ Processor::Processor(std::string name, uuid_t uuid) strategy_ = TIMER_DRIVEN; loss_tolerant_ = false; _triggerWhenEmpty = false; + protocols_created_ = false; scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS; run_durantion_nano_ = 0; yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000; @@ -191,6 +194,54 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) { } } +std::shared_ptr<Site2SiteClientProtocol> Processor::obtainSite2SiteProtocol( + std::string host, uint16_t sport, uuid_t portId) { + std::lock_guard < std::mutex > lock(mutex_); + + if (!protocols_created_) { + for (int i = 0; i < this->max_concurrent_tasks_; i++) { + // create the protocol pool based on max threads allowed + std::shared_ptr<Site2SiteClientProtocol> protocol = std::make_shared<Site2SiteClientProtocol>(nullptr); + protocols_created_ = true; + protocol->setPortId(portId); + std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = + std::unique_ptr < org::apache::nifi::minifi::io::DataStream + > (org::apache::nifi::minifi::io::StreamFactory::getInstance()->createSocket( + host, sport)); + std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer + > (new Site2SitePeer(std::move(str), host, sport)); + protocol->setPeer(std::move(peer_)); + available_protocols_.push(protocol); + } + } + if (!available_protocols_.empty()) { + std::shared_ptr<Site2SiteClientProtocol> return_pointer = + available_protocols_.top(); + available_protocols_.pop(); + return return_pointer; + } else { + // create the protocol on demand if we exceed the pool + std::shared_ptr<Site2SiteClientProtocol> protocol = std::make_shared<Site2SiteClientProtocol>(nullptr); + protocol->setPortId(portId); + std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = + 
std::unique_ptr < org::apache::nifi::minifi::io::DataStream + > (org::apache::nifi::minifi::io::StreamFactory::getInstance()->createSocket( + host, sport)); + std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr < Site2SitePeer + > (new Site2SitePeer(std::move(str), host, sport)); + protocol->setPeer(std::move(peer_)); + return protocol; + } +} + +void Processor::returnSite2SiteProtocol( + std::shared_ptr<Site2SiteClientProtocol> protocol) { + std::lock_guard < std::mutex > lock(mutex_); + if (protocol && available_protocols_.size() < max_concurrent_tasks_) { + available_protocols_.push(protocol); + } +} + bool Processor::flowFilesQueued() { std::lock_guard<std::mutex> lock(mutex_); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp new file mode 100644 index 0000000..867e200 --- /dev/null +++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp @@ -0,0 +1,161 @@ +/** + * @file SiteToSiteProvenanceReportingTask.cpp + * SiteToSiteProvenanceReportingTask class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <string> +#include <memory> +#include <sstream> +#include <iostream> + +#include "core/reporting/SiteToSiteProvenanceReportingTask.h" +#include "../include/io/StreamFactory.h" +#include "io/ClientSocket.h" +#include "utils/TimeUtil.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "provenance/Provenance.h" +#include "FlowController.h" + +#include "json/json.h" +#include "json/writer.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { +namespace reporting { + +const char *SiteToSiteProvenanceReportingTask::ProvenanceAppStr = "MiNiFi Flow"; + +void SiteToSiteProvenanceReportingTask::initialize() { +} + +void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *context, + core::ProcessSession *session, + std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records, + std::string &report) { + + Json::Value array; + for (auto record : records) { + Json::Value recordJson; + Json::Value updatedAttributesJson; + Json::Value parentUuidJson; + Json::Value childUuidJson; + recordJson["eventId"] = record->getEventId().c_str(); + recordJson["eventType"] = + provenance::ProvenanceEventRecord::ProvenanceEventTypeStr[record->getEventType()]; + recordJson["timestampMillis"] = record->getEventTime(); + recordJson["durationMillis"] = record->getEventDuration(); + recordJson["lineageStart"] = record->getlineageStartDate(); + recordJson["details"] = record->getDetails().c_str(); + 
recordJson["componentId"] = record->getComponentId().c_str(); + recordJson["componentType"] = record->getComponentType().c_str(); + recordJson["entityId"] = record->getFlowFileUuid().c_str(); + recordJson["entityType"] = "org.apache.nifi.flowfile.FlowFile"; + recordJson["entitySize"] = record->getFileSize(); + recordJson["entityOffset"] = record->getFileOffset(); + + for (auto attr : record->getAttributes()) { + updatedAttributesJson[attr.first] = attr.second; + } + recordJson["updatedAttributes"] = updatedAttributesJson; + + for (auto parentUUID : record->getParentUuids()) { + parentUuidJson.append(parentUUID.c_str()); + } + recordJson["parentIds"] = parentUuidJson; + + for (auto childUUID : record->getChildrenUuids()) { + childUuidJson.append(childUUID.c_str()); + } + recordJson["childIds"] = childUuidJson; + recordJson["transitUri"] = record->getTransitUri().c_str(); + recordJson["remoteIdentifier"] = + record->getSourceSystemFlowFileIdentifier().c_str(); + recordJson["alternateIdentifier"] = + record->getAlternateIdentifierUri().c_str(); + recordJson["application"] = ProvenanceAppStr; + array.append(recordJson); + } + + Json::StyledWriter writer; + report = writer.write(array); +} + +void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context, + core::ProcessSession *session) { + + std::shared_ptr<Site2SiteClientProtocol> protocol_ = + this->obtainSite2SiteProtocol(host_, port_, port_uuid_); + + if (!protocol_) { + context->yield(); + return; + } + + if (!protocol_->bootstrap()) { + // bootstrap the client protocol if needeed + context->yield(); + std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor + > (context->getProcessorNode().getProcessor()); + logger_->log_error("Site2Site bootstrap failed yield period %d peer ", + processor->getYieldPeriodMsec()); + returnSite2SiteProtocol(protocol_); + return; + } + + std::vector < std::shared_ptr < provenance::ProvenanceEventRecord >> records; + 
std::shared_ptr<provenance::ProvenanceRepository> repo = std::static_pointer_cast + < provenance::ProvenanceRepository > (context->getProvenanceRepository()); + repo->getProvenanceRecord(records, batch_size_); + if (records.size() <= 0) { + returnSite2SiteProtocol(protocol_); + return; + } + + std::string jsonStr; + this->getJsonReport(context, session, records, jsonStr); + if (jsonStr.length() <= 0) { + returnSite2SiteProtocol(protocol_); + return; + } + + try { + std::map < std::string, std::string > attributes; + protocol_->transferString(context, session, jsonStr, attributes); + } catch (...) { + // if transfer bytes failed, return instead of purge the provenance records + return; + } + + // we transfer the record, purge the record from DB + repo->purgeProvenanceRecord(records); + returnSite2SiteProtocol(protocol_); +} + +} /* namespace reporting */ +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index 4e736f8..d76b9f3 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -329,6 +329,73 @@ void YamlConfiguration::parseRemoteProcessGroupYaml( } } +void YamlConfiguration::parseProvenanceReportingYaml( + YAML::Node *reportNode, core::ProcessGroup * parentGroup) { + uuid_t port_uuid; + int64_t schedulingPeriod = -1; + + if (!parentGroup) { + logger_->log_error("parseProvenanceReportingYaml: no parent group exists"); + return; + } + + if (!reportNode || !(reportNode->IsSequence())) { + logger_->log_debug("no provenance reporting task specified"); + return; + } + + std::shared_ptr<core::Processor> processor = nullptr; + processor = 
createProvenanceReportTask(); + std::shared_ptr<core::reporting::SiteToSiteProvenanceReportingTask> reportTask = + std::static_pointer_cast < core::reporting::SiteToSiteProvenanceReportingTask + > (processor); + + YAML::Node node = reportNode->as<YAML::Node>(); + + auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>(); + auto schedulingPeriodStr = node["scheduling period"].as<std::string>(); + auto hostStr = node["host"].as<std::string>(); + auto portStr = node["port"].as<std::string>(); + auto portUUIDStr = node["port uuid"].as<std::string>(); + auto batchSizeStr = node["batch size"].as<std::string>(); + + // add processor to parent + parentGroup->addProcessor(processor); + processor->setScheduledState(core::RUNNING); + + core::TimeUnit unit; + if (core::Property::StringToTime(schedulingPeriodStr, + schedulingPeriod, unit) && core::Property::ConvertTimeUnitToNS(schedulingPeriod, unit, + schedulingPeriod)) { + logger_->log_debug( + "ProvenanceReportingTask schedulingPeriod %d ns", + schedulingPeriod); + processor->setSchedulingPeriodNano(schedulingPeriod); + } + + if (schedulingStrategyStr == "TIMER_DRIVEN") { + processor->setSchedulingStrategy(core::TIMER_DRIVEN); + logger_->log_debug("ProvenanceReportingTask scheduling strategy %s", schedulingStrategyStr.c_str()); + } else { + throw std::invalid_argument( + "Invalid scheduling strategy " + schedulingStrategyStr); + } + + reportTask->setHost(hostStr); + logger_->log_debug("ProvenanceReportingTask host %s", hostStr.c_str()); + int64_t lvalue; + if (core::Property::StringToInt(portStr, lvalue)) { + logger_->log_debug("ProvenanceReportingTask port %d", (uint16_t) lvalue); + reportTask->setPort((uint16_t) lvalue); + } + logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr.c_str()); + uuid_parse(portUUIDStr.c_str(), port_uuid); + reportTask->setPortUUID(port_uuid); + if (core::Property::StringToInt(batchSizeStr, lvalue)) { + reportTask->setBatchSize(lvalue); + } +} + void 
YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::ProcessGroup *parent) { uuid_t uuid; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/src/provenance/Provenance.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp index b1db9a8..083d0b2 100644 --- a/libminifi/src/provenance/Provenance.cpp +++ b/libminifi/src/provenance/Provenance.cpp @@ -34,6 +34,11 @@ namespace nifi { namespace minifi { namespace provenance { +const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY+1] = +{ "CREATE", "RECEIVE", "FETCH", "SEND", "DOWNLOAD", "DROP", "EXPIRE", "FORK", + "JOIN", "CLONE", "CONTENT_MODIFIED", "ATTRIBUTES_MODIFIED", "ROUTE", + "ADDINFO", "REPLAY"}; + // DeSerialize bool ProvenanceEventRecord::DeSerialize( const std::shared_ptr<core::Repository> &repo, std::string key) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/test/unit/ProcessorTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp index 87f190c..6432c48 100644 --- a/libminifi/test/unit/ProcessorTests.cpp +++ b/libminifi/test/unit/ProcessorTests.cpp @@ -44,6 +44,10 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { std::shared_ptr<core::Processor> processor = std::make_shared< org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); + std::shared_ptr<core::Processor> processorReport = std::make_shared + < org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask + > (); + std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); @@ -77,7 +81,7 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { core::ProcessContext context(node, test_repo); core::ProcessSessionFactory factory(&context); 
context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, - dir); + dir); core::ProcessSession session(&context); processor->onSchedule(&context, &factory); @@ -122,7 +126,7 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { for (auto entry : repo->getRepoMap()) { provenance::ProvenanceEventRecord newRecord; newRecord.DeSerialize((uint8_t*) entry.second.data(), - entry.second.length()); + entry.second.length()); bool found = false; for (auto provRec : records) { @@ -141,6 +145,23 @@ TEST_CASE("Test Find file", "[getfileCreate2]") { } + core::ProcessorNode nodeReport(processorReport); + core::ProcessContext contextReport(nodeReport, test_repo); + core::ProcessSessionFactory factoryReport(&contextReport); + core::ProcessSession sessionReport(&contextReport); + processorReport->onSchedule(&contextReport, &factoryReport); + std::shared_ptr<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask> taskReport = std::static_pointer_cast + < org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask > (processorReport); + taskReport->setBatchSize(1); + std::vector < std::shared_ptr < provenance::ProvenanceEventRecord >> recordsReport; + processorReport->incrementActiveTasks(); + processorReport->setScheduledState(core::ScheduledState::RUNNING); + std::string jsonStr; + repo->getProvenanceRecord(recordsReport, 1); + taskReport->getJsonReport(&contextReport, &sessionReport, recordsReport, jsonStr); + REQUIRE(recordsReport.size() == 1); + REQUIRE(taskReport->getName() == std::string(org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName)); + REQUIRE(jsonStr.find("\"componentType\" : \"getfileCreate2\"") != std::string::npos); } TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/libminifi/test/unit/ProvenanceTestHelper.h ---------------------------------------------------------------------- diff 
--git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 80d8642..01df637 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -66,6 +66,22 @@ class TestRepository : public core::Repository { return repositoryResults; } + void getProvenanceRecord( + std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> &records, + int maxSize) { + for (auto entry : repositoryResults) { + if (records.size() >= maxSize) + break; + std::shared_ptr<provenance::ProvenanceEventRecord> eventRead = + std::make_shared<provenance::ProvenanceEventRecord>(); + + if (eventRead->DeSerialize((uint8_t*) entry.second.data(), + entry.second.length())) { + records.push_back(eventRead); + } + } + } + void run() { // do nothing } @@ -75,7 +91,7 @@ class TestRepository : public core::Repository { class TestFlowController : public minifi::FlowController { - public: +public: TestFlowController(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo) : minifi::FlowController(repo, flow_file_repo, nullptr, "",true) { @@ -125,10 +141,10 @@ class TestFlowController : public minifi::FlowController { } std::shared_ptr<minifi::Connection> createConnection(std::string name, - uuid_t uuid) { + uuid_t uuid) { return 0; } - protected: +protected: void initializePaths(const std::string &adjustedFilename) { } }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/main/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt index f7bd6e3..baf4254 100644 --- a/main/CMakeLists.txt +++ b/main/CMakeLists.txt @@ -23,7 +23,7 @@ IF(POLICY CMP0048) CMAKE_POLICY(SET CMP0048 OLD) ENDIF(POLICY CMP0048) -include_directories(../include ../libminifi/include ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include 
../thirdparty/civetweb-1.9.1/include ../thirdparty/leveldb-1.18/include ../thirdparty/) +include_directories(../include ../libminifi/include ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/jsoncpp/include ../thirdparty/leveldb-1.18/include ../thirdparty/) find_package(Boost REQUIRED) include_directories(${Boost_INCLUDE_DIRS}) @@ -43,8 +43,8 @@ find_package(UUID REQUIRED) find_package(OpenSSL REQUIRED) include_directories(${OPENSSL_INCLUDE_DIR}) -# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl, and leveldb -target_link_libraries(minifiexe minifi yaml-cpp c-library civetweb-cpp ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES}) +# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl, jsoncpp and leveldb +target_link_libraries(minifiexe minifi yaml-cpp c-library civetweb-cpp jsoncpp_lib_static ${UUID_LIBRARIES} ${LEVELDB_LIBRARIES} ${OPENSSL_LIBRARIES}) set_target_properties(minifiexe PROPERTIES OUTPUT_NAME minifi) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/thirdparty/jsoncpp/AUTHORS ---------------------------------------------------------------------- diff --git a/thirdparty/jsoncpp/AUTHORS b/thirdparty/jsoncpp/AUTHORS new file mode 100644 index 0000000..c0fbbee --- /dev/null +++ b/thirdparty/jsoncpp/AUTHORS @@ -0,0 +1 @@ +Baptiste Lepilleur <[email protected]> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/thirdparty/jsoncpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/thirdparty/jsoncpp/CMakeLists.txt b/thirdparty/jsoncpp/CMakeLists.txt new file mode 100644 index 0000000..a6c0884 --- /dev/null +++ b/thirdparty/jsoncpp/CMakeLists.txt @@ -0,0 +1,156 @@ +# vim: et ts=4 sts=4 sw=4 tw=0 + +CMAKE_MINIMUM_REQUIRED(VERSION 3.1) +PROJECT(jsoncpp) +ENABLE_TESTING() + +OPTION(JSONCPP_WITH_TESTS "Compile and (for jsoncpp_check) run JsonCpp 
test executables" OFF) +OPTION(JSONCPP_WITH_POST_BUILD_UNITTEST "Automatically run unit-tests as a post build step" OFF) +OPTION(JSONCPP_WITH_WARNING_AS_ERROR "Force compilation to fail if a warning occurs" OFF) +OPTION(JSONCPP_WITH_STRICT_ISO "Issue all the warnings demanded by strict ISO C and ISO C++" ON) +OPTION(JSONCPP_WITH_PKGCONFIG_SUPPORT "Generate and install .pc files" ON) +OPTION(JSONCPP_WITH_CMAKE_PACKAGE "Generate and install cmake package files" OFF) +OPTION(BUILD_SHARED_LIBS "Build jsoncpp_lib as a shared library." OFF) +OPTION(BUILD_STATIC_LIBS "Build jsoncpp_lib static library." ON) + +# Ensures that CMAKE_BUILD_TYPE is visible in cmake-gui on Unix +IF(NOT WIN32) + IF(NOT CMAKE_BUILD_TYPE) + SET(CMAKE_BUILD_TYPE Release CACHE STRING + "Choose the type of build, options are: None Debug Release RelWithDebInfo MinSizeRel Coverage." + FORCE) + ENDIF() +ENDIF() + +# Enable runtime search path support for dynamic libraries on OSX +IF(APPLE) + SET(CMAKE_MACOSX_RPATH 1) +ENDIF() + +# Adhere to GNU filesystem layout conventions +INCLUDE(GNUInstallDirs) + +SET(DEBUG_LIBNAME_SUFFIX "" CACHE STRING "Optional suffix to append to the library name for a debug build") + +# Set variable named ${VAR_NAME} to value ${VALUE} +FUNCTION(set_using_dynamic_name VAR_NAME VALUE) + SET( "${VAR_NAME}" "${VALUE}" PARENT_SCOPE) +ENDFUNCTION() + +# Extract major, minor, patch from version text +# Parse a version string "X.Y.Z" and outputs +# version parts in ${OUPUT_PREFIX}_MAJOR, _MINOR, _PATCH. +# If parse succeeds then ${OUPUT_PREFIX}_FOUND is TRUE. 
+MACRO(jsoncpp_parse_version VERSION_TEXT OUPUT_PREFIX) + SET(VERSION_REGEX "[0-9]+\\.[0-9]+\\.[0-9]+(-[a-zA-Z0-9_]+)?") + IF( ${VERSION_TEXT} MATCHES ${VERSION_REGEX} ) + STRING(REGEX MATCHALL "[0-9]+|-([A-Za-z0-9_]+)" VERSION_PARTS ${VERSION_TEXT}) + LIST(GET VERSION_PARTS 0 ${OUPUT_PREFIX}_MAJOR) + LIST(GET VERSION_PARTS 1 ${OUPUT_PREFIX}_MINOR) + LIST(GET VERSION_PARTS 2 ${OUPUT_PREFIX}_PATCH) + set_using_dynamic_name( "${OUPUT_PREFIX}_FOUND" TRUE ) + ELSE( ${VERSION_TEXT} MATCHES ${VERSION_REGEX} ) + set_using_dynamic_name( "${OUPUT_PREFIX}_FOUND" FALSE ) + ENDIF() +ENDMACRO() + +# Read out version from "version" file +#FILE(STRINGS "version" JSONCPP_VERSION) +#SET( JSONCPP_VERSION_MAJOR X ) +#SET( JSONCPP_VERSION_MINOR Y ) +#SET( JSONCPP_VERSION_PATCH Z ) +SET( JSONCPP_VERSION 1.8.0 ) +jsoncpp_parse_version( ${JSONCPP_VERSION} JSONCPP_VERSION ) +#IF(NOT JSONCPP_VERSION_FOUND) +# MESSAGE(FATAL_ERROR "Failed to parse version string properly. Expect X.Y.Z") +#ENDIF(NOT JSONCPP_VERSION_FOUND) +SET( JSONCPP_SOVERSION 11 ) +SET( JSONCPP_USE_SECURE_MEMORY "0" CACHE STRING "-D...=1 to use memory-wiping allocator for STL" ) + +MESSAGE(STATUS "JsonCpp Version: ${JSONCPP_VERSION_MAJOR}.${JSONCPP_VERSION_MINOR}.${JSONCPP_VERSION_PATCH}") +# File version.h is only regenerated on CMake configure step +CONFIGURE_FILE( "${PROJECT_SOURCE_DIR}/src/lib_json/version.h.in" + "${PROJECT_SOURCE_DIR}/include/json/version.h" + NEWLINE_STYLE UNIX ) +CONFIGURE_FILE( "${PROJECT_SOURCE_DIR}/version.in" + "${PROJECT_SOURCE_DIR}/version" + NEWLINE_STYLE UNIX ) + +MACRO(UseCompilationWarningAsError) + IF(MSVC) + # Only enabled in debug because some old versions of VS STL generate + # warnings when compiled in release configuration. 
+ SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /WX ") + ELSEIF(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror") + IF(JSONCPP_WITH_STRICT_ISO) + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pedantic-errors") + ENDIF() + ENDIF() +ENDMACRO() + +# Include our configuration header +INCLUDE_DIRECTORIES( ${jsoncpp_SOURCE_DIR}/include ) + +IF(MSVC) + # Only enabled in debug because some old versions of VS STL generate + # unreachable code warning when compiled in release configuration. + SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /W4 ") +ENDIF() + +# Require C++11 support, prefer ISO C++ over GNU variants, +# as relying solely on ISO C++ is more portable. +SET(CMAKE_CXX_STANDARD 11) +SET(CMAKE_CXX_STANDARD_REQUIRED ON) +SET(CMAKE_CXX_EXTENSIONS OFF) + +IF(CMAKE_CXX_COMPILER_ID MATCHES "Clang") + # using regular Clang or AppleClang + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wconversion -Wshadow -Werror=conversion -Werror=sign-compare") +ELSEIF(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + # using GCC + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wconversion -Wshadow -Wextra") + # not yet ready for -Wsign-conversion + + IF(JSONCPP_WITH_STRICT_ISO AND NOT JSONCPP_WITH_WARNING_AS_ERROR) + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror=conversion -pedantic") + ENDIF() +ELSEIF(CMAKE_CXX_COMPILER_ID STREQUAL "Intel") + #Â using Intel compiler + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wconversion -Wshadow -Wextra -Werror=conversion") + + IF(JSONCPP_WITH_STRICT_ISO AND NOT JSONCPP_WITH_WARNING_AS_ERROR) + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pedantic") + ENDIF() +ENDIF() + +FIND_PROGRAM(CCACHE_FOUND ccache) +IF(CCACHE_FOUND) + SET_PROPERTY(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ccache) + SET_PROPERTY(GLOBAL PROPERTY RULE_LAUNCH_LINK ccache) +ENDIF(CCACHE_FOUND) + +IF(JSONCPP_WITH_WARNING_AS_ERROR) + UseCompilationWarningAsError() +ENDIF() + +IF(JSONCPP_WITH_PKGCONFIG_SUPPORT) + CONFIGURE_FILE( + 
"pkg-config/jsoncpp.pc.in" + "pkg-config/jsoncpp.pc" + @ONLY) + INSTALL(FILES "${CMAKE_CURRENT_BINARY_DIR}/pkg-config/jsoncpp.pc" + DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig") +ENDIF() + +IF(JSONCPP_WITH_CMAKE_PACKAGE) + INSTALL(EXPORT jsoncpp + DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/jsoncpp + FILE jsoncppConfig.cmake) +ENDIF() + +# Build the different applications +ADD_SUBDIRECTORY( src ) + +#install the includes +ADD_SUBDIRECTORY( include ) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/thirdparty/jsoncpp/LICENSE ---------------------------------------------------------------------- diff --git a/thirdparty/jsoncpp/LICENSE b/thirdparty/jsoncpp/LICENSE new file mode 100644 index 0000000..ca2bfe1 --- /dev/null +++ b/thirdparty/jsoncpp/LICENSE @@ -0,0 +1,55 @@ +The JsonCpp library's source code, including accompanying documentation, +tests and demonstration applications, are licensed under the following +conditions... + +The author (Baptiste Lepilleur) explicitly disclaims copyright in all +jurisdictions which recognize such a disclaimer. In such jurisdictions, +this software is released into the Public Domain. + +In jurisdictions which do not recognize Public Domain property (e.g. Germany as of +2010), this software is Copyright (c) 2007-2010 by Baptiste Lepilleur, and is +released under the terms of the MIT License (see below). + +In jurisdictions which recognize Public Domain property, the user of this +software may choose to accept it either as 1) Public Domain, 2) under the +conditions of the MIT License (see below), or 3) under the terms of dual +Public Domain/MIT License conditions described here, as they choose. 
+ +The MIT License is about as close to Public Domain as a license can get, and is +described in clear, concise terms at: + + http://en.wikipedia.org/wiki/MIT_License + +The full text of the MIT License follows: + +======================================================================== +Copyright (c) 2007-2010 Baptiste Lepilleur + +Permission is hereby granted, free of charge, to any person +obtaining a copy of this software and associated documentation +files (the "Software"), to deal in the Software without +restriction, including without limitation the rights to use, copy, +modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +======================================================================== +(END LICENSE TEXT) + +The MIT license is compatible with both the GPL and commercial +software, affording one all of the rights of Public Domain with the +minor nuisance of being required to keep the above copyright notice +and license text in the source code. Note also that by accepting the +Public Domain "license" you can re-license your copy using whatever +license you like. 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/thirdparty/jsoncpp/NEWS.txt ---------------------------------------------------------------------- diff --git a/thirdparty/jsoncpp/NEWS.txt b/thirdparty/jsoncpp/NEWS.txt new file mode 100644 index 0000000..5733fcd --- /dev/null +++ b/thirdparty/jsoncpp/NEWS.txt @@ -0,0 +1,175 @@ +New in SVN +---------- + + * Updated the type system's behavior, in order to better support backwards + compatibility with code that was written before 64-bit integer support was + introduced. Here's how it works now: + + * isInt, isInt64, isUInt, and isUInt64 return true if and only if the + value can be exactly represented as that type. In particular, a value + constructed with a double like 17.0 will now return true for all of + these methods. + + * isDouble and isFloat now return true for all numeric values, since all + numeric values can be converted to a double or float without + truncation. Note however that the conversion may not be exact -- for + example, doubles cannot exactly represent all integers above 2^53 + 1. + + * isBool, isNull, isString, isArray, and isObject now return true if and + only if the value is of that type. + + * isConvertibleTo(fooValue) indicates that it is safe to call asFoo. + (For each type foo, isFoo always implies isConvertibleTo(fooValue).) + asFoo returns an approximate or exact representation as appropriate. + For example, a double value may be truncated when asInt is called. + + * For backwards compatibility with old code, isConvertibleTo(intValue) + may return false even if type() == intValue. This is because the value + may have been constructed with a 64-bit integer larger than maxInt, + and calling asInt() would cause an exception. If you're writing new + code, use isInt64 to find out whether the value is exactly + representable using an Int64, or asDouble() combined with minInt64 and + maxInt64 to figure out whether it is approximately representable. 
+ +* Value + - Patch #10: BOOST_FOREACH compatibility. Made Json::iterator more + standard compliant, added missing iterator_category and value_type + typedefs (contributed by Robert A. Iannucci). + +* Compilation + + - New CMake based build system. Based in part on contribution from + Igor Okulist and Damien Buhl (Patch #14). + + - New header json/version.h now contains version number macros + (JSONCPP_VERSION_MAJOR, JSONCPP_VERSION_MINOR, JSONCPP_VERSION_PATCH + and JSONCPP_VERSION_HEXA). + + - Patch #11: added missing JSON_API on some classes causing link issues + when building as a dynamic library on Windows + (contributed by Francis Bolduc). + + - Visual Studio DLL: suppressed warning "C4251: <data member>: <type> + needs to have dll-interface to be used by..." via pragma push/pop + in json-cpp headers. + + - Added Travis CI integration: https://travis-ci.org/blep/jsoncpp-mirror + +* Bug fixes + - Patch #15: Copy constructor does not initialize allocated_ for stringValue + (contributed by rmongia). + + - Patch #16: Missing field copy in Json::Value::iterator causing infinite + loop when using experimental internal map (#define JSON_VALUE_USE_INTERNAL_MAP) + (contributed by Ming-Lin Kao). + + + New in JsonCpp 0.6.0: + --------------------- + +* Compilation + + - LD_LIBRARY_PATH and LIBRARY_PATH environment variables are now + propagated to the build environment as this is required for some + compiler installations. + + - Added support for Microsoft Visual Studio 2008 (bug #2930462): + The platform "msvc90" has been added. + + Notes: you need to set up the environment by running vcvars32.bat + (e.g. MSVC 2008 command prompt in start menu) before running scons. + + - Added support for amalgamated source and header generation (a la sqlite). + Refer to README.md section "Generating amalgamated source and header" + for detail. + +* Value + + - Removed experimental ValueAllocator, it caused static + initialization/destruction order issues (bug #2934500).
+ The DefaultValueAllocator has been inlined in code. + + - Added support for 64 bits integer: + + Types Json::Int64 and Json::UInt64 have been added. They are aliased + to 64 bits integers on systems that support them (based on __int64 on + Microsoft Visual Studio platform, and long long on other platforms). + + Types Json::LargestInt and Json::LargestUInt have been added. They are + aliased to the largest integer type supported: + either Json::Int/Json::UInt or Json::Int64/Json::UInt64 respectively. + + Json::Value::asInt() and Json::Value::asUInt() still return plain + "int" based types, but assert if an attempt is made to retrieve + a 64 bits value that cannot be represented as the return type. + + Json::Value::asInt64() and Json::Value::asUInt64() have been added + to obtain the 64 bits integer value. + + Json::Value::asLargestInt() and Json::Value::asLargestUInt() return + the integer as a LargestInt/LargestUInt respectively. These + functions are typically used when implementing a writer. + + The reader attempts to read a number as a 64 bits integer, and falls back + to reading a double if the number is not in the range of 64 bits + integer. + + Warning: Json::Value::asInt() and Json::Value::asUInt() now return + long long. This change breaks code that was passing the return value + to *printf() function. + + Support for 64 bits integer can be disabled by defining the macro + JSON_NO_INT64 (uncomment it in json/config.h for example), though + it should have no impact on existing usage. + + - The type Json::ArrayIndex is used for indexes of a JSON value array. It + is an unsigned int (typically 32 bits).
+ + - Array index can be passed as int to operator[], allowing use of literals: + Json::Value array; + array.append( 1234 ); + int value = array[0].asInt(); // did not compile previously + + - Added float Json::Value::asFloat() to obtain a floating point value as a + float (avoids loss-of-precision warnings caused by use of asDouble() + to initialize a float). + +* Reader + + - Renamed Reader::getFormatedErrorMessages() to getFormattedErrorMessages. + Bug #3023708 (Formatted has 2 't'). The old member function is deprecated + but still present for backward compatibility. + +* Tests + + - Added test to ensure that the escape sequence "\/" is correctly handled + by the parser. + +* Bug fixes + + - Bug #3139677: JSON [1 2 3] was incorrectly parsed as [1, 3]. Error is now + correctly detected. + + - Bug #3139678: stack buffer overflow when parsing a double with a + length of 32 characters. + + - Fixed Value::operator <= implementation (had the semantics of operator >=). + Found when adding unit tests for comparison operators. + + - Value::compare() is now const and has an actual implementation with + unit tests. + + - Bug #2407932: strpbrk() can fail for NULL pointer. + + - Bug #3306345: Fixed minor typo in Path::resolve(). + + - Bug #3314841/#3306896: errors in amalgamate.py + + - Fixed some Coverity warnings and line-endings. + +* License + + - See file LICENSE for details. Basically JsonCpp is now licensed under + MIT license, or public domain if desired and recognized in your jurisdiction. + Thanks to Stephan G. Beal [http://wanderinghorse.net/home/stephan/], who + helped figure out the solution to the public domain issue.
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/f3f8f531/thirdparty/jsoncpp/README.md ---------------------------------------------------------------------- diff --git a/thirdparty/jsoncpp/README.md b/thirdparty/jsoncpp/README.md new file mode 100644 index 0000000..038ccd2 --- /dev/null +++ b/thirdparty/jsoncpp/README.md @@ -0,0 +1,225 @@ +# JsonCpp + +[JSON][json-org] is a lightweight data-interchange format. It can represent +numbers, strings, ordered sequences of values, and collections of name/value +pairs. + +[json-org]: http://json.org/ + +JsonCpp is a C++ library that allows manipulating JSON values, including +serialization and deserialization to and from strings. It can also preserve +existing comments in deserialization/serialization steps, making it a convenient +format to store user input files. + + +## Documentation + +[JsonCpp documentation][JsonCpp-documentation] is generated using [Doxygen][]. + +[JsonCpp-documentation]: http://open-source-parsers.github.io/jsoncpp-docs/doxygen/index.html +[Doxygen]: http://www.doxygen.org + + +## A note on backward-compatibility + +* `1.y.z` is built with C++11. +* `0.y.z` can be used with older compilers. +* Major versions maintain binary-compatibility. + + +## Using JsonCpp in your project + +The recommended approach to integrating JsonCpp in your project is to include +the [amalgamated source](#generating-amalgamated-source-and-header) (a single +`.cpp` file and two `.h` files) in your project, and compile and build as you +would any other source file. This ensures consistency of compilation flags and +ABI compatibility, issues which arise when building shared or static +libraries. See the next section for instructions. + +The `include/` should be added to your compiler include path. JsonCpp headers +should be included as follows: + + #include <json/json.h> + +If JsonCpp was built as a dynamic library on Windows, then your project needs to define the macro `JSON_DLL`.
+ +### Generating amalgamated source and header + +JsonCpp is provided with a script to generate a single header and a single +source file to ease inclusion into an existing project. The amalgamated source +can be generated at any time by running the following command from the +top-directory (this requires Python 2.6): + + python amalgamate.py + +It is possible to specify the header name. See the `-h` option for details. + +By default, the following files are generated: + +* `dist/jsoncpp.cpp`: source file that needs to be added to your project. +* `dist/json/json.h`: corresponding header file for use in your project. It is + equivalent to including `json/json.h` in non-amalgamated source. This header + only depends on standard headers. +* `dist/json/json-forwards.h`: header that provides forward declaration of all + JsonCpp types. + +The amalgamated sources are generated by concatenating JsonCpp source in the +correct order and defining the macro `JSON_IS_AMALGAMATION` to prevent inclusion of other headers. + + +## Contributing to JsonCpp + +### Building and testing with CMake + +[CMake][] is a C++ Makefiles/Solution generator. It is usually available on most Linux systems as a package. On Ubuntu: + + sudo apt-get install cmake + +[CMake]: http://www.cmake.org + +Note that Python is also required to run the JSON reader/writer tests. If +missing, the build will skip running those tests. + +When running CMake, a few parameters are required: + +* A build directory where the makefiles/solution are generated. It is also used + to store objects, libraries and executable files. +* The generator to use: makefiles or Visual Studio solution? What version of + Visual Studio, 32 or 64 bits solution? + +Steps for generating solution/makefiles using `cmake-gui`: + +* Make "source code" point to the source directory. +* Make "where to build the binary" point to the directory to use for the build. +* Click on the "Grouped" check box.
+* Review JsonCpp build options (tick `BUILD_SHARED_LIBS` to build as a + dynamic library). +* Click the configure button at the bottom, then the generate button. +* The generated solution/makefiles can be found in the binary directory. + +Alternatively, from the command-line on Unix in the source directory: + + mkdir -p build/debug + cd build/debug + cmake -DCMAKE_BUILD_TYPE=debug -DBUILD_STATIC_LIBS=ON -DBUILD_SHARED_LIBS=OFF -DARCHIVE_INSTALL_DIR=. -G "Unix Makefiles" ../.. + make + +For a good pkg-config file, add: + + -DCMAKE_INSTALL_INCLUDEDIR=include/jsoncpp + +Running `cmake -h` will display the list of available generators (passed using +the `-G` option). + +By default CMake hides compilation commands. This can be modified by specifying +`-DCMAKE_VERBOSE_MAKEFILE=true` when generating makefiles. + +### Building and testing with SCons + +**Note:** The SCons-based build system is deprecated. Please use CMake (see the +section above). + +JsonCpp can use [Scons][] as a build system. Note that SCons requires Python to +be installed. + +[SCons]: http://www.scons.org/ + +Invoke SCons as follows: + + scons platform=$PLATFORM [TARGET] + +where `$PLATFORM` may be one of: + +* `suncc`: Sun C++ (Solaris) +* `vacpp`: Visual Age C++ (AIX) +* `mingw` +* `msvc6`: Microsoft Visual Studio 6 service pack 5-6 +* `msvc70`: Microsoft Visual Studio 2002 +* `msvc71`: Microsoft Visual Studio 2003 +* `msvc80`: Microsoft Visual Studio 2005 +* `msvc90`: Microsoft Visual Studio 2008 +* `linux-gcc`: Gnu C++ (linux, also reported to work for Mac OS X) + +If you are building with Microsoft Visual Studio 2008, you need to set up the +environment by running `vcvars32.bat` (e.g. MSVC 2008 command prompt) before +running SCons. + +### Running the tests manually + +You need to run tests manually only if you are troubleshooting an issue. + +In the instructions below, replace `path/to/jsontest` with the path of the +`jsontest` executable that was compiled on your platform. 
+ + cd test + # This will run the Reader/Writer tests + python runjsontests.py path/to/jsontest + + # This will run the Reader/Writer tests, using JSONChecker test suite + # (http://www.json.org/JSON_checker/). + # Notes: not all tests pass: JsonCpp is too lenient (for example, + # it allows an integer to start with '0'). The goal is to improve + # strict mode parsing to get all tests to pass. + python runjsontests.py --with-json-checker path/to/jsontest + + # This will run the unit tests (mostly Value) + python rununittests.py path/to/test_lib_json + + # You can run the tests using valgrind: + python rununittests.py --valgrind path/to/test_lib_json + +### Running the tests using SCons + +Note that tests can be run with SCons using the `check` target: + + scons platform=$PLATFORM check + +### Building the documentation + +Run the Python script `doxybuild.py` from the top directory: + + python doxybuild.py --doxygen=$(which doxygen) --open --with-dot + +See `doxybuild.py --help` for options. + +### Adding a reader/writer test + +To add a test, you need to create two files in test/data: + +* a `TESTNAME.json` file, that contains the input document in JSON format. +* a `TESTNAME.expected` file, that contains a flattened representation of the + input document. + +The `TESTNAME.expected` file format is as follows: + +* Each line represents a JSON element of the element tree represented by the + input document. +* Each line has two parts: the path to access the element separated from the + element value by `=`. Array and object values are always empty (i.e. + represented by either `[]` or `{}`). +* Element path `.` represents the root element, and is used to separate object + members. `[N]` is used to specify the value of an array element at index `N`. + +See the examples `test_complex_01.json` and `test_complex_01.expected` to better understand element paths.
+ +### Understanding reader/writer test output + +When a test is run, output files are generated beside the input test files. Below is a short description of the content of each file: + +* `test_complex_01.json`: input JSON document. +* `test_complex_01.expected`: flattened JSON element tree used to check if + parsing was correct. +* `test_complex_01.actual`: flattened JSON element tree produced by `jsontest` + from reading `test_complex_01.json`. +* `test_complex_01.rewrite`: JSON document written by `jsontest` using the + `Json::Value` parsed from `test_complex_01.json` and serialized using + `Json::StyledWriter`. +* `test_complex_01.actual-rewrite`: flattened JSON element tree produced by + `jsontest` from reading `test_complex_01.rewrite`. +* `test_complex_01.process-output`: `jsontest` output, typically useful for + understanding parsing errors. + +## License + +See the `LICENSE` file for details. In summary, JsonCpp is licensed under the +MIT license, or public domain if desired and recognized in your jurisdiction.
