Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 2981bc1d4 -> 088e1be7c
MINIFICPP-41: First iteration of C api MINIFICPP-41: Improve build process and make function names clearer This closes #217. Signed-off-by: Marc Parisi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/088e1be7 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/088e1be7 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/088e1be7 Branch: refs/heads/master Commit: 088e1be7ca9deac9f104ebf315a88bd311e6a623 Parents: 2981bc1 Author: Marc Parisi <[email protected]> Authored: Tue Dec 5 13:57:06 2017 -0500 Committer: Marc Parisi <[email protected]> Committed: Mon Dec 18 13:55:28 2017 -0500 ---------------------------------------------------------------------- CMakeLists.txt | 3 + LibExample/CMakeLists.txt | 61 +++++ LibExample/generate_flow.c | 67 +++++ LibExample/transmit_flow.c | 92 +++++++ libminifi/CMakeLists.txt | 4 + libminifi/include/capi/Instance.h | 134 ++++++++++ libminifi/include/capi/Plan.h | 117 ++++++++ libminifi/include/capi/ReflexiveSession.h | 77 ++++++ libminifi/include/capi/api.h | 154 +++++++++++ libminifi/include/capi/expect.h | 32 +++ libminifi/include/core/ProcessContext.h | 4 +- libminifi/include/core/ProcessSession.h | 6 +- .../include/sitetosite/SiteToSiteFactory.h | 3 + libminifi/src/RemoteProcessorGroupPort.cpp | 1 + libminifi/src/capi/Plan.cpp | 216 +++++++++++++++ libminifi/src/capi/api.cpp | 264 +++++++++++++++++++ 16 files changed, 1231 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 3463d69..eabc1ba 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -206,6 +206,9 @@ endif() ## NOW WE CAN ADD LIBRARIES AND EXTENSIONS TO MAIN add_subdirectory(main) +if (NOT DISABLE_CURL) + add_subdirectory(LibExample) +endif() # Generate source assembly http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/LibExample/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/LibExample/CMakeLists.txt b/LibExample/CMakeLists.txt new file mode 100644 index 0000000..4ba26bd --- /dev/null +++ b/LibExample/CMakeLists.txt @@ -0,0 +1,61 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +cmake_minimum_required(VERSION 2.6) + +IF(POLICY CMP0048) + CMAKE_POLICY(SET CMP0048 OLD) +ENDIF(POLICY CMP0048) + +include_directories(../libminifi/include ../libminifi/include/c2 ../libminifi/include/c2/protocols/ ../libminifi/include/core/state ./libminifi/include/core/statemanagement/metrics ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue ../thirdparty/yaml-cpp-yaml-cpp-20171024/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/jsoncpp/include ../thirdparty/) + +include(CheckCXXCompilerFlag) +CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11) +CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X) +if(COMPILER_SUPPORTS_CXX11) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Os") +elseif(COMPILER_SUPPORTS_CXX0X) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x -Os") +else() + message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.") +endif() + +add_executable(transmit_flow transmit_flow.c) + +# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl, jsoncpp and rocksdb +target_link_libraries(transmit_flow capi core-minifi minifi) + +if (APPLE) + target_link_libraries (transmit_flow -Wl,-all_load minifi-http-curl) +else () + target_link_libraries (transmit_flow -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive) +endif () + + +add_executable(generate_flow generate_flow.c) + +# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl, jsoncpp and rocksdb +target_link_libraries(generate_flow capi core-minifi minifi) + +if (APPLE) + target_link_libraries (generate_flow -Wl,-all_load minifi-http-curl) +else () + target_link_libraries (generate_flow -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive) +endif () + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/LibExample/generate_flow.c ---------------------------------------------------------------------- diff --git a/LibExample/generate_flow.c b/LibExample/generate_flow.c new file mode 100644 index 0000000..23ba979 --- /dev/null +++ b/LibExample/generate_flow.c @@ -0,0 +1,67 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <dirent.h> +#include "capi/api.h" + + +/** + * This is an example of the C API that transmits a flow file to a remote instance. + */ +int main(int argc, char **argv) { + + if (argc < 3) { + printf("Error: must run ./generate_flow <instance> <remote port> \n"); + exit(1); + } + + char *instance_str = argv[1]; + char *portStr = argv[2]; + + nifi_port port; + + port.pord_id = portStr; + + nifi_instance *instance = create_instance(instance_str, &port); + + flow *new_flow = create_flow(instance,"GenerateFlowFile"); + + flow_file_record *record = get_next_flow_file(instance, new_flow ); + + if (record == 0){ + printf("Could not create flow file"); + exit(1); + } + + transmit_flowfile(record,instance); + + free_flowfile(record); + + // initializing will make the transmission slightly more efficient. + //initialize_instance(instance); + //transfer_file_or_directory(instance,file); + + free_flow(new_flow); + + free_instance(instance); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/LibExample/transmit_flow.c ---------------------------------------------------------------------- diff --git a/LibExample/transmit_flow.c b/LibExample/transmit_flow.c new file mode 100644 index 0000000..64e2beb --- /dev/null +++ b/LibExample/transmit_flow.c @@ -0,0 +1,92 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <dirent.h> +#include "capi/api.h" + +int is_dir(const char *path) { + struct stat stat_struct; + if (stat(path, &stat_struct) != 0) + return 0; + return S_ISDIR(stat_struct.st_mode); +} + +void transfer_file_or_directory(nifi_instance *instance, char *file_or_dir) { + int size = 1; + + if (is_dir(file_or_dir)) { + DIR *d; + + struct dirent *dir; + d = opendir(file_or_dir); + if (d) { + while ((dir = readdir(d)) != NULL) { + if (!memcmp(dir->d_name,".",1) ) + continue; + char *file_path = malloc(strlen(file_or_dir) + strlen(dir->d_name) + 2); + sprintf(file_path,"%s/%s",file_or_dir,dir->d_name); + transfer_file_or_directory(instance,file_path); + free(file_path); + } + closedir(d); + } + printf("%s is a directory", file_or_dir); + } else { + printf("Transferring %s\n",file_or_dir); + + flow_file_record *record = create_flowfile(file_or_dir); + + add_attribute(record, "addedattribute", "1", 2); + + transmit_flowfile(record, instance); + + free_flowfile(record); + } +} + +/** + * This is an example of the C API that transmits a flow file to a remote instance. + */ +int main(int argc, char **argv) { + + if (argc < 4) { + printf("Error: must run ./transmit_flow <instance> <remote port> <file or directory>\n"); + exit(1); + } + + char *instance_str = argv[1]; + char *portStr = argv[2]; + char *file = argv[3]; + + nifi_port port; + + port.pord_id = portStr; + + nifi_instance *instance = create_instance(instance_str, &port); + + // initializing will make the transmission slightly more efficient. + //initialize_instance(instance); + transfer_file_or_directory(instance,file); + + free_instance(instance); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index b882a1c..6ede84f 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -64,6 +64,8 @@ file(GLOB SOURCES "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/s file(GLOB PROCESSOR_SOURCES "src/processors/*.cpp" ) +file(GLOB CAPI_SOURCES "src/capi/*.cpp" ) + file(GLOB SPD_SOURCES "../thirdparty/spdlog-20170710/include/spdlog/*") # Workaround the limitations of having a @@ -89,8 +91,10 @@ else () endif (OPENSSL_FOUND) add_library(minifi STATIC ${PROCESSOR_SOURCES}) +add_library(capi STATIC ${CAPI_SOURCES}) target_link_libraries(minifi c-library civetweb-cpp) target_link_libraries(minifi core-minifi) + SET (LIBMINIFI core-minifi PARENT_SCOPE) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/capi/Instance.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/Instance.h b/libminifi/include/capi/Instance.h new file mode 100644 index 0000000..5d1c4c1 --- /dev/null +++ b/libminifi/include/capi/Instance.h @@ -0,0 +1,134 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_ +#define LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_ + +#include <memory> +#include <type_traits> +#include <string> +#include "core/Property.h" +#include "properties/Configure.h" +#include "io/StreamFactory.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ContentRepository.h" +#include "core/repository/VolatileContentRepository.h" +#include "core/Repository.h" + +#include "core/Connectable.h" +#include "core/ProcessorNode.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessSessionFactory.h" +#include "core/controller/ControllerServiceProvider.h" +#include "core/FlowConfiguration.h" +#include "ReflexiveSession.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +class ProcessorLink { + public: + explicit ProcessorLink(const std::shared_ptr<core::Processor> &processor) + : processor_(processor) { + + } + + const std::shared_ptr<core::Processor> &getProcessor() { + return processor_; + } + + protected: + std::shared_ptr<core::Processor> processor_; +}; + +class Instance { + public: + + explicit Instance(const std::string &url, const std::string &port) + : configure_(std::make_shared<Configure>()), + url_(url), + rpgInitialized_(false), + content_repo_(std::make_shared<minifi::core::repository::FileSystemRepository>()), + no_op_repo_(std::make_shared<minifi::core::Repository>()) { + stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configure_); + uuid_t uuid; + uuid_parse(port.c_str(), uuid); + rpg_ = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, url, url, configure_, uuid); + proc_node_ = std::make_shared<core::ProcessorNode>(rpg_); + core::FlowConfiguration::initialize_static_functions(); + content_repo_->initialize(configure_); + } + + bool isRPGConfigured() { + return rpgInitialized_; + } + + void setRemotePort(std::string remote_port) { + rpg_->setProperty(minifi::RemoteProcessorGroupPort::portUUID, remote_port); + rpg_->initialize(); + rpg_->setTransmitting(true); + rpgInitialized_ = true; + } + + std::shared_ptr<Configure> getConfiguration() { + return configure_; + } + + std::shared_ptr<minifi::core::Repository> getNoOpRepository() { + return no_op_repo_; + } + + std::shared_ptr<minifi::core::ContentRepository> getContentRepository() { + return content_repo_; + } + + void transfer(const std::shared_ptr<FlowFileRecord> &ff) { + std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr; + auto processContext = std::make_shared<core::ProcessContext>(proc_node_, controller_service_provider, no_op_repo_, no_op_repo_, content_repo_); + auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext); + + rpg_->onSchedule(processContext, sessionFactory); + + auto session = std::make_shared<core::ReflexiveSession>(processContext); + + session->add(ff); + + rpg_->onTrigger(processContext, session); + } + + protected: + + bool rpgInitialized_; + + std::shared_ptr<minifi::core::Repository> no_op_repo_; + + std::shared_ptr<minifi::core::ContentRepository> content_repo_; + + std::shared_ptr<core::ProcessorNode> proc_node_; + std::shared_ptr<minifi::RemoteProcessorGroupPort> rpg_; + std::shared_ptr<io::StreamFactory> stream_factory_; + std::string url_; + std::shared_ptr<Configure> configure_; +}; + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif /* LIBMINIFI_INCLUDE_CAPI_INSTANCE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/capi/Plan.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h new file mode 100644 index 0000000..cd9d756 --- /dev/null +++ b/libminifi/include/capi/Plan.h @@ -0,0 +1,117 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBMINIFI_CAPI_PLAN_H_ +#define LIBMINIFI_CAPI_PLAN_H_ +#include <dirent.h> +#include <cstdio> +#include <cstdlib> +#include <sstream> +#include "ResourceClaim.h" +#include <vector> +#include <set> +#include <map> +#include "core/logging/Logger.h" +#include "core/Core.h" +#include "properties/Configure.h" +#include "properties/Properties.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/Id.h" +#include "spdlog/sinks/ostream_sink.h" +#include "spdlog/sinks/dist_sink.h" +#include "core/Core.h" +#include "core/FlowFile.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessorNode.h" +#include "core/reporting/SiteToSiteProvenanceReportingTask.h" + + +class ExecutionPlan { + public: + + explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo); + + std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, + core::Relationship relationship = core::Relationship("success", "description"), + bool linkToPrevious = false); + + std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"), + bool linkToPrevious = false); + + bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value); + + void reset(); + + bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr); + + std::set<provenance::ProvenanceEventRecord*> getProvenanceRecords(); + + std::shared_ptr<core::FlowFile> getCurrentFlowFile(); + + std::shared_ptr<core::Repository> getFlowRepo() { + return flow_repo_; + } + + std::shared_ptr<core::Repository> getProvenanceRepo() { + return prov_repo_; + } + + std::shared_ptr<core::ContentRepository> getContentRepo() { + return content_repo_; + } + + protected: + + void finalize(); + + std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = false); + + std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory; + + std::shared_ptr<core::ContentRepository> content_repo_; + + std::shared_ptr<core::Repository> flow_repo_; + std::shared_ptr<core::Repository> prov_repo_; + + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_; + + std::atomic<bool> finalized; + + int location; + + std::shared_ptr<core::ProcessSession> current_session_; + std::shared_ptr<core::FlowFile> current_flowfile_; + + std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_; + std::vector<std::shared_ptr<core::Processor>> processor_queue_; + std::vector<std::shared_ptr<core::Processor>> configured_processors_; + std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_; + std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_; + std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_; + std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_; + std::vector<std::shared_ptr<minifi::Connection>> relationships_; + core::Relationship termination_; + + private: + + std::shared_ptr<logging::Logger> logger_; +}; + +#endif /* LIBMINIFI_CAPI_PLAN_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/capi/ReflexiveSession.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/ReflexiveSession.h b/libminifi/include/capi/ReflexiveSession.h new file mode 100644 index 0000000..ebf6cbe --- /dev/null +++ b/libminifi/include/capi/ReflexiveSession.h @@ -0,0 +1,77 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __REFLEXIVE_SESSION_H__ +#define __REFLEXIVE_SESSION_H__ + +#include <uuid/uuid.h> +#include <vector> +#include <queue> +#include <map> +#include <mutex> +#include <atomic> +#include <algorithm> +#include <set> + +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +// ReflexiveSession Class +class ReflexiveSession : public ProcessSession{ + public: + // Constructor + /*! + * Create a new process session + */ + ReflexiveSession(std::shared_ptr<ProcessContext> processContext = nullptr) + : ProcessSession(processContext){ + } + +// Destructor + virtual ~ReflexiveSession() { + } + + virtual std::shared_ptr<core::FlowFile> get(){ + auto prevff = ff; + ff = nullptr; + return prevff; + } + + virtual void add(const std::shared_ptr<core::FlowFile> &flow){ + ff = flow; + } + virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship){ + // no op + } + protected: + // + // Get the FlowFile from the highest priority queue + std::shared_ptr<core::FlowFile> ff; + +}; + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/capi/api.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h new file mode 100644 index 0000000..440329e --- /dev/null +++ b/libminifi/include/capi/api.h @@ -0,0 +1,154 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ +#define LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ + +#include <stdint.h> + +#ifdef __cplusplus +extern "C" { +#endif + +#define API_VERSION "0.01" +/**** + * ################################################################## + * BASE NIFI OPERATIONS + * ################################################################## + */ + + +/** + * NiFi Port struct + */ +typedef struct { + char *pord_id; +}nifi_port; + + +/** + * Nifi instance struct + */ +typedef struct { + + void *instance_ptr; + + nifi_port port; + +} nifi_instance; + + +nifi_instance *create_instance(char *url, nifi_port *port); + +void set_property(nifi_instance *, char *, char *); + +void initialize_instance(nifi_instance *); + +void free_instance(nifi_instance*); + + +/**** + * ################################################################## + * Processor OPERATIONS + * ################################################################## + */ + +typedef struct { + void *processor_ptr; +} processor; + +uint8_t run_processor(const processor *processor); + + +/**** + * ################################################################## + * FLOWFILE OPERATIONS + * ################################################################## + */ + +typedef struct{ + char *key; + void *value; + size_t value_size; +} attribute; + +/** + * State of a flow file + * + */ +typedef struct { + uint64_t size; /**< Size in bytes of the data corresponding to this flow file */ + + char * contentLocation; /**< Filesystem location of this object */ + + void *attributes; /**< Hash map of attributes */ + + +} flow_file_record; + + + +typedef struct { + void *plan; +} flow; + + +flow *create_flow(nifi_instance *, const char *); + +void free_flow(flow *); + +flow_file_record *get_next_flow_file(nifi_instance *, flow *); + +size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t ); + + +/** + * Creates a flow file object. + * Will obtain the size of file + */ +flow_file_record* create_flowfile(const char *file); + +void free_flowfile(flow_file_record*); + +uint8_t add_attribute(flow_file_record*, char *key, void *value, size_t size); + +void *get_attribute(flow_file_record*, char *key); + +uint8_t remove_attribute(flow_file_record*, char *key); + +/**** + * ################################################################## + * Remote NIFI OPERATIONS + * ################################################################## + */ + +void transmit_flowfile(flow_file_record *, nifi_instance *); + + +/**** + * ################################################################## + * Persistence Operations + * ################################################################## + */ + +void transmit_flowfile(flow_file_record *, nifi_instance *); + +#ifdef __cplusplus +} +#endif + +#endif /* LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/capi/expect.h ---------------------------------------------------------------------- diff --git a/libminifi/include/capi/expect.h b/libminifi/include/capi/expect.h new file mode 100644 index 0000000..ead182c --- /dev/null +++ b/libminifi/include/capi/expect.h @@ -0,0 +1,32 @@ +/** + + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CAPI_EXPECT_H_ +#define LIBMINIFI_INCLUDE_CAPI_EXPECT_H_ + + +// various likely/unlikely pragmas I've carried over the years. +// you'll see this in many projects +#if defined(__GNUC__) && __GNUC__ >= 4 +#define LIKELY(x) (__builtin_expect((x), 1)) +#define UNLIKELY(x) (__builtin_expect((x), 0)) +#else +#define LIKELY(x) (x) +#define UNLIKELY(x) (x) +#endif + +#endif /* LIBMINIFI_INCLUDE_CAPI_EXPECT_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/core/ProcessContext.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h index 9759c16..ad946a5 100644 --- a/libminifi/include/core/ProcessContext.h +++ b/libminifi/include/core/ProcessContext.h @@ -125,7 +125,9 @@ class ProcessContext : public controller::ControllerServiceLookup { * identifier */ std::shared_ptr<core::controller::ControllerService> getControllerService(const std::string &identifier) { - return controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUIDStr()); + if (controller_service_provider_ != nullptr) + return controller_service_provider_->getControllerServiceForComponent(identifier, processor_node_->getUUIDStr()); + return nullptr; } /** http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/core/ProcessSession.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h index 30d0563..38a713d 100644 --- a/libminifi/include/core/ProcessSession.h +++ b/libminifi/include/core/ProcessSession.h @@ -70,7 +70,7 @@ class ProcessSession { } // // Get the FlowFile from the highest priority queue - std::shared_ptr<core::FlowFile> get(); + virtual std::shared_ptr<core::FlowFile> get(); // Create a new UUID FlowFile with no content resource claim and without parent std::shared_ptr<core::FlowFile> create(); // Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent @@ -78,7 +78,7 @@ class ProcessSession { // Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent std::shared_ptr<core::FlowFile> create(const std::shared_ptr<core::FlowFile> &parent); // Add a FlowFile to the session - void add(const std::shared_ptr<core::FlowFile> &flow); + virtual void add(const std::shared_ptr<core::FlowFile> &flow); // Clone a new UUID FlowFile from parent both for content resource claim and attributes std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile> &parent); // Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim @@ -86,7 +86,7 @@ class ProcessSession { // Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session std::shared_ptr<core::FlowFile> duplicate(const std::shared_ptr<core::FlowFile> &original); // Transfer the FlowFile to the relationship - void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship); + virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship); // Put Attribute void putAttribute(const std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value); // Remove Attribute http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/include/sitetosite/SiteToSiteFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h index 4beefc6..648347d 100644 --- a/libminifi/include/sitetosite/SiteToSiteFactory.h +++ b/libminifi/include/sitetosite/SiteToSiteFactory.h @@ -71,6 +71,9 @@ static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConf if (nullptr != http_protocol) { auto ptr = std::unique_ptr<SiteToSiteClient>(static_cast<SiteToSiteClient*>(http_protocol)); auto peer = std::unique_ptr<SiteToSitePeer>(new SiteToSitePeer(client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort())); + char idStr[37]; + uuid_unparse_lower(uuid, idStr); + std::cout << "sending " << idStr << std::endl; ptr->setPortId(uuid); ptr->setPeer(std::move(peer)); return ptr; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 76932de..38ba889 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -156,6 +156,7 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon if (context->getProperty(portUUID.getName(), value)) { uuid_parse(value.c_str(), protocol_uuid_); } + std::string context_name; if (!context->getProperty(SSLContext.getName(), context_name) || IsNullOrEmpty(context_name)) { context_name = RPG_SSL_CONTEXT_SERVICE_NAME; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/src/capi/Plan.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp new file mode 100644 index 0000000..f038e3d --- /dev/null +++ b/libminifi/src/capi/Plan.cpp @@ -0,0 +1,216 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "capi/Plan.h" +#include <memory> +#include <vector> +#include <set> +#include <string> + +ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo) + : + content_repo_(content_repo), + flow_repo_(flow_repo), + prov_repo_(prov_repo), + finalized(false), + location(-1), + current_flowfile_(nullptr), + logger_(logging::LoggerFactory<ExecutionPlan>::getLogger()) { + stream_factory = std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>()); +} + +std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, core::Relationship relationship, +bool linkToPrevious) { + if (finalized) { + return nullptr; + } + + + uuid_t uuid; + uuid_generate(uuid); + + processor->setStreamFactory(stream_factory); + // initialize the processor + processor->initialize(); + + processor_mapping_[processor->getUUIDStr()] = processor; + + if (!linkToPrevious) { + termination_ = relationship; + } else { + std::shared_ptr<core::Processor> last = processor_queue_.back(); + + if (last == nullptr) { + last = processor; + termination_ = relationship; + } + + std::stringstream connection_name; + connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr(); + logger_->log_info("Creating %s connection for proc %d", connection_name.str(), processor_queue_.size() + 1); + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str()); + connection->setRelationship(relationship); + + // link the connections so that we can test results at the end for this + connection->setSource(last); + connection->setDestination(processor); + + uuid_t uuid_copy, uuid_copy_next; + last->getUUID(uuid_copy); + connection->setSourceUUID(uuid_copy); + processor->getUUID(uuid_copy_next); + connection->setDestinationUUID(uuid_copy_next); + last->addConnection(connection); + if (last != processor) { + processor->addConnection(connection); + } + relationships_.push_back(connection); + } + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor); + + processor_nodes_.push_back(node); + + std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider_, prov_repo_, flow_repo_, content_repo_); + processor_contexts_.push_back(context); + + processor_queue_.push_back(processor); + + return processor; +} + +std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship, +bool linkToPrevious) { + if (finalized) { + return nullptr; + } + + uuid_t uuid; + uuid_generate(uuid); + + auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid); + if (nullptr == ptr) { + throw std::exception(); + } + std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr); + + processor->setName(name); + + return addProcessor(processor, name, relationship, linkToPrevious); +} + +bool ExecutionPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value) { + uint32_t i = 0; + logger_->log_info("Attempting to set property %s %s for %s", prop, value, proc->getName()); + for (i = 0; i < processor_queue_.size(); i++) { + if (processor_queue_.at(i) == proc) { + break; + } + } + + if (i >= processor_queue_.size() || i == 0 || i >= processor_contexts_.size()) { + return false; + } + + return processor_contexts_.at(i)->setProperty(prop, value); +} + +void ExecutionPlan::reset() { + process_sessions_.clear(); + factories_.clear(); + location = -1; + for (auto proc : processor_queue_) { + while (proc->getActiveTasks() > 0) { + proc->decrementActiveTask(); + } + } +} + + +bool ExecutionPlan::runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify) { + if (!finalized) { + finalize(); + } + logger_->log_info("Running next processor %d, processor_queue_.size %d, processor_contexts_.size %d", location, processor_queue_.size(), processor_contexts_.size()); + + location++; + std::shared_ptr<core::Processor> processor = processor_queue_.at(location); + std::shared_ptr<core::ProcessContext> context = processor_contexts_.at(location); + std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); + factories_.push_back(factory); + if (std::find(configured_processors_.begin(), configured_processors_.end(), processor) == configured_processors_.end()) { + processor->onSchedule(context, factory); + configured_processors_.push_back(processor); + } + std::shared_ptr<core::ProcessSession> current_session = std::make_shared<core::ProcessSession>(context); + process_sessions_.push_back(current_session); + processor->incrementActiveTasks(); + processor->setScheduledState(core::ScheduledState::RUNNING); + if (verify != nullptr) { + verify(context, current_session); + } else { + logger_->log_info("Running %s", processor->getName()); + processor->onTrigger(context, current_session); + } + current_session->commit(); + current_flowfile_ = current_session->get(); + return location + 1 < processor_queue_.size(); +} + +std::set<provenance::ProvenanceEventRecord*> ExecutionPlan::getProvenanceRecords() { + return process_sessions_.at(location)->getProvenanceReporter()->getEvents(); +} + +std::shared_ptr<core::FlowFile> ExecutionPlan::getCurrentFlowFile() { + return current_flowfile_; +} + +std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest) { + std::stringstream connection_name; + std::shared_ptr<core::Processor> last = processor; + connection_name << last->getUUIDStr() << "-to-" << processor->getUUIDStr(); + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(flow_repo_, content_repo_, connection_name.str()); + connection->setRelationship(termination_); + + // link the connections so that we can test results at the end for this + connection->setSource(last); + if (setDest) + connection->setDestination(processor); + + uuid_t uuid_copy; + last->getUUID(uuid_copy); + connection->setSourceUUID(uuid_copy); + if (setDest) + connection->setDestinationUUID(uuid_copy); + + processor->addConnection(connection); + return connection; +} + +void ExecutionPlan::finalize() { + if (relationships_.size() > 0) { + relationships_.push_back(buildFinalConnection(processor_queue_.back())); + } else { + for (auto processor : processor_queue_) { + relationships_.push_back(buildFinalConnection(processor, true)); + } + } + + finalized = true; +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/088e1be7/libminifi/src/capi/api.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp new file mode 100644 index 0000000..e46fcad --- /dev/null +++ b/libminifi/src/capi/api.cpp @@ -0,0 +1,264 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <string> +#include <map> +#include <memory> +#include <utility> +#include "core/Core.h" +#include "capi/api.h" +#include "capi/expect.h" +#include "capi/Instance.h" +#include "capi/Plan.h" +#include "ResourceClaim.h" + +class DirectoryConfiguration { + protected: + DirectoryConfiguration() { + minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY); + } + public: + static void initialize() { + static DirectoryConfiguration configure; + } +}; + +/** + * Creates a NiFi Instance from the url and output port. + * @param url http URL for NiFi instance + * @param port Remote output port. + */ +nifi_instance *create_instance(char *url, nifi_port *port) { + // make sure that we have a thread safe way of initializing the content directory + DirectoryConfiguration::initialize(); + + nifi_instance *instance = new nifi_instance; + + instance->instance_ptr = new minifi::Instance(url, port->pord_id); + instance->port.pord_id = port->pord_id; + + return instance; +} + +/** + * Initializes the instance + */ +void initialize_instance(nifi_instance *instance) { + auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); + minifi_instance_ref->setRemotePort(instance->port.pord_id); +} + +/** + * Sets a property within the nifi instance + * @param instance nifi instance + * @param key key in which we will set the valiue + * @param value + */ +void set_property(nifi_instance *instance, char *key, char *value) { + auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); + minifi_instance_ref->getConfiguration()->set(key, value); +} + +/** + * Reclaims memory associated with a nifi instance structure. + * @param instance nifi instance. + */ +void free_instance(nifi_instance* instance) { + if (instance != nullptr) { + delete ((minifi::Instance*) instance->instance_ptr); + delete instance; + } +} + +/** + * Creates a flow file record + * @param file file to place into the flow file. + */ +flow_file_record* create_flowfile(const char *file) { + flow_file_record *new_ff = new flow_file_record; + new_ff->attributes = new std::map<std::string, std::string>(); + new_ff->contentLocation = new char[strlen(file)]; + snprintf(new_ff->contentLocation, strlen(file), "%s", file); + std::ifstream in(file, std::ifstream::ate | std::ifstream::binary); + // set the size of the flow file. + new_ff->size = in.tellg(); + + return new_ff; +} + +/** + * Reclaims memory associated with a flow file object + * @param ff flow file record. + */ +void free_flowfile(flow_file_record *ff) { + if (ff != nullptr) { + auto map = static_cast<std::map<std::string, std::string>*>(ff->attributes); + delete[] ff->contentLocation; + delete map; + delete ff; + } +} + +/** + * Adds an attribute + * @param ff flow file record + * @param key key + * @param value value to add + * @param size size of value + * @return 0 + */ +uint8_t add_attribute(flow_file_record *ff, char *key, void *value, size_t size) { + auto attribute_map = static_cast<std::map<std::string, std::string>*>(ff->attributes); + attribute_map->insert(std::pair<std::string, std::string>(key, std::string(static_cast<char*>(value), size))); + return 0; +} + +/* + * Obtains the attribute. + * @param ff flow file record + * @param key key + * @param caller_attribute caller supplied object in which we will copy the data ptr + * @return 0 if successful, -1 if the key does not exist + */ +uint8_t get_attribute(flow_file_record *ff, char *key, attribute *caller_attribute) { + auto attribute_map = static_cast<std::map<std::string, std::string>*>(ff->attributes); + auto find = attribute_map->find(key); + if (find != attribute_map->end()) { + caller_attribute->key = key; + caller_attribute->value = static_cast<void*>(const_cast<char*>(find->second.data())); + caller_attribute->value_size = find->second.size(); + return 0; + } + return -1; +} + +/** + * Removes a key from the attribute chain + * @param ff flow file record + * @param key key to remove + * @return 0 if removed, -1 otherwise + */ +uint8_t remove_attribute(flow_file_record *ff, char *key) { + auto attribute_map = static_cast<std::map<std::string, std::string>*>(ff->attributes); + auto find = attribute_map->find(key); + if (find != attribute_map->end()) { + attribute_map->erase(find); + return 0; + } + return -1; +} + +/** + * Transmits the flowfile + * @param ff flow file record + * @param instance nifi instance structure + */ +void transmit_flowfile(flow_file_record *ff, nifi_instance *instance) { + auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); + // in the unlikely event the user forgot to initialize the instance, we shall do it for them. + if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) { + minifi_instance_ref->setRemotePort(instance->port.pord_id); + } + + auto attribute_map = static_cast<std::map<std::string, std::string>*>(ff->attributes); + + auto no_op = minifi_instance_ref->getNoOpRepository(); + + auto content_repo = minifi_instance_ref->getContentRepository(); + + std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo); + claim->increaseFlowFileRecordOwnedCount(); + claim->increaseFlowFileRecordOwnedCount(); + + auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, *attribute_map, claim); + ffr->addAttribute("nanofi.version", API_VERSION); + ffr->setSize(ff->size); + + std::string port_uuid = instance->port.pord_id; + + minifi_instance_ref->transfer(ffr); +} + +flow *create_flow(nifi_instance *instance, const char *first_processor) { + auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); + flow *new_flow = new flow; + + auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository()); + + new_flow->plan = execution_plan; + + // automatically adds it with success + execution_plan->addProcessor(first_processor, first_processor); + + return new_flow; +} + +void free_flow(flow *flow) { + if (flow == nullptr) + return; + auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); + delete execution_plan; + delete flow; +} + +flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) { + auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); + + execution_plan->reset(); + while (execution_plan->runNextProcessor()) { + } + auto ff = execution_plan->getCurrentFlowFile(); + if (ff == nullptr) + return nullptr; + auto claim = ff->getResourceClaim(); + + if (claim != nullptr) { + // create a flow file. + claim->increaseFlowFileRecordOwnedCount(); + auto path = claim->getContentFullPath(); + auto ffr = create_flowfile(path.c_str()); + std::cout << "dang created " << path << " " << ff->getSize() << std::endl; + return ffr; + } else { + return nullptr; + } +} + +size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) { + auto execution_plan = static_cast<ExecutionPlan*>(flow->plan); + int i = 0; + for (; i < size; i++) { + execution_plan->reset(); + while (execution_plan->runNextProcessor()) { + } + auto ff = execution_plan->getCurrentFlowFile(); + if (ff == nullptr) + break; + auto claim = ff->getResourceClaim(); + + if (claim != nullptr) { + claim->increaseFlowFileRecordOwnedCount(); + + auto path = claim->getContentFullPath(); + // create a flow file. + ff_r[i] = create_flowfile(path.c_str()); + } else { + break; + } + } + return i; +}
