This is an automated email from the ASF dual-hosted git repository. aboda pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 6fe6ae8d4743643b474214ace79e86d8b60c724b Author: Murtuza Shareef <[email protected]> AuthorDate: Mon Jun 10 18:15:48 2019 -0400 MINIFICPP-927 Add delimited tailfile processor Signed-off-by: Arpad Boda <[email protected]> This closes #613 --- libminifi/include/core/ContentRepository.h | 2 +- libminifi/include/core/ProcessContext.h | 18 +- libminifi/include/core/StreamManager.h | 2 +- libminifi/include/utils/file/FileUtils.h | 49 +- .../src/core/repository/FileSystemRepository.cpp | 2 +- nanofi/CMakeLists.txt | 6 +- nanofi/ecu/CMakeLists.txt | 39 +- nanofi/ecu/log_aggregator.c | 82 ++++ nanofi/ecu/tail_file.c | 206 -------- nanofi/ecu/tailfile_chunk.c | 77 +++ nanofi/ecu/tailfile_delimited.c | 77 +++ nanofi/examples/CMakeLists.txt | 4 - nanofi/examples/tail_file.c | 257 ---------- nanofi/include/api/ecu.h | 95 ++++ nanofi/include/api/nanofi.h | 38 +- nanofi/include/core/cstructs.h | 21 +- nanofi/include/core/file_utils.h | 50 +- nanofi/include/core/flowfiles.h | 27 +- nanofi/include/cxx/Instance.h | 19 +- nanofi/src/api/ecu.c | 530 +++++++++++++++++++++ nanofi/src/api/nanofi.cpp | 96 +++- nanofi/src/core/file_utils.c | 136 ++++-- nanofi/src/core/flowfiles.c | 149 +++++- nanofi/tests/CAPITests.cpp | 12 +- ...{CTailFileTests.cpp => CLogAggregatorTests.cpp} | 196 ++++---- nanofi/tests/CTailFileChunkTests.cpp | 135 ++++++ nanofi/tests/CTailFileDelimitedTests.cpp | 256 ++++++++++ nanofi/tests/CTestsBase.h | 141 ++++++ 28 files changed, 2016 insertions(+), 706 deletions(-) diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h index 348725e..c3bec9f 100644 --- a/libminifi/include/core/ContentRepository.h +++ b/libminifi/include/core/ContentRepository.h @@ -46,7 +46,7 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim> { */ virtual bool initialize(const std::shared_ptr<Configure> &configure) = 0; - virtual std::string getStoragePath() { + virtual std::string getStoragePath() const { return directory_; 
} diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h index 424e306..be8ed91 100644 --- a/libminifi/include/core/ProcessContext.h +++ b/libminifi/include/core/ProcessContext.h @@ -59,7 +59,9 @@ class ProcessContext : public controller::ControllerServiceLookup, public core:: flow_repo_(flow_repo), content_repo_(content_repo), processor_node_(processor), - logger_(logging::LoggerFactory<ProcessContext>::getLogger()) { + logger_(logging::LoggerFactory<ProcessContext>::getLogger()), + configure_(std::make_shared<minifi::Configure>()), + initialized_(false) { repo_ = repo; } @@ -75,7 +77,8 @@ class ProcessContext : public controller::ControllerServiceLookup, public core:: flow_repo_(flow_repo), content_repo_(content_repo), processor_node_(processor), - logger_(logging::LoggerFactory<ProcessContext>::getLogger()) { + logger_(logging::LoggerFactory<ProcessContext>::getLogger()), + initialized_(false) { repo_ = repo; } // Destructor @@ -197,6 +200,15 @@ class ProcessContext : public controller::ControllerServiceLookup, public core:: return controller_service_provider_->getControllerServiceName(identifier); } + void initializeContentRepository(const std::string& home) { + configure_->setHome(home); + content_repo_->initialize(configure_); + initialized_ = true; + } + + bool isInitialized() const { + return initialized_; + } private: template<typename T> @@ -217,7 +229,9 @@ class ProcessContext : public controller::ControllerServiceLookup, public core:: // Logger std::shared_ptr<logging::Logger> logger_; + std::shared_ptr<Configure> configure_; + bool initialized_; }; } /* namespace core */ diff --git a/libminifi/include/core/StreamManager.h b/libminifi/include/core/StreamManager.h index 65e0414..a2a2e78 100644 --- a/libminifi/include/core/StreamManager.h +++ b/libminifi/include/core/StreamManager.h @@ -41,7 +41,7 @@ class StreamManager { } - virtual std::string getStoragePath() = 0; + virtual std::string getStoragePath() const = 0; 
/** * Create a write stream using the streamId as a reference. diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h index 91fc130..bedfee8 100644 --- a/libminifi/include/utils/file/FileUtils.h +++ b/libminifi/include/utils/file/FileUtils.h @@ -282,7 +282,15 @@ class FileUtils { } #endif - static int create_dir(const std::string &path, bool create = true) { + static int is_directory(const char * path) { + struct stat dir_stat; + if (stat(path, &dir_stat) < 0) { + return 0; + } + return S_ISDIR(dir_stat.st_mode); + } + + static int create_dir(const std::string& path, bool recursive = true) { #ifdef BOOST_VERSION boost::filesystem::path dir(path); if(boost::filesystem::create_directory(dir)) @@ -301,12 +309,41 @@ class FileUtils { return 0; } #else - struct stat dir_stat; - if (stat(path.c_str(), &dir_stat)) { - if (mkdir(path.c_str(), 0700) != 0 && errno != EEXIST) { + if (!recursive) { + if (mkdir(path.c_str(), 0700) != 0 && errno != EEXIST) { + return -1; + } + return 0; + } + + int ret = mkdir(path.c_str(), 0700); + if (ret == 0) { + return 0; + } + + switch (errno) { + case ENOENT: { + size_t found = path.find_last_of(get_separator(0)); + + if (found == std::string::npos) { + return -1; + } + + const std::string dir = path.substr(0, found); + int res = create_dir(dir); + if (res < 0) { + return -1; + } + return mkdir(path.c_str(), 0700); + } + case EEXIST: { + if (is_directory(path.c_str())) { + return 0; + } + return -1; + } + default: return -1; - } - return 0; } #endif return -1; diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp index 4607d74..4f9b83d 100644 --- a/libminifi/src/core/repository/FileSystemRepository.cpp +++ b/libminifi/src/core/repository/FileSystemRepository.cpp @@ -34,7 +34,7 @@ bool FileSystemRepository::initialize(const std::shared_ptr<minifi::Configure> & if 
(configuration->get(Configure::nifi_dbcontent_repository_directory_default, value)) { directory_ = value; } else { - directory_ = configuration->getHome() + "/contentrepository"; + directory_ = configuration->getHome(); } utils::file::FileUtils::create_dir(directory_); return true; diff --git a/nanofi/CMakeLists.txt b/nanofi/CMakeLists.txt index 9e4e1e8..7110519 100644 --- a/nanofi/CMakeLists.txt +++ b/nanofi/CMakeLists.txt @@ -33,9 +33,11 @@ else() include_directories(../libminifi/opsys/posix) endif() -file(GLOB NANOFI_SOURCES "src/api/*.cpp" "src/core/*.c*" "src/cxx/*.cpp" "src/sitetosite/*.c*") +file(GLOB NANOFI_SOURCES "src/api/*.c*" "src/core/*.c*" "src/cxx/*.cpp" "src/sitetosite/*.c*") -file(GLOB NANOFI_EXAMPLES_SOURCES "examples/*.c" ) +if(WIN32) +list(REMOVE_ITEM NANOFI_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/src/api/ecu.c ${CMAKE_CURRENT_SOURCE_DIR}/src/core/file_utils.c ${CMAKE_CURRENT_SOURCE_DIR}/src/core/flowfiles.c) +endif() file(GLOB NANOFI_ECU_SOURCES "ecu/*.c") diff --git a/nanofi/ecu/CMakeLists.txt b/nanofi/ecu/CMakeLists.txt index b28af76..fccb443 100644 --- a/nanofi/ecu/CMakeLists.txt +++ b/nanofi/ecu/CMakeLists.txt @@ -19,33 +19,6 @@ cmake_minimum_required(VERSION 2.6) -IF(POLICY CMP0048) - CMAKE_POLICY(SET CMP0048 OLD) -ENDIF(POLICY CMP0048) - -include_directories(/include) - -include(CheckCXXCompilerFlag) -if (WIN32) - if ((MSVC_VERSION GREATER "1900") OR (MSVC_VERSION EQUAL "1900")) - CHECK_CXX_COMPILER_FLAG("/std:c++14" _cpp_latest_flag_supported) - if (_cpp_latest_flag_supported) - add_compile_options("/std:c++14") - endif() - endif() -else() -CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11) -CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X) -if(COMPILER_SUPPORTS_CXX11) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") -elseif(COMPILER_SUPPORTS_CXX0X) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x") -else() - message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. 
Please use a different C++ compiler.") -endif() - -endif() - if (APPLE) set(LINK_FLAGS "-Wl,-all_load") set(LINK_END_FLAGS "") @@ -56,8 +29,16 @@ endif () if (NOT WIN32) -add_executable(tail_file tail_file.c) +add_executable(log_aggregator log_aggregator.c) + +target_link_libraries(log_aggregator nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS}) + +add_executable(tailfile_chunk tailfile_chunk.c) + +target_link_libraries(tailfile_chunk nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS}) + +add_executable(tailfile_delimited tailfile_delimited.c) -target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS}) +target_link_libraries(tailfile_delimited nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS}) endif() \ No newline at end of file diff --git a/nanofi/ecu/log_aggregator.c b/nanofi/ecu/log_aggregator.c new file mode 100644 index 0000000..52d4f23 --- /dev/null +++ b/nanofi/ecu/log_aggregator.c @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +#include "api/nanofi.h" +#include "api/ecu.h" +#include "core/string_utils.h" +#include "core/cstructs.h" +#include "core/file_utils.h" +#include "core/flowfiles.h" + +#include <unistd.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <errno.h> +#include <limits.h> +#include <signal.h> +#include <sys/stat.h> + +int main(int argc, char** argv) { + + if (argc < 7) { + printf("Error: must run ./log_aggregator <file> <interval> <delimiter> <hostname> <tcp port number> <nifi port uuid>\n"); + exit(1); + } + + tailfile_input_params input_params = init_logaggregate_input(argv); + + uint64_t intrvl = 0; + uint64_t port_num = 0; + if (validate_input_params(&input_params, &intrvl, &port_num) < 0) { + return 1; + } + + setup_signal_action(); + nifi_proc_params params = setup_nifi_processor(&input_params, "LogAggregator", on_trigger_logaggregator); + + set_standalone_property(params.processor, "file_path", input_params.file); + set_standalone_property(params.processor, "delimiter", input_params.delimiter); + + struct CRawSiteToSiteClient * client = createClient(input_params.instance, port_num, input_params.nifi_port_uuid); + + char uuid_str[37]; + get_proc_uuid_from_processor(params.processor, uuid_str); + + while (!stopped) { + flow_file_record * new_ff = invoke(params.processor); + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid_str, pp); + if (pp) { + transmit_payload(client, pp->ff_list, 0); + delete_all_flow_files_from_proc(uuid_str); + } + free_flowfile(new_ff); + sleep(intrvl); + } + + printf("log aggregator processor stopped\n"); + if (client) { + destroyClient(client); + } + clear_content_repo(params.instance); + delete_all_flow_files_from_proc(uuid_str); + free_standalone_processor(params.processor); + free_instance(params.instance); + free_proc_params(uuid_str); + return 0; +} diff --git a/nanofi/ecu/tail_file.c b/nanofi/ecu/tail_file.c deleted file mode 100644 index c428be1..0000000 --- a/nanofi/ecu/tail_file.c 
+++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -#include "api/nanofi.h" -#include "core/string_utils.h" -#include "core/cstructs.h" -#include "core/file_utils.h" -#include <unistd.h> -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <errno.h> -#include <limits.h> -#include <signal.h> -#include <sys/stat.h> - -struct flow_file_records * flowfiles = NULL; -nifi_instance * instance = NULL; -standalone_processor * proc = NULL; -int file_offset = 0; -int stopped = 0; -flow_file_list ff_list; -token_list tks; - -void signal_handler(int signum) { - if (signum == SIGINT || signum == SIGTERM) { - stopped = 1; - } -} - -void transmit_flow_files(nifi_instance * instance) { - flow_file_list_node * head = ff_list.head; - while (head) { - transmit_flowfile(head->ff_record, instance); - head = head->next; - } -} - -void set_offset(int offset) { - file_offset = offset; -} - -int get_offset() { - return file_offset; -} - -void on_trigger_callback(processor_session * ps, processor_context * ctx) { - - char file_path[4096]; - char delimiter[3]; - - if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) { - return; - } - - if (get_property(ctx, "delimiter", 
delimiter, sizeof(delimiter)) != 0) { - return; - } - - if (strlen(delimiter) == 0) { - printf("Delimiter not specified or it is empty\n"); - return; - } - char delim = delimiter[0]; - - if (delim == '\\') { - if (strlen(delimiter) > 1) { - switch (delimiter[1]) { - case 'r': - delim = '\r'; - break; - case 't': - delim = '\t'; - break; - case 'n': - delim = '\n'; - break; - case '\\': - delim = '\\'; - break; - default: - break; - } - } - } - - tks = tail_file(file_path, delim, get_offset()); - - if (!validate_list(&tks)) return; - - set_offset(get_offset() + tks.total_bytes); - - token_node * head; - for (head = tks.head; head && head->data; head = head->next) { - flow_file_record * ffr = generate_flow_file(instance, proc); - const char * flow_file_path = ffr->contentLocation; - FILE * ffp = fopen(flow_file_path, "wb"); - if (!ffp) { - printf("Cannot open flow file at path %s to write content to.\n", flow_file_path); - break; - } - - int count = strlen(head->data); - int ret = fwrite(head->data, 1, count, ffp); - if (ret < count) { - fclose(ffp); - break; - } - fseek(ffp, 0, SEEK_END); - ffr->size = ftell(ffp); - fclose(ffp); - add_flow_file_record(&ff_list, ffr); - } - free_all_tokens(&tks); -} - -int main(int argc, char** argv) { - - if (argc < 6) { - printf("Error: must run ./tail_file <file> <interval> <delimiter> <nifi instance url> <remote port>\n"); - exit(1); - } - - char * file = argv[1]; - char * interval = argv[2]; - char * delimiter = argv[3]; - char * instance_str = argv[4]; - char * port_str = argv[5]; - - if (access(file, F_OK) == -1) { - printf("Error: %s doesn't exist!\n", file); - exit(1); - } - - struct stat stats; - errno = 0; - int ret = stat(file, &stats); - - if (ret == -1) { - printf("Error occurred while getting file status {file: %s, error: %s}\n", file, strerror(errno)); - exit(1); - } - // Check for file existence - if (S_ISDIR(stats.st_mode)){ - printf("Error: %s is a directory!\n", file); - exit(1); - } - - errno = 0; - unsigned long 
intrvl = strtol(interval, NULL, 10); - - if (errno == ERANGE || intrvl == LONG_MAX || intrvl == LONG_MIN) { - printf("Invalid interval value specified\n"); - return 0; - } - - struct sigaction action; - memset(&action, 0, sizeof(sigaction)); - action.sa_handler = signal_handler; - sigaction(SIGTERM, &action, NULL); - sigaction(SIGINT, &action, NULL); - - nifi_port port; - - port.port_id = port_str; - - instance = create_instance(instance_str, &port); - - const char * processor_name = "TailFile"; - - add_custom_processor(processor_name, on_trigger_callback); - - proc = create_processor(processor_name); - - set_standalone_property(proc, "file_path", file); - set_standalone_property(proc, "delimiter", delimiter); - - set_offset(0); - while (!stopped) { - flow_file_record * new_ff = invoke(proc); - transmit_flow_files(instance); - free_flow_file_list(&ff_list); - free_flowfile(new_ff); - sleep(intrvl); - } - - printf("tail file processor stopped\n"); - free_standalone_processor(proc); - free(instance); - - return 0; -} diff --git a/nanofi/ecu/tailfile_chunk.c b/nanofi/ecu/tailfile_chunk.c new file mode 100644 index 0000000..26b9966 --- /dev/null +++ b/nanofi/ecu/tailfile_chunk.c @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +#include "api/ecu.h" +#include "core/flowfiles.h" +#include <unistd.h> +#include <errno.h> +#include <limits.h> +#include <signal.h> +#include <sys/stat.h> +#include <stdlib.h> +#include <string.h> +#include <stdio.h> + +int main(int argc, char** argv) { + + if (argc < 7) { + printf("Error: must run ./tailfile_chunk <file> <interval> <chunksize> <hostname> <tcp port number> <nifi port uuid>\n"); + exit(1); + } + + tailfile_input_params input_params = init_tailfile_chunk_input(argv); + + uint64_t intrvl = 0; + uint64_t port_num = 0; + if (validate_input_params(&input_params, &intrvl, &port_num) < 0) { + return 1; + } + + setup_signal_action(); + nifi_proc_params params = setup_nifi_processor(&input_params, "TailFileChunk", on_trigger_tailfilechunk); + + set_standalone_property(params.processor, "file_path", input_params.file); + set_standalone_property(params.processor, "chunk_size", input_params.chunk_size); + + struct CRawSiteToSiteClient * client = createClient(input_params.instance, port_num, input_params.nifi_port_uuid); + + char uuid_str[37]; + get_proc_uuid_from_processor(params.processor, uuid_str); + + while (!stopped) { + flow_file_record * new_ff = invoke(params.processor); + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid_str, pp); + if (pp) { + transmit_payload(client, pp->ff_list, 0); + delete_all_flow_files_from_proc(uuid_str); + } + free_flowfile(new_ff); + sleep(intrvl); + } + + printf("processor stopped\n"); + if (client) { + destroyClient(client); + } + clear_content_repo(params.instance); + delete_all_flow_files_from_proc(uuid_str); + free_standalone_processor(params.processor); + free_instance(params.instance); + free_proc_params(uuid_str); + return 0; +} diff --git a/nanofi/ecu/tailfile_delimited.c b/nanofi/ecu/tailfile_delimited.c new file mode 100644 index 0000000..3495231 --- /dev/null +++ 
b/nanofi/ecu/tailfile_delimited.c @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +#include "api/ecu.h" +#include "core/flowfiles.h" +#include <unistd.h> +#include <errno.h> +#include <limits.h> +#include <signal.h> +#include <sys/stat.h> +#include <stdlib.h> +#include <string.h> +#include <stdio.h> + +int main(int argc, char** argv) { + + if (argc < 7) { + printf("Error: must run ./tailfile_delimited <file> <interval> <delimiter> <hostname> <tcp port number> <nifi port uuid>\n"); + exit(1); + } + + tailfile_input_params input_params = init_logaggregate_input(argv); + + uint64_t intrvl = 0; + uint64_t port_num = 0; + if (validate_input_params(&input_params, &intrvl, &port_num) < 0) { + return 1; + } + + setup_signal_action(); + nifi_proc_params params = setup_nifi_processor(&input_params, "TailFileDelimited", on_trigger_tailfiledelimited); + + set_standalone_property(params.processor, "file_path", input_params.file); + set_standalone_property(params.processor, "delimiter", input_params.delimiter); + + struct CRawSiteToSiteClient * client = createClient(input_params.instance, port_num, input_params.nifi_port_uuid); + + char uuid_str[37]; + get_proc_uuid_from_processor(params.processor, uuid_str); + 
+ while (!stopped) { + flow_file_record * new_ff = invoke(params.processor); + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid_str, pp); + if (pp) { + transmit_payload(client, pp->ff_list, 1); + delete_completed_flow_files_from_proc(uuid_str); + } + free_flowfile(new_ff); + sleep(intrvl); + } + + printf("tailfile delimited processor stopped\n"); + if (client) { + destroyClient(client); + } + clear_content_repo(params.instance); + delete_all_flow_files_from_proc(uuid_str); + free_standalone_processor(params.processor); + free_instance(params.instance); + free_proc_params(uuid_str); + return 0; +} diff --git a/nanofi/examples/CMakeLists.txt b/nanofi/examples/CMakeLists.txt index b9480ed..6a9779c 100644 --- a/nanofi/examples/CMakeLists.txt +++ b/nanofi/examples/CMakeLists.txt @@ -83,8 +83,4 @@ add_executable(monitor_directory monitor_directory.c) target_link_libraries(monitor_directory nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS}) -#add_executable(tail_file tail_file.c) - -#target_link_libraries(tail_file nanofi ${CMAKE_THREAD_LIBS_INIT} ${LINK_FLAGS} minifi-http-curl ${LINK_END_FLAGS}) - endif() diff --git a/nanofi/examples/tail_file.c b/nanofi/examples/tail_file.c deleted file mode 100644 index 2200df4..0000000 --- a/nanofi/examples/tail_file.c +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -#include "api/nanofi.h" -#include "core/string_utils.h" -#include <unistd.h> -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <errno.h> -#include <limits.h> -#include <signal.h> -#include <sys/stat.h> - -typedef struct flow_file_records { - flow_file_record ** records; - uint64_t len; -} flow_file_records; - -struct flow_file_records * flowfiles = NULL; -nifi_instance * instance = NULL; -standalone_processor * proc = NULL; -int file_offset = 0; -int stopped = 0; - -void signal_handler(int signum) { - if (signum == SIGINT || signum == SIGTERM) { - stopped = 1; - } -} - -void transmit_flow_files(nifi_instance * instance) { - NULL_CHECK( ,flowfiles); - int i; - for (i = 0; i < flowfiles->len; ++i) { - NULL_CHECK( ,flowfiles->records[i]); - transmit_flowfile(flowfiles->records[i], instance); - } -} - -void free_flow_file_records() { - NULL_CHECK( ,flowfiles); - int i; - for (i = 0; i < flowfiles->len; ++i) { - free_flowfile(flowfiles->records[i]); - } - free(flowfiles); - flowfiles = NULL; -} - -void set_offset(int offset) { - file_offset = offset; -} - -int get_offset() { - return file_offset; -} - -void free_all_strings(char ** strings, int num_strings) { - int i; - for (i = 0; i < num_strings; ++i) { - free(strings[i]); - } -} - -void on_trigger_callback(processor_session * ps, processor_context * ctx) { - - char file_path[4096]; - char delimiter[2]; - - if (get_property(ctx, "file_path", file_path, 50) != 0) { - return; - } - - if (get_property(ctx, "delimiter", delimiter, 2) != 0) { - return; - } - - if (strlen(delimiter) 
== 0) { - printf("Delimiter not specified or it is empty\n"); - return; - } - char delim = '\0'; - if (strlen(delimiter) > 0) { - delim = delimiter[0]; - } - - if (delim == '\0') { - printf("Invalid delimiter \n"); - return; - } - - if (delim == '\\') { - if (strlen(delimiter) > 1) { - switch (delimiter[1]) { - case 'r': - delim = '\r'; - break; - case 't': - delim = '\t'; - break; - case 'n': - delim = '\n'; - break; - case '\\': - delim = '\\'; - break; - default: - break; - } - } - } - - int curr_offset = get_offset(); - int max_bytes_read = 4096; - char buff[max_bytes_read + 1]; - memset(buff,'\0', max_bytes_read); - FILE * fp = fopen(file_path, "rb"); - if (!fp) return; - fseek(fp, curr_offset, SEEK_SET); - - int bytes_read = 0; - while ((bytes_read = fread(buff, 1, max_bytes_read, fp)) > 0) { - buff[bytes_read] = '\0'; - tokenizer_mode_t mode = TAILFILE_MODE; - struct tokens tks = tokenize_string(buff, delim, mode); - - if (tks.num_strings == 0) return; - - set_offset(get_offset() + tks.total_bytes); - - flowfiles = (flow_file_records *)malloc(sizeof(flow_file_records)); - flowfiles->records = malloc(sizeof(flow_file_record *) * tks.num_strings); - flowfiles->len = tks.num_strings; - - int i; - for (i = 0; i < tks.num_strings; ++i) { - flowfiles->records[i] = NULL; - } - - for (i = 0; i < tks.num_strings; ++i) { - if (tks.str_list[i] && strlen(tks.str_list[i]) > 0) { - flow_file_record * ffr = generate_flow_file(instance, proc); - const char * flow_file_path = ffr->contentLocation; - FILE * ffp = fopen(flow_file_path, "wb"); - if (!ffp) { - printf("Cannot open flow file at path %s to write content to.\n", flow_file_path); - fclose(fp); - free_tokens(&tks); - return; - } - int count = strlen(tks.str_list[i]); - int ret = fwrite(tks.str_list[i], 1, count, ffp); - if (ret < count) { - fclose(ffp); - return; - } - fseek(ffp, 0, SEEK_END); - ffr->size = ftell(ffp); - fclose(ffp); - flowfiles->records[i] = ffr; - } - } - free_tokens(&tks); - } - fclose(fp); -} - 
-int main(int argc, char** argv) { - - if (argc < 6) { - printf("Error: must run ./tail_file <file> <interval> <delimiter> <nifi instance url> <remote port>\n"); - exit(1); - } - - char * file = argv[1]; - char * interval = argv[2]; - char * delimiter = argv[3]; - char * instance_str = argv[4]; - char * port_str = argv[5]; - - if (access(file, F_OK) == -1) { - printf("Error: %s doesn't exist!\n", file); - exit(1); - } - - struct stat stats; - int ret = stat(file, &stats); - - errno = 0; - if (ret == -1) { - printf("Error occurred while getting file status {file: %s, error: %s}\n", file, strerror(errno)); - exit(1); - } - // Check for file existence - if (S_ISDIR(stats.st_mode)){ - printf("Error: %s is a directory!\n", file); - exit(1); - } - - errno = 0; - unsigned long intrvl = strtol(interval, NULL, 10); - - if (errno == ERANGE || intrvl == LONG_MAX || intrvl == LONG_MIN) { - printf("Invalid interval value specified\n"); - return 0; - } - - struct sigaction action; - memset(&action, 0, sizeof(sigaction)); - action.sa_handler = signal_handler; - sigaction(SIGTERM, &action, NULL); - sigaction(SIGINT, &action, NULL); - - nifi_port port; - - port.port_id = port_str; - - instance = create_instance(instance_str, &port); - - const char * processor_name = "TailFile"; - - add_custom_processor(processor_name, on_trigger_callback); - - proc = create_processor(processor_name); - - set_standalone_property(proc, "file_path", file); - set_standalone_property(proc, "delimiter", delimiter); - - set_offset(0); - while (!stopped) { - flow_file_record * new_ff = invoke(proc); - transmit_flow_files(instance); - free_flow_file_records(); - free_flowfile(new_ff); - sleep(intrvl); - } - - free_standalone_processor(proc); - free(instance); - - return 0; -} diff --git a/nanofi/include/api/ecu.h b/nanofi/include/api/ecu.h new file mode 100644 index 0000000..9549491 --- /dev/null +++ b/nanofi/include/api/ecu.h @@ -0,0 +1,95 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +#ifndef NANOFI_INCLUDE_API_ECU_H_ +#define NANOFI_INCLUDE_API_ECU_H_ + +#include <signal.h> +#include "api/nanofi.h" +#include "uthash.h" +#include "utlist.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct proc_properties { + char * file_path; + char delimiter; + uint64_t chunk_size; +} proc_properties; + +typedef struct processor_params { + char uuid_str[37]; //key + struct flow_file_list * ff_list; + uint64_t curr_offset; + struct proc_properties * properties; + UT_hash_handle hh; +} processor_params; + +extern processor_params * procparams; +extern volatile sig_atomic_t stopped; + +typedef struct tailfile_input_params { + char * file; + char * interval; + char * delimiter; + char * instance; + char * tcp_port; + char * nifi_port_uuid; + char * chunk_size; +} tailfile_input_params; + +typedef struct nifi_proc_params { + nifi_instance * instance; + standalone_processor * processor; +} nifi_proc_params; + +/** + * Tails a delimited file starting from an offset up to the end of file + * @param file the path to the file to tail + * @param delim the delimiter character + * @param ctx the process context + * For eg. 
To tail from beginning of the file curr_offset = 0 + * @return a list of flow file info containing list of flow file records + * and the current offset in the file + */ +flow_file_info log_aggregate(const char * file_path, char delim, processor_context * ctx); +void on_trigger_tailfilechunk(processor_session * ps, processor_context * ctx); +void on_trigger_logaggregator(processor_session * ps, processor_context * ctx); +void on_trigger_tailfiledelimited(processor_session * ps, processor_context * ctx); +void signal_handler(int signum); +void delete_all_flow_files_from_proc(const char * uuid); +void delete_completed_flow_files_from_proc(const char * uuid); +void update_proc_params(const char * uuid, uint64_t value, flow_file_list * ff); +processor_params * get_proc_params(const char * uuid); + +void init_common_input(tailfile_input_params * input_params, char ** args); +tailfile_input_params init_logaggregate_input(char ** args); +tailfile_input_params init_tailfile_chunk_input(char ** args); + +int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, uint64_t * port_num); +void setup_signal_action(); +nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, const char * processor_name, void(*callback)(processor_session *, processor_context *)); +void free_proc_params(const char * uuid); + +#ifdef __cplusplus +} +#endif + +#endif /* NANOFI_INCLUDE_API_ECU_H_ */ diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h index 074ae70..f401073 100644 --- a/nanofi/include/api/nanofi.h +++ b/nanofi/include/api/nanofi.h @@ -150,7 +150,7 @@ processor *add_python_processor(flow *, processor_logic* logic); * @param name the name of the processor to instanciate * @return pointer to the new processor or nullptr in case it cannot be instantiated (wrong name?) 
**/ -standalone_processor *create_processor(const char * name); +standalone_processor *create_processor(const char * name, nifi_instance * instance); /** * Free a standalone processor @@ -318,6 +318,13 @@ flow_file_record* create_ff_object_nc(); flow_file_record* generate_flow_file(nifi_instance * instance, standalone_processor * proc); /** + * Adds content to the flow file record. + * @param ctx the processor context + * @return a flow file record + */ +flow_file_record * generate_flow(processor_context * ctx); + +/** * Get incoming flow file. To be used in processor logic callbacks. * @param session current processor session * @param context current processor context @@ -441,6 +448,35 @@ int delete_custom_processor(const char * name); **/ int transfer_to_relationship(flow_file_record * ffr, processor_session * ps, const char * relationship); +/** + * Write content to a flow file and return a pointer to flow file record + * @param buff the buffer to read content from + * @param count the number of bytes to read + * @param ctx the processor context + */ +flow_file_record * write_to_flow(const char * buff, size_t count, processor_context * ctx); + +/** + * Initialize content repository + * @param ctx the processor context + */ +void initialize_content_repo(processor_context * ctx, const char * uuid); + +/** + * Clear content repository contents + */ +void clear_content_repo(const nifi_instance * instance); + +/** + * Get the processor uuid from processor context + */ +void get_proc_uuid_from_context(const processor_context * ctx, char * uuid_target); + +/** + * Get the processor uuid from processor + */ +void get_proc_uuid_from_processor(standalone_processor * proc, char * uuid_target); + /**** * ################################################################## * Persistence Operations diff --git a/nanofi/include/core/cstructs.h b/nanofi/include/core/cstructs.h index 3be3ce3..846be5b 100644 --- a/nanofi/include/core/cstructs.h +++ b/nanofi/include/core/cstructs.h 
@@ -117,9 +117,9 @@ typedef struct { char * contentLocation; /**< Filesystem location of this object */ - void *attributes; /**< Hash map of attributes */ + void * attributes; /**< Hash map of attributes */ - void *ffp; + void * ffp; uint8_t keepContent; @@ -175,16 +175,15 @@ typedef struct token_list { * ################################################################## */ -typedef struct flow_file_list_node { - flow_file_record * ff_record; - struct flow_file_list_node * next; -} flow_file_list_node; - typedef struct flow_file_list { - flow_file_list_node * head; - flow_file_list_node * tail; - int len; - int offset; + flow_file_record * ff_record; + int complete; + struct flow_file_list * next; } flow_file_list; +typedef struct flow_file_info { + struct flow_file_list * ff_list; + uint64_t total_bytes; +} flow_file_info; + #endif /* LIBMINIFI_SRC_CAPI_CSTRUCTS_H_ */ diff --git a/nanofi/include/core/file_utils.h b/nanofi/include/core/file_utils.h index 25c3029..15806e4 100644 --- a/nanofi/include/core/file_utils.h +++ b/nanofi/include/core/file_utils.h @@ -19,6 +19,7 @@ #ifndef NANOFI_INCLUDE_CORE_FILE_UTILS_H_ #define NANOFI_INCLUDE_CORE_FILE_UTILS_H_ +#include "utlist.h" #include "flowfiles.h" #ifdef __cplusplus @@ -26,14 +27,49 @@ extern "C" { #endif /** - * Tails a delimited file starting from an offset up to the end of file - * @param file the path to the file to tail - * @param delim the delimiter character - * @param curr_offset the offset in the file to tail from. - * For eg. 
To tail from beginning of the file curr_offset = 0 - * @return a list of tokens + * Recursively deletes a directory tree (a path that is not a directory is unlinked) + * @param path the path to the directory */ -token_list tail_file(const char * file, char delim, int curr_offset); +void remove_directory(const char * path); + +/** + * Determine if the provided directory/file path is a directory + * @param path the path to the file/directory + * @return non-zero if path is a directory, 0 if it is not or cannot be examined + */ +int is_directory(const char * path); + +/** + * Get the platform-specific path separator. + * @param force_posix returns the posix path separator ("/"), even when not on posix. Useful when dealing with remote posix paths. + * @return the path separator as a constant string + */ +const char * get_separator(int force_posix); + +/** + * Joins parent path with child path + * @param parent the parent path + * @param child the child path + * @return concatenated path + * @attention this function allocates memory for the returned concatenated path + * and it is left for the caller to free the memory + */ +char * concat_path(const char * parent, const char * child); + +/** + * Make a directory tree specified by path, creating missing parent directories + * @param path the path to the directory + * @return 0 if the directory was created or already exists, -1 otherwise + */ +int make_dir(const char * path); + +/** + * Return the current working directory + * @return the current working directory, or NULL if it cannot be determined + * @attention this function allocates memory on heap + * it is left to the caller to free it + */ +char * get_current_working_directory(); #ifdef __cplusplus } diff --git a/nanofi/include/core/flowfiles.h b/nanofi/include/core/flowfiles.h index 60ac02e..72307ec 100644 --- a/nanofi/include/core/flowfiles.h +++ b/nanofi/include/core/flowfiles.h @@ -19,10 +19,33 @@ #ifndef NANOFI_INCLUDE_CORE_FLOWFILES_H_ #define NANOFI_INCLUDE_CORE_FLOWFILES_H_ +#ifdef __cplusplus +extern "C" { +#endif + #include "cstructs.h" +#include "api/ecu.h" +#include "sitetosite/CPeer.h" +#include "sitetosite/CRawSocketProtocol.h" + +flow_file_list * 
add_flow_file_record(flow_file_list ** ff_list, flow_file_record * record); + +void free_flow_file_list(flow_file_list ** ff_list); + +void add_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset); + +void update_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset); + +void transmit_flow_files(nifi_instance * instance, flow_file_list * ff_list, int complete); + +void transmit_payload(struct CRawSiteToSiteClient * client, struct flow_file_list * ff_list, int complete); + +uint64_t flow_files_size(flow_file_list * ff_list); -void add_flow_file_record(flow_file_list * ff_list, flow_file_record * record); +void read_payload_and_transmit(struct flow_file_list * ffl, struct CRawSiteToSiteClient * client); -void free_flow_file_list(flow_file_list * ff_list); +#ifdef __cplusplus +} +#endif #endif /* NANOFI_INCLUDE_CORE_FLOWFILES_H_ */ diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h index b10b95f..0325081 100644 --- a/nanofi/include/cxx/Instance.h +++ b/nanofi/include/cxx/Instance.h @@ -27,6 +27,7 @@ #include "RemoteProcessorGroupPort.h" #include "core/ContentRepository.h" #include "core/repository/VolatileContentRepository.h" +#include "core/repository/FileSystemRepository.h" #include "core/Repository.h" #include "C2CallbackAgent.h" @@ -40,6 +41,8 @@ #include "ReflexiveSession.h" #include "utils/ThreadPool.h" #include "core/state/UpdateController.h" +#include "core/file_utils.h" + namespace org { namespace apache { namespace nifi { @@ -63,14 +66,24 @@ class ProcessorLink { class Instance { public: - explicit Instance(const std::string &url, const std::string &port) + explicit Instance(const std::string &url, const std::string &port, const std::string &repo_class_name = "") : configure_(std::make_shared<Configure>()), url_(url), agent_(nullptr), rpgInitialized_(false), listener_thread_pool_(1), - content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()), 
no_op_repo_(std::make_shared<minifi::core::Repository>()) { + + if (repo_class_name == "filesystemrepository") { + content_repo_ = std::make_shared<minifi::core::repository::FileSystemRepository>(); + } else { + content_repo_ = std::make_shared<minifi::core::repository::VolatileContentRepository>(); + } + char * cwd = get_current_working_directory(); + if (cwd) { + configure_->setHome(std::string(cwd)); + free(cwd); + } running_ = false; stream_factory_ = minifi::io::StreamFactory::getInstance(configure_); utils::Identifier uuid; @@ -118,7 +131,7 @@ class Instance { return no_op_repo_; } - std::shared_ptr<minifi::core::ContentRepository> getContentRepository() { + std::shared_ptr<minifi::core::ContentRepository> getContentRepository() const { return content_repo_; } diff --git a/nanofi/src/api/ecu.c b/nanofi/src/api/ecu.c new file mode 100644 index 0000000..709c681 --- /dev/null +++ b/nanofi/src/api/ecu.c @@ -0,0 +1,530 @@ +#include "api/ecu.h" +#include "api/nanofi.h" +#include "core/string_utils.h" +#include "core/cstructs.h" +#include "core/file_utils.h" +#include "core/flowfiles.h" + +#include <unistd.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <errno.h> +#include <limits.h> +#include <signal.h> +#include <sys/stat.h> + +processor_params * procparams = NULL; +volatile sig_atomic_t stopped = 0; + +void free_proc_params(const char * uuid) { + + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid, pp); + if (pp) { + free_flow_file_list(&pp->ff_list); + free(pp->properties->file_path); + free(pp->properties); + HASH_DEL(procparams, pp); + free(pp); + } +} + +void signal_handler(int signum) { + if (signum == SIGINT || signum == SIGTERM) { + stopped = 1; + } +} + +void init_common_input(tailfile_input_params * input_params, char ** args) { + if (args && *args) { + input_params->file = args[1]; + input_params->interval = args[2]; + input_params->instance = args[4]; + input_params->tcp_port = args[5]; + 
input_params->nifi_port_uuid = args[6]; + } +} + +tailfile_input_params init_logaggregate_input(char ** args) { + tailfile_input_params input_params; + memset(&input_params, 0, sizeof(input_params)); + init_common_input(&input_params, args); + input_params.delimiter = args[3]; + return input_params; +} + +tailfile_input_params init_tailfile_chunk_input(char ** args) { + tailfile_input_params input_params; + memset(&input_params, 0, sizeof(input_params)); + init_common_input(&input_params, args); + input_params.chunk_size = args[3]; + return input_params; +} + +int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, uint64_t * port_num) { + if (access(params->file, F_OK) == -1) { + printf("Error: %s doesn't exist!\n", params->file); + return -1; + } + + struct stat stats; + int ret = stat(params->file, &stats); + + if (ret == -1) { + printf("Error occurred while getting file status {file: %s, error: %s}\n", params->file, strerror(errno)); + return -1; + } + // Check for file existence + if (S_ISDIR(stats.st_mode)){ + printf("Error: %s is a directory!\n", params->file); + return -1; + } + + errno = 0; + *intrvl = (uint64_t)(strtoul(params->interval, NULL, 10)); + + if (errno != 0) { + printf("Invalid interval value specified\n"); + return -1; + } + + errno = 0; + *port_num = (uint64_t)(strtoul(params->tcp_port, NULL, 10)); + if (errno != 0) { + printf("Cannot convert tcp port to numeric value\n"); + return -1; + } + return 0; +} + +void setup_signal_action() { + struct sigaction action; + memset(&action, 0, sizeof(sigaction)); + action.sa_handler = signal_handler; + sigaction(SIGTERM, &action, NULL); + sigaction(SIGINT, &action, NULL); +} + +nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, const char * processor_name, void(*callback)(processor_session *, processor_context *)) { + nifi_proc_params params; + nifi_port port; + port.port_id = input_params->nifi_port_uuid; + + nifi_instance * instance = 
create_instance(input_params->instance, &port); + add_custom_processor(processor_name, callback); + standalone_processor * proc = create_processor(processor_name, instance); + params.instance = instance; + params.processor = proc; + return params; +} + +void add_to_hash_table(flow_file_record * ffr, uint64_t offset, const char * uuid) { + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid, pp); + if (pp == NULL) { + pp = (struct processor_params*)malloc(sizeof(struct processor_params)); + memset(pp, 0, sizeof(struct processor_params)); + strcpy(pp->uuid_str, uuid); + HASH_ADD_STR(procparams, uuid_str, pp); + } + + add_flow_file_record(&pp->ff_list, ffr); + pp->curr_offset = offset; +} + +void delete_all_flow_files_from_proc(const char * uuid) { + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid, pp); + if (pp) { + struct flow_file_list * head = pp->ff_list; + while (head) { + struct flow_file_list * tmp = head; + free_flowfile(tmp->ff_record); + head = head->next; + free(tmp); + } + pp->ff_list = head; + } +} + +void delete_completed_flow_files_from_proc(const char * uuid) { + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid, pp); + if (pp) { + struct flow_file_list * head = pp->ff_list; + while (head) { + struct flow_file_list * tmp = head; + if (tmp->complete) { + free_flowfile(tmp->ff_record); + head = head->next; + free(tmp); + } + else { + break; + } + } + pp->ff_list = head; + } +} + +uint64_t get_current_offset(const char * uuid) { + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid, pp); + if (pp) { + return pp->curr_offset; + } + return 0; +} + +processor_params * get_proc_params(const char * uuid) { + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid, pp); + return pp; +} + +void update_proc_params(const char * uuid, uint64_t value, flow_file_list * ffl) { + struct processor_params * pp = get_proc_params(uuid); + if (!pp) { + pp = (struct processor_params 
*)malloc(sizeof(struct processor_params)); + memset(pp, 0, sizeof(struct processor_params)); + pp->ff_list = ffl; + pp->curr_offset = value; + strcpy(pp->uuid_str, uuid); + HASH_ADD_STR(procparams, uuid_str, pp); + return; + } + delete_all_flow_files_from_proc(uuid); + pp->curr_offset += value; + pp->ff_list = ffl; +} + +uint64_t update_curr_offset(const char * uuid, uint64_t value) { + struct processor_params * pp = get_proc_params(uuid); + if (pp) { + pp->curr_offset += value; + return pp->curr_offset; + } + + pp = (struct processor_params *)malloc(sizeof(struct processor_params)); + memset(pp, 0, sizeof(struct processor_params)); + strcpy(pp->uuid_str, uuid); + pp->curr_offset = value; + HASH_ADD_STR(procparams, uuid_str, pp); + return pp->curr_offset; +} + +struct proc_properties * get_processor_properties(const char * uuid) { + if (!uuid) { + return NULL; + } + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid, pp); + if (!pp) { + return NULL; + } + return pp->properties; +} + +void add_processor_properties(const char * uuid, struct proc_properties * const props) { + struct processor_params * pp = get_proc_params(uuid); + if (pp) { + pp->properties = props; + return; + } + + pp = (struct processor_params *)malloc(sizeof(struct processor_params)); + memset(pp, 0, sizeof(struct processor_params)); + strcpy(pp->uuid_str, uuid); + pp->properties = props; + HASH_ADD_STR(procparams, uuid_str, pp); +} + +void on_trigger_tailfilechunk(processor_session * ps, processor_context * ctx) { + + char uuid_str[37]; + get_proc_uuid_from_context(ctx, uuid_str); + + initialize_content_repo(ctx, uuid_str); + + struct proc_properties * props = get_processor_properties(uuid_str); + if (!props) { + char file_path[4096]; + char chunk_size[50]; + if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) { + return; + } + + if (get_property(ctx, "chunk_size", chunk_size, sizeof(chunk_size)) != 0) { + return; + } + + errno = 0; + uint64_t 
chunk_size_value = strtoul(chunk_size, NULL, 10); + + if (errno != 0 || chunk_size_value == 0) { + printf("Invalid chunk size specified\n"); + return; + } + + props = (struct proc_properties *)malloc(sizeof(struct proc_properties)); + memset(props, 0, sizeof(struct proc_properties)); + int len = strlen(file_path); + props->file_path = (char *)malloc((len + 1) * sizeof(char)); + strncpy(props->file_path, file_path, len); + props->file_path[len] = '\0'; + props->chunk_size = chunk_size_value; + add_processor_properties(uuid_str, props); + } + + FILE * fp = fopen(props->file_path, "rb"); + + if (!fp) { + printf("Unable to open file. {file: %s, reason: %s}\n", props->file_path, strerror(errno)); + return; + } + + char * buff = (char *)malloc((props->chunk_size +1 ) * sizeof(char)); + size_t bytes_read = 0; + + uint64_t curr_offset = get_current_offset(uuid_str); + fseek(fp, curr_offset, SEEK_SET); + while ((bytes_read = fread(buff, 1, props->chunk_size, fp)) > 0) { + if (bytes_read < props->chunk_size) { + break; + } + buff[props->chunk_size] = '\0'; + flow_file_record * ffr = write_to_flow(buff, strlen(buff), ctx); + curr_offset = ftell(fp); + add_attributes(ffr, props->file_path, curr_offset); + add_to_hash_table(ffr, curr_offset, uuid_str); + } + free(buff); + fclose(fp); +} + +flow_file_info log_aggregate(const char * file_path, char delim, processor_context * ctx) { + flow_file_info ff_info; + memset(&ff_info, 0, sizeof(ff_info)); + + if (!file_path) { + return ff_info; + } + + char uuid_str[37]; + get_proc_uuid_from_context(ctx, uuid_str); + + char buff[MAX_BYTES_READ + 1]; + errno = 0; + FILE * fp = fopen(file_path, "rb"); + if (!fp) { + printf("Cannot open file: {file: %s, reason: %s}\n", file_path, strerror(errno)); + return ff_info; + } + + uint64_t curr_offset = get_current_offset(uuid_str); + + fseek(fp, curr_offset, SEEK_SET); + + flow_file_list * ffl = NULL; + size_t bytes_read = 0; + while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) { + 
buff[bytes_read] = '\0'; + struct token_list tokens = tokenize_string_tailfile(buff, delim); + if (tokens.total_bytes > 0) { + ff_info.total_bytes += tokens.total_bytes; + curr_offset += tokens.total_bytes; + fseek(fp, curr_offset, SEEK_SET); + } + + token_node * head; + for (head = tokens.head; head && head->data; head = head->next) { + flow_file_record * ffr = write_to_flow(head->data, strlen(head->data), ctx); + add_attributes(ffr, file_path, curr_offset); + add_flow_file_record(&ffl, ffr); + } + free_all_tokens(&tokens); + } + fclose(fp); + ff_info.ff_list = ffl; + return ff_info; +} + +struct proc_properties * get_properties(const char * uuid, processor_context * ctx) { + struct proc_properties * props = get_processor_properties(uuid); + if (props) { + return props; + } + + char file_path[4096]; + char delimiter[3]; + + if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) { + return props; + } + + if (get_property(ctx, "delimiter", delimiter, sizeof(delimiter)) != 0) { + printf("No delimiter found\n"); + return props; + } + + if (strlen(delimiter) == 0) { + printf("Delimiter not specified or it is empty\n"); + return props; + } + + props = (struct proc_properties *)malloc(sizeof(struct proc_properties)); + memset(props, 0, sizeof(struct proc_properties)); + + char delim = delimiter[0]; + + if (delim == '\\') { + if (strlen(delimiter) > 1) { + switch (delimiter[1]) { + case 'r': + delim = '\r'; + break; + case 't': + delim = '\t'; + break; + case 'n': + delim = '\n'; + break; + case '\\': + delim = '\\'; + break; + default: + break; + } + } + } + + int len = strlen(file_path); + props->file_path = (char *)malloc((len + 1) * sizeof(char)); + strncpy(props->file_path, file_path, len); + props->file_path[len] = '\0'; + props->delimiter = delim; + + add_processor_properties(uuid, props); + return props; +} + +void on_trigger_logaggregator(processor_session * ps, processor_context * ctx) { + char uuid_str[37]; + get_proc_uuid_from_context(ctx, 
uuid_str); + + struct proc_properties * props = get_properties(uuid_str, ctx); + + if (!props || !props->file_path) return; + + char delim = props->delimiter; + + initialize_content_repo(ctx, uuid_str); + flow_file_info ff_info = log_aggregate(props->file_path, delim, ctx); + + update_proc_params(uuid_str, ff_info.total_bytes, ff_info.ff_list); +} + +void write_flow_file(flow_file_record * ffr, const char * buff, size_t count) { + FILE * ffp = fopen(ffr->contentLocation, "ab"); + if (!ffp) return; + if (fwrite(buff, 1, count, ffp) < count) { + fclose(ffp); + free_flowfile(ffr); + return; + } + fclose(ffp); +} + +flow_file_list * get_last_flow_file(const char * uuid) { + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid, pp); + if (!pp) { + return NULL; + } + + flow_file_list * ff_list = pp->ff_list; + flow_file_list * el = NULL; + LL_FOREACH(ff_list, el) { + if (el && !el->next) { + return el; + } + } + return NULL; +} + +flow_file_list * add_flow_file_to_proc_params(const char * uuid, flow_file_record * ffr) { + struct processor_params * pp = NULL; + HASH_FIND_STR(procparams, uuid, pp); + if (!pp) { + pp = (struct processor_params *)malloc(sizeof(struct processor_params)); + memset(pp, 0, sizeof(struct processor_params)); + strcpy(pp->uuid_str, uuid); + HASH_ADD_STR(procparams, uuid_str, pp); + } + flow_file_list * ffl_node = add_flow_file_record(&pp->ff_list, ffr); + ffl_node->complete = 0; + return ffl_node; +} + +void on_trigger_tailfiledelimited(processor_session * ps, processor_context * ctx) { + char uuid_str[37]; + get_proc_uuid_from_context(ctx, uuid_str); + + initialize_content_repo(ctx, uuid_str); + struct proc_properties * props = get_properties(uuid_str, ctx); + + if (!props || !props->file_path) return; + + char delim = props->delimiter; + + FILE * fp = fopen(props->file_path, "rb"); + + if (!fp) { + printf("Unable to open file. 
{file: %s, reason: %s}\n", props->file_path, strerror(errno)); + return; + } + + char buff[MAX_BYTES_READ + 1]; + size_t bytes_read = 0; + + uint64_t curr_offset = get_current_offset(uuid_str); + fseek(fp, curr_offset, SEEK_SET); + + flow_file_list * ffl_node = get_last_flow_file(uuid_str); + while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) { + buff[bytes_read] = '\0'; + const char * begin = buff; + const char * end = NULL; + + while ((end = strchr(begin, delim))) { + uint64_t len = end - begin; + if (len > 0) { + if (!ffl_node || ffl_node->complete) { + ffl_node = add_flow_file_to_proc_params(uuid_str, generate_flow(ctx)); + } + write_flow_file(ffl_node->ff_record, begin, len); + update_curr_offset(uuid_str, (len + 1)); + } + else { + update_curr_offset(uuid_str, 1); + } + if (ffl_node) { + ffl_node->complete = 1; + update_attributes(ffl_node->ff_record, props->file_path, get_current_offset(uuid_str)); + } + begin = (end + 1); + } + + if (!end && *begin != '\0') { + if (!ffl_node || ffl_node->complete) { + ffl_node = add_flow_file_to_proc_params(uuid_str, generate_flow(ctx)); + } + size_t count = strlen(begin); + write_flow_file(ffl_node->ff_record, begin, count); + update_curr_offset(uuid_str, count); + update_attributes(ffl_node->ff_record, props->file_path, get_current_offset(uuid_str)); + } + } + fclose(fp); +} diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp index d37fc6b..e8ea25a 100644 --- a/nanofi/src/api/nanofi.cpp +++ b/nanofi/src/api/nanofi.cpp @@ -113,7 +113,7 @@ nifi_instance *create_instance(const char *url, nifi_port *port) { * This API will gradually move away from C++, hence malloc is used for nifi_instance * Since minifi::Instance is currently being used, then we need to use new in that case. 
*/ - instance->instance_ptr = new minifi::Instance(url, port->port_id); + instance->instance_ptr = new minifi::Instance(url, port->port_id, "filesystemrepository"); NULL_CHECK(nullptr, instance->instance_ptr); @@ -124,25 +124,53 @@ nifi_instance *create_instance(const char *url, nifi_port *port) { return instance; } -standalone_processor *create_processor(const char *name) { +standalone_processor * create_processor(const char *name, nifi_instance * instance) { NULL_CHECK(nullptr, name); auto ptr = ExecutionPlan::createProcessor(name, name); if (!ptr) { return nullptr; } - if (standalone_instance == nullptr) { + if (instance == NULL) { nifi_port port; char portnum[] = "98765"; port.port_id = portnum; - standalone_instance = create_instance("internal_standalone", &port); + instance = create_instance("internal_standalone", &port); } - auto flow = create_new_flow(standalone_instance); + auto flow = create_new_flow(instance); std::shared_ptr<ExecutionPlan> plan(flow); plan->addProcessor(ptr, name); ExecutionPlan::addProcessorWithPlan(ptr->getUUIDStr(), plan); return static_cast<standalone_processor*>(ptr.get()); } +void initialize_content_repo(processor_context * ctx, const char * uuid) { + if (ctx->isInitialized()) { + return; + } + char * cwd = get_current_working_directory(); + if (cwd) { + const char * sep = get_separator(0); + const std::string repo_path = std::string(cwd) + sep + "contentrepository" + sep + uuid; + ctx->initializeContentRepository(repo_path); + free(cwd); + } +} + +void clear_content_repo(const nifi_instance * instance) { + const auto content_repo = static_cast<minifi::Instance*>(instance->instance_ptr)->getContentRepository(); + const auto storage_path = content_repo->getStoragePath(); + remove_directory(storage_path.c_str()); +} + +void get_proc_uuid_from_processor(standalone_processor * proc, char * uuid_target) { + strcpy(uuid_target, proc->getUUIDStr().c_str()); +} + +void get_proc_uuid_from_context(const processor_context * ctx, char * 
uuid_target) { + standalone_processor * proc = static_cast<standalone_processor*>(ctx->getProcessorNode()->getProcessor().get()); + get_proc_uuid_from_processor(proc, uuid_target); +} + void free_standalone_processor(standalone_processor* proc) { NULL_CHECK(, proc); ExecutionPlan::removeProcWithPlan(proc->getUUIDStr()); @@ -245,28 +273,58 @@ flow_file_record* create_ff_object_nc() { return new_ff; } -flow_file_record * generate_flow_file(nifi_instance * instance, standalone_processor * proc) { - if (!instance || !proc) { - return nullptr; - } +flow_file_record * generate_flow(processor_context * ctx) { flow_file_record * ffr = create_ff_object_nc(); - auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr); - auto content_repo = minifi_instance_ref->getContentRepository(); + if (ffr->crp) { + delete static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp); + } + ffr->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>(ctx->getContentRepository())); + + auto plan = ExecutionPlan::getPlan(ctx->getProcessorNode()->getProcessor()->getUUIDStr()); - ffr->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>(content_repo)); - auto plan = ExecutionPlan::getPlan(proc->getUUIDStr()); if (!plan) { return nullptr; } - ffr->ffp = static_cast<void*>(new std::shared_ptr<core::FlowFile>(plan->getCurrentFlowFile())); - ffr->keepContent = 1; + ffr->ffp = NULL; + ffr->keepContent = 0; auto ff_content_repo_ptr = (static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp)); auto claim = std::make_shared<minifi::ResourceClaim>(*ff_content_repo_ptr); - const char * full_path = claim->getContentFullPath().c_str(); - int len = strlen(full_path); - ffr->contentLocation = (char *) malloc(sizeof(char) * (len + 1)); - snprintf(ffr->contentLocation, len + 1, "%s", full_path); + + size_t len = strlen(claim->getContentFullPath().c_str()); + ffr->contentLocation = (char *) malloc((len + 1) * 
sizeof(char)); + snprintf(ffr->contentLocation, len+1, "%s", claim->getContentFullPath().c_str()); + return ffr; +} + +flow_file_record * write_to_flow(const char * buff, size_t count, processor_context * ctx) { + if (!ctx) { + return NULL; + } + + flow_file_record * ffr = generate_flow(ctx); + + if (ffr == NULL) { + printf("Could not generate flow file\n"); + return NULL; + } + + FILE * ffp = fopen(ffr->contentLocation, "wb"); + if (!ffp) { + printf("Cannot open flow file at path %s to write content to.\n", ffr->contentLocation); + free_flowfile(ffr); + return NULL; + } + + int ret = fwrite(buff, 1, count, ffp); + if (ret < count) { + fclose(ffp); + free_flowfile(ffr); + return NULL; + } + fseek(ffp, 0, SEEK_END); + ffr->size = ftell(ffp); + fclose(ffp); return ffr; } diff --git a/nanofi/src/core/file_utils.c b/nanofi/src/core/file_utils.c index 3f7b79e..1eeedc6 100644 --- a/nanofi/src/core/file_utils.c +++ b/nanofi/src/core/file_utils.c @@ -20,44 +20,122 @@ #include <stdio.h> #include <string.h> #include <errno.h> +#include <sys/stat.h> +#include <dirent.h> +#include <unistd.h> +#include <limits.h> -#include "api/nanofi.h" #include "core/string_utils.h" #include "core/file_utils.h" -token_list tail_file(const char * file_path, char delim, int curr_offset) { - token_list tkn_list; - memset(&tkn_list, 0, sizeof(struct token_list)); +#ifdef _MSC_VER +#ifndef PATH_MAX +#define PATH_MAX 260 +#endif +#endif - if (!file_path) { - return tkn_list; +int is_directory(const char * path) { + struct stat dir_stat; + if (stat(path, &dir_stat) < 0) { + return 0; } + return S_ISDIR(dir_stat.st_mode); +} + +const char * get_separator(int force_posix) +{ +#ifdef WIN32 + if (!force_posix) { + return "\\"; + } +#endif + return "/"; +} + +char * concat_path(const char * parent, const char * child) { + char * path = (char *)malloc((strlen(parent) + strlen(child) + 2) * sizeof(char)); + strcpy(path, parent); + const char * sep = get_separator(0); + strcat(path, sep); + strcat(path, 
/**
 * Removes a file, or a directory together with everything below it.
 *
 * A path that is_directory() does not report as a directory is unlinked.
 * Otherwise every entry except "." and ".." is removed recursively and the
 * directory itself is removed last.
 *
 * NOTE(review): is_directory() uses stat(), which follows symbolic links, so a
 * link to a directory is descended into -- confirm that is acceptable for the
 * paths passed here.
 *
 * @param dir_path the path to remove
 */
void remove_directory(const char * dir_path) {
  if (!dir_path) {
    return;
  }

  if (!is_directory(dir_path)) {
    if (unlink(dir_path) == -1) {
      printf("Could not remove file %s\n", dir_path);
    }
    return;
  }

  DIR * d = opendir(dir_path);
  if (!d) {
    /* e.g. missing permissions; readdir(NULL) must not be reached. */
    printf("Could not open directory %s\n", dir_path);
    return;
  }

  struct dirent * entry;
  while ((entry = readdir(d)) != NULL) {
    const char * entry_name = entry->d_name;
    if (!strcmp(entry_name, ".") || !strcmp(entry_name, "..")) {
      continue;
    }
    char * path = concat_path(dir_path, entry_name);
    if (!path) {
      continue;
    }
    remove_directory(path);
    free(path);
  }
  closedir(d);

  if (rmdir(dir_path) == -1) {
    printf("Could not remove directory %s\n", dir_path);
  }
}
get_current_working_directory() { + char * cwd = (char *)malloc(PATH_MAX * sizeof(char)); + memset(cwd, 0, PATH_MAX); + #ifdef WIN32 + if (_getcwd(cwd, PATH_MAX) != NULL) + return cwd; + #else + if (getcwd(cwd, PATH_MAX) != NULL) { + return cwd; } - fclose(fp); - return tkn_list; + #endif + free(cwd); + return NULL; } diff --git a/nanofi/src/core/flowfiles.c b/nanofi/src/core/flowfiles.c index edf2c5b..6c86e98 100644 --- a/nanofi/src/core/flowfiles.c +++ b/nanofi/src/core/flowfiles.c @@ -16,38 +16,149 @@ * limitations under the License. */ +#include "api/nanofi.h" +#include "api/ecu.h" #include "core/flowfiles.h" + +#include "utlist.h" #include <string.h> +#include <stdio.h> +#include <stdlib.h> +#include <sys/stat.h> -void add_flow_file_record(flow_file_list * ff_list, flow_file_record * record) { - if (!ff_list || !record) return; +flow_file_list * add_flow_file_record(flow_file_list ** ff_list, flow_file_record * record) { + if (!record) { + return *ff_list; + } - struct flow_file_list_node * new_node = (struct flow_file_list_node *)malloc(sizeof(struct flow_file_list_node)); + struct flow_file_list * new_node = (struct flow_file_list *)malloc(sizeof(struct flow_file_list)); new_node->ff_record = record; - new_node->next = NULL; + LL_APPEND(*ff_list, new_node); + return new_node; +} - if (!ff_list->head || !ff_list->tail) { - ff_list->head = ff_list->tail = new_node; - ff_list->len = 1; +void free_flow_file_list(flow_file_list ** ff_list) { + if (!*ff_list) { return; } + flow_file_list * head = *ff_list; + while (head) { + flow_file_list * tmp = head; + free_flowfile(tmp->ff_record); + head = head->next; + free(tmp); + } +} + +void add_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset) { + char offset_str[21]; + snprintf(offset_str, sizeof(offset_str), "%llu", curr_offset); + add_attribute(ffr, "current offset", offset_str, strlen(offset_str)); + char content_location[strlen(ffr->contentLocation) + 1]; + snprintf(content_location, 
sizeof(content_location), "%s", ffr->contentLocation); + add_attribute(ffr, "content location", content_location, strlen(content_location)); + add_attribute(ffr, "tailfile path", (char*)file_path, strlen(file_path)); +} + +void update_attributes(flow_file_record * ffr, const char * file_path, uint64_t curr_offset) { + char offset_str[21]; + snprintf(offset_str, sizeof(offset_str), "%llu", curr_offset); + update_attribute(ffr, "current offset", offset_str, strlen(offset_str)); + char content_location[strlen(ffr->contentLocation) + 1]; + snprintf(content_location, sizeof(content_location), "%s", ffr->contentLocation); + update_attribute(ffr, "content location", content_location, strlen(content_location)); + update_attribute(ffr, "tailfile path", (char*)file_path, strlen(file_path)); +} - ff_list->tail->next = new_node; - ff_list->tail = new_node; - ff_list->len++; +void transmit_flow_files(nifi_instance * instance, flow_file_list * ff_list, int complete) { + if (!instance || !ff_list) { + return; + } + flow_file_list * el = NULL; + LL_FOREACH(ff_list, el) { + if (!complete || el->complete) { + transmit_flowfile(el->ff_record, instance); + } + } } -void free_flow_file_list(flow_file_list * ff_list) { - if (!ff_list || !ff_list->head) { +void read_payload_and_transmit(struct flow_file_list * ffl, struct CRawSiteToSiteClient * client) { + if (!ffl || !client) { return; } - flow_file_list_node * head = ff_list->head; - while (head) { - free_flowfile(head->ff_record); - flow_file_list_node * tmp = head; - head = head->next; - free(tmp); + char * file = ffl->ff_record->contentLocation; + FILE * fp = fopen(file, "rb"); + if (!fp) { + return; + } + + struct stat statfs; + if (stat(file, &statfs) < 0) { + return; } - memset(ff_list, 0, sizeof(struct flow_file_list)); + size_t file_size = statfs.st_size; + + attribute attr; + attr.key = "current offset"; + if (get_attribute(ffl->ff_record, &attr) < 0) { + printf("Error looking up flow file attribute %s\n", attr.key); + return; 
+ } + + errno = 0; + uint64_t offset = strtoull((const char *)attr.value, NULL, 10); + if (errno != 0) { + printf("Error converting flow file offset value\n"); + return; + } + uint64_t begin_offset = offset - file_size; + char * buff = (char *)malloc(sizeof(char) * 4097); + size_t count = 0; + while ((count = fread(buff, 1, 4096, fp)) > 0) { + buff[count] = '\0'; + begin_offset += count; + char offset_str[21]; + snprintf(offset_str, sizeof(offset_str), "%llu", begin_offset); + update_attribute(ffl->ff_record, "current offset", offset_str, strlen(offset_str)); + + attribute_set as; + uint64_t num_attrs = get_attribute_quantity(ffl->ff_record); + as.size = num_attrs; + as.attributes = (attribute *)malloc(num_attrs * sizeof(attribute)); + get_all_attributes(ffl->ff_record, &as); + + if (transmitPayload(client, buff, &as) == 0) { + printf("payload of %zu bytes from %s sent successfully\n", count, ffl->ff_record->contentLocation); + } + else { + printf("Failed to send payload, flow file %s\n", ffl->ff_record->contentLocation); + } + free(as.attributes); + } + free(buff); + fclose(fp); +} + +void transmit_payload(struct CRawSiteToSiteClient * client, struct flow_file_list * ff_list, int complete) { + if (!client || !ff_list) { + return; + } + flow_file_list * el = NULL; + LL_FOREACH(ff_list, el) { + if (!complete || el->complete) { + read_payload_and_transmit(el, client); + } + } +} + +uint64_t flow_files_size(flow_file_list * ff_list) { + if (!ff_list) { + return 0; + } + + uint64_t counter = 0; + flow_file_list * el = NULL; + LL_COUNT(ff_list, el, counter); + return counter; } diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp index 9a63c4a..769b1dc 100644 --- a/nanofi/tests/CAPITests.cpp +++ b/nanofi/tests/CAPITests.cpp @@ -318,7 +318,7 @@ TEST_CASE("Test standalone processors", "[testStandalone]") { create_testfile_for_getfile(sourcedir.c_str()); - standalone_processor* getfile_proc = create_processor("GetFile"); + standalone_processor* getfile_proc 
= create_processor("GetFile", NULL); REQUIRE(set_standalone_property(getfile_proc, "Input Directory", sourcedir.c_str()) == 0); flow_file_record* ffr = invoke(getfile_proc); @@ -326,7 +326,7 @@ TEST_CASE("Test standalone processors", "[testStandalone]") { REQUIRE(ffr != nullptr); REQUIRE(get_attribute_quantity(ffr) > 0); - standalone_processor* extract_test = create_processor("ExtractText"); + standalone_processor* extract_test = create_processor("ExtractText", NULL); REQUIRE(extract_test != nullptr); REQUIRE(set_standalone_property(extract_test, "Attribute", "TestAttr") == 0); @@ -379,7 +379,7 @@ TEST_CASE("Test interaction of flow and standlone processors", "[testStandaloneW flow_file_record *record = get_next_flow_file(instance, test_flow); REQUIRE(record != nullptr); - standalone_processor* putfile_proc = create_processor("PutFile"); + standalone_processor* putfile_proc = create_processor("PutFile", NULL); REQUIRE(set_standalone_property(putfile_proc, "Directory", putfiledir.c_str()) == 0); flow_file_record* put_record = invoke_ff(putfile_proc, record); @@ -409,7 +409,7 @@ TEST_CASE("Test standalone processors with file input", "[testStandaloneWithFile auto sourcedir = testController.createTempDirectory(src_format); std::string path = create_testfile_for_getfile(sourcedir.c_str()); - standalone_processor* extract_test = create_processor("ExtractText"); + standalone_processor* extract_test = create_processor("ExtractText", NULL); REQUIRE(extract_test != nullptr); REQUIRE(set_standalone_property(extract_test, "Attribute", "TestAttr") == 0); @@ -465,9 +465,9 @@ TEST_CASE("C API robustness test", "[TestRobustness]") { free_standalone_processor(nullptr); free_instance(nullptr); - REQUIRE(create_processor(nullptr) == nullptr); + REQUIRE(create_processor(nullptr, nullptr) == nullptr); - standalone_processor *standalone_proc = create_processor("GetFile"); + standalone_processor *standalone_proc = create_processor("GetFile", NULL); REQUIRE(standalone_proc != nullptr); 
REQUIRE(set_property(nullptr, "prop_name", "prop_value") == -1); diff --git a/nanofi/tests/CTailFileTests.cpp b/nanofi/tests/CLogAggregatorTests.cpp similarity index 69% rename from nanofi/tests/CTailFileTests.cpp rename to nanofi/tests/CLogAggregatorTests.cpp index 491e8c0..edf62c9 100644 --- a/nanofi/tests/CTailFileTests.cpp +++ b/nanofi/tests/CLogAggregatorTests.cpp @@ -16,20 +16,20 @@ * limitations under the License. */ +#ifndef _WIN32 #include "catch.hpp" #include <vector> #include <string> -#include <fstream> #include <numeric> #include <algorithm> -#include <assert.h> #include <unistd.h> #include <string.h> #include <sys/stat.h> #include "core/string_utils.h" #include "core/file_utils.h" +#include "CTestsBase.h" void test_lists_equal(token_list * tknlist, const std::vector<std::string>& sv) { REQUIRE(tknlist != NULL); @@ -174,131 +174,101 @@ TEST_CASE("Test string tokenizer for string starting and ending with delimited c * ################################################################## */ -class FileManager { -public: - FileManager(const std::string& filePath) { - assert(!filePath.empty() && "filePath provided cannot be empty!"); - filePath_ = filePath; - outputStream_.open(filePath_, std::ios::binary); - } - - ~FileManager() { - std::ifstream ifs(filePath_); - if (ifs.good()) { - remove(filePath_.c_str()); - } - } - - void Write(const std::string& str) { - outputStream_ << str; - } - - std::string WriteNChars(uint64_t n, char c) { - std::string s(n, c); - outputStream_ << s; - return s; - } - - std::string getFilePath() const { - return filePath_; - } - - void CloseStream() { - outputStream_.flush(); - outputStream_.close(); - } - - uint64_t GetFileSize() { - CloseStream(); - struct stat buff; - if (stat(filePath_.c_str(), &buff) == 0) { - return buff.st_size; - } - return 0; - } - -private: - std::string filePath_; - std::ofstream outputStream_; -}; - -TEST_CASE("Simple tail file test", "[testTailFile]") { +TEST_CASE("Simple log aggregator test", 
"[testLogAggregator]") { + const char * content = "hello world"; FileManager fm("test.txt"); - fm.Write("hello world"); + fm.Write(content); fm.CloseStream(); - const char * file = fm.getFilePath().c_str(); - struct token_list tkn_list = tail_file(file, ';', 0); - REQUIRE(tkn_list.size == 0); - REQUIRE(tkn_list.head == NULL); - REQUIRE(tkn_list.total_bytes == 0); + TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator); + struct processor_params * pp = invoke_processor(mgr, fm.getFilePath().c_str()); + + REQUIRE(pp != NULL); + REQUIRE(pp->curr_offset == 0); + REQUIRE(flow_files_size(pp->ff_list) == 0); } -TEST_CASE("Empty file tail test", "[testEmptyFileTail]") { +TEST_CASE("Empty file log aggregator test", "[testEmptyFileLogAggregator]") { FileManager fm("test.txt"); fm.CloseStream(); - const char * file = fm.getFilePath().c_str(); - struct token_list tkn_list = tail_file(file, ';', 0); - REQUIRE(tkn_list.size == 0); - REQUIRE(tkn_list.head == NULL); - REQUIRE(tkn_list.total_bytes == 0); + TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator); + struct processor_params * pp = invoke_processor(mgr, fm.getFilePath().c_str()); + + REQUIRE(pp != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 0); + REQUIRE(pp->ff_list == NULL); + REQUIRE(pp->curr_offset == 0); } -TEST_CASE("File containing only delimiters tail test", "[testDelimiterOnlyFileTail]") { +TEST_CASE("File containing only delimiters test", "[testDelimiterOnlyLogAggregator]") { FileManager fm("test.txt"); - fm.Write("----"); + fm.Write(";;;;"); fm.CloseStream(); - const char * file = fm.getFilePath().c_str(); - struct token_list tkn_list = tail_file(file, '-', 0); - REQUIRE(tkn_list.size == 0); - REQUIRE(tkn_list.head == NULL); - REQUIRE(tkn_list.total_bytes == 4); + TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator); + struct processor_params * pp = invoke_processor(mgr, fm.getFilePath().c_str()); + + REQUIRE(pp != NULL); + 
REQUIRE(flow_files_size(pp->ff_list) == 0); + REQUIRE(pp->ff_list == NULL); + REQUIRE(pp->curr_offset == 4); } -TEST_CASE("File tail test string starting with delimiter", "[testDelimiterOnlyFileTail]") { +TEST_CASE("File containing string starting with delimiter", "[testDelimiterStartingStrings]") { FileManager fm("test.txt"); - fm.Write("----hello"); + fm.Write(";;;;hello"); fm.CloseStream(); - const char * file = fm.getFilePath().c_str(); - struct token_list tkn_list = tail_file(file, '-', 0); - REQUIRE(tkn_list.size == 0); - REQUIRE(tkn_list.head == NULL); - REQUIRE(tkn_list.total_bytes == 4); + TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator); + auto pp = invoke_processor(mgr, fm.getFilePath().c_str()); + + REQUIRE(flow_files_size(pp->ff_list) == 0); + REQUIRE(pp->ff_list == NULL); + REQUIRE(pp->curr_offset == 4); } -TEST_CASE("Test tail file with less than 4096 delimited chars", "[testTailFileDelimitedString]") { +TEST_CASE("Test tail file with less than 4096 delimited chars", "[testLogAggregateFileLessThan4KB]") { + const std::string token1("token1"); + const std::string token2("token2"); + const std::string token3("token3"); + std::vector<std::string> tokens = {token1, token2, token3}; - const std::string delimitedString = "token1--token2--token3"; + const std::string delimitedString = join_strings(tokens, ";;"); FileManager fm("test.txt"); fm.Write(delimitedString); const std::string filePath = fm.getFilePath(); fm.CloseStream(); - struct token_list tokens = tail_file(filePath.c_str(), '-', 0); - test_lists_equal(&tokens, std::vector<std::string>{"token1", "token2"}); + TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator); + auto pp = invoke_processor(mgr, filePath.c_str()); + + REQUIRE(pp->curr_offset == (token1.size() + token2.size() + (2 * std::string("--").size()))); + REQUIRE(pp->ff_list != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 2); } // Although there is no delimiter within the string that is 
at least 4096 bytes long, // tail_file still creates a flow file for the first 4096 bytes. -TEST_CASE("Test tail file having 4096 bytes without delimiter", "[testTailFile4096Chars]") { +TEST_CASE("Test tail file having 4096 bytes without delimiter", "[testLogAggregateFile4096Chars]") { FileManager fm("test.txt"); const std::string s = std::move(fm.WriteNChars(4096, 'a')); const std::string filePath = fm.getFilePath(); fm.CloseStream(); - struct token_list tokens = tail_file(filePath.c_str(), '-', 0); - test_lists_equal(&tokens, std::vector<std::string>{std::move(s)}); + TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator); + auto pp = invoke_processor(mgr, filePath.c_str()); + REQUIRE(pp->curr_offset == 4096); + REQUIRE(pp->ff_list != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 1); } // Although there is no delimiter within the string that is equal to 4096 bytes or longer // tail_file creates a flow file for each subsequent 4096 byte chunk. It leaves the last chunk // if it is smaller than 4096 bytes and not delimited -TEST_CASE("Test tail file having more than 4096 bytes without delimiter", "[testTailFileMoreThan4096Chars]") { +TEST_CASE("Test tail file having more than 4096 bytes without delimiter", "[testLogAggregarteFileMoreThan4096Chars]") { FileManager fm("test.txt"); const std::string s1 = std::move(fm.WriteNChars(4096, 'a')); @@ -307,63 +277,89 @@ TEST_CASE("Test tail file having more than 4096 bytes without delimiter", "[test fm.Write(s3); fm.CloseStream(); + TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator); const uint64_t totalStringsSize = s1.size() + s2.size() + s3.size(); const std::string filePath = fm.getFilePath(); const uint64_t bytesWrittenToStream = fm.GetFileSize(); REQUIRE(bytesWrittenToStream == totalStringsSize); - struct token_list tokens = tail_file(filePath.c_str(), '-', 0); - test_lists_equal(&tokens, std::vector<std::string>{std::move(s1), std::move(s2)}); + auto pp = 
invoke_processor(mgr, filePath.c_str()); + REQUIRE(flow_files_size(pp->ff_list) == 2); + flow_file_list * el = NULL; + LL_FOREACH(pp->ff_list, el) { + REQUIRE(el->ff_record->size == 4096); + } + REQUIRE(pp->curr_offset == (s1.size() + s2.size())); } -TEST_CASE("Test tail file having more than 4096 bytes with delimiter", "[testTailFileWithDelimitedString]") { +TEST_CASE("Test tail file having more than 4096 bytes with delimiter", "[testLogAggregateWithDelimitedString]") { FileManager fm("test.txt"); const std::string s1 = std::move(fm.WriteNChars(4096, 'a')); - const std::string d1 = std::move(fm.WriteNChars(2, '-')); + const std::string d1 = std::move(fm.WriteNChars(2, ';')); const std::string s2 = std::move(fm.WriteNChars(4096, 'b')); fm.CloseStream(); + TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator); const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size(); const std::string filePath = fm.getFilePath(); const uint64_t bytesWrittenToStream = fm.GetFileSize(); REQUIRE(bytesWrittenToStream == totalStringsSize); - struct token_list tokens = tail_file(filePath.c_str(), '-', 0); - test_lists_equal(&tokens, std::vector<std::string>{std::move(s1), std::move(s2)}); + auto pp = invoke_processor(mgr, filePath.c_str()); + REQUIRE(flow_files_size(pp->ff_list) == 2); + REQUIRE(pp->curr_offset == totalStringsSize); + flow_file_list * el = NULL; + LL_FOREACH(pp->ff_list, el) { + REQUIRE(el->ff_record->size == 4096); + } } -TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk less than 4096", "[testTailFileWithDelimitedString]") { +TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk less than 4096", "[testLogAggregateDelimited]") { FileManager fm("test.txt"); const std::string s1 = std::move(fm.WriteNChars(4096, 'a')); - const std::string d1 = std::move(fm.WriteNChars(2, '-')); + const std::string d1 = std::move(fm.WriteNChars(2, ';')); const std::string s2 = 
std::move(fm.WriteNChars(4000, 'b')); fm.CloseStream(); + TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator); const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size(); const std::string filePath = fm.getFilePath(); const uint64_t bytesWrittenToStream = fm.GetFileSize(); REQUIRE(bytesWrittenToStream == totalStringsSize); - struct token_list tokens = tail_file(filePath.c_str(), '-', 0); - test_lists_equal(&tokens, std::vector<std::string>{std::move(s1)}); + auto pp = invoke_processor(mgr, filePath.c_str()); + REQUIRE(pp->curr_offset == (s1.size() + d1.size())); + REQUIRE(flow_files_size(pp->ff_list) == 1); + + flow_file_list * el = NULL; + LL_FOREACH(pp->ff_list, el) { + REQUIRE(el->ff_record->size == 4096); + } } -TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk more than 4096", "[testTailFileWithDelimitedString]") { +TEST_CASE("Test tail file having more than 4096 bytes with delimiter and second chunk more than 4096", "[testLogAggregateDelimited]") { FileManager fm("test.txt"); const std::string s1 = std::move(fm.WriteNChars(4096, 'a')); - const std::string d1 = std::move(fm.WriteNChars(2, '-')); + const std::string d1 = std::move(fm.WriteNChars(2, ';')); const std::string s2 = std::move(fm.WriteNChars(4098, 'b')); fm.CloseStream(); + TailFileTestResourceManager mgr("LogAggregator", on_trigger_logaggregator); const uint64_t totalStringsSize = s1.size() + s2.size() + d1.size(); const std::string filePath = fm.getFilePath(); const uint64_t bytesWrittenToStream = fm.GetFileSize(); REQUIRE(bytesWrittenToStream == totalStringsSize); - const std::string s3 = std::string(s2.data(), s2.data()+4096); - struct token_list tokens = tail_file(filePath.c_str(), '-', 0); - test_lists_equal(&tokens, std::vector<std::string>{std::move(s1), std::move(s3)}); + auto pp = invoke_processor(mgr, filePath.c_str()); + REQUIRE(pp->curr_offset == 8194); + REQUIRE(flow_files_size(pp->ff_list) == 2); + + flow_file_list * 
el = NULL; + LL_FOREACH(pp->ff_list, el) { + REQUIRE(el->ff_record->size == 4096); + } } +#endif diff --git a/nanofi/tests/CTailFileChunkTests.cpp b/nanofi/tests/CTailFileChunkTests.cpp new file mode 100644 index 0000000..ff933c8 --- /dev/null +++ b/nanofi/tests/CTailFileChunkTests.cpp @@ -0,0 +1,135 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef _WIN32 +#include "catch.hpp" + +#include <dirent.h> +#include <stdio.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include "core/string_utils.h" +#include "core/file_utils.h" + +#include "CTestsBase.h" + +/**** + * ################################################################## + * CTAILFILE CHUNK TESTS + * ################################################################## + */ + +TEST_CASE("Test tailfile chunk size 4096, file size 8KB", "[tailfileChunk8KBFileSize]") { + + TailFileTestResourceManager mgr("TailFileChunk", on_trigger_tailfilechunk); + const char * file = "./e.txt"; + const char * chunksize = "4096"; + + //Write 8192 bytes to the file + FileManager fm(file); + fm.WriteNChars(4096, 'a'); + fm.WriteNChars(4096, 'b'); + fm.CloseStream(); + + standalone_processor * proc = mgr.getProcessor(); + set_standalone_property(proc, "file_path", file); + set_standalone_property(proc, "chunk_size", chunksize); + + flow_file_record * new_ff = invoke(proc); + + char uuid_str[37]; + get_proc_uuid_from_processor(proc, uuid_str); + struct processor_params * pp = get_proc_params(uuid_str); + + //Test that two flow file records were created + REQUIRE(pp != NULL); + REQUIRE(pp->ff_list != NULL); + REQUIRE(pp->ff_list->ff_record != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 2); + + //Test that the current offset in the file is 8192 bytes + REQUIRE(pp->curr_offset == 8192); +} + +TEST_CASE("Test tailfile chunk size 4096, file size less than 8KB", "[tailfileChunkFileSizeLessThan8KB]") { + + TailFileTestResourceManager mgr("TailFileChunk", on_trigger_tailfilechunk); + const char * file = "./e.txt"; + const char * chunksize = "4096"; + + //Write 4505 bytes to the file + FileManager fm(file); + fm.WriteNChars(4096, 'a'); + fm.WriteNChars(409, 'b'); + fm.CloseStream(); + + standalone_processor * proc = mgr.getProcessor(); + set_standalone_property(proc, "file_path", file); + set_standalone_property(proc, "chunk_size", chunksize); + + flow_file_record * 
new_ff = invoke(proc); + + char uuid_str[37]; + get_proc_uuid_from_processor(proc, uuid_str); + struct processor_params * pp = get_proc_params(uuid_str); + //Test that one flow file record was created + REQUIRE(pp != NULL); + REQUIRE(pp->ff_list != NULL); + REQUIRE(pp->ff_list->ff_record != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 1); + + //Test that the current offset in the file is 4096 bytes + REQUIRE(pp->curr_offset == 4096); + REQUIRE(pp->ff_list->ff_record->size == 4096); + + struct stat fstat; + REQUIRE(stat(pp->ff_list->ff_record->contentLocation, &fstat) == 0); + REQUIRE(fstat.st_size == 4096); +} + +TEST_CASE("Test tailfile chunk size 512, file size equal to 4608B", "[tailfileChunkFileSize8KB]") { + + TailFileTestResourceManager mgr("TailFileChunk", on_trigger_tailfilechunk); + const char * file = "./e.txt"; + const char * chunksize = "512"; + + //Write 4608 bytes to the file + FileManager fm(file); + fm.WriteNChars(4608, 'a'); + fm.CloseStream(); + + standalone_processor * proc = mgr.getProcessor(); + set_standalone_property(proc, "file_path", file); + set_standalone_property(proc, "chunk_size", chunksize); + + flow_file_record * new_ff = invoke(proc); + + char uuid_str[37]; + get_proc_uuid_from_processor(proc, uuid_str); + struct processor_params * pp = get_proc_params(uuid_str); + + //Test that one flow file record was created + REQUIRE(pp != NULL); + REQUIRE(pp->ff_list != NULL); + REQUIRE(pp->ff_list->ff_record != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 9); + + //Test that the current offset in the file is 4608 bytes + REQUIRE(pp->curr_offset == 4608); +} +#endif diff --git a/nanofi/tests/CTailFileDelimitedTests.cpp b/nanofi/tests/CTailFileDelimitedTests.cpp new file mode 100644 index 0000000..ed079d1 --- /dev/null +++ b/nanofi/tests/CTailFileDelimitedTests.cpp @@ -0,0 +1,256 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "catch.hpp" + +#include "CTestsBase.h" + +/**** + * ################################################################## + * CTAILFILE DELIMITED TESTS + * ################################################################## + */ + +TEST_CASE("Test tailfile delimited. Empty file", "[tailfileDelimitedEmptyFileTest]") { + + TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited); + const char * file = "./e.txt"; + const char * delimiter = ";"; + + //Create empty file + FileManager fm(file); + + auto pp = invoke_processor(mgr, file); + + //Test that no flowfiles were created + REQUIRE(pp != NULL); + REQUIRE(pp->ff_list == NULL); +} + +TEST_CASE("Test tailfile delimited. 
File has less than 4096 chars", "[tailfileDelimitedLessThan4096Chars]") { + + TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited); + const char * file = "./e.txt"; + const char * delimiter = ";"; + + FileManager fm(file); + fm.WriteNChars(34, 'a'); + fm.CloseStream(); + + auto pp = invoke_processor(mgr, file); + + //No flow files will be created + REQUIRE(pp != NULL); + REQUIRE(pp->ff_list != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 1); + REQUIRE(pp->ff_list->complete == 0); + + //Test that the current offset in the file is 34 + REQUIRE(pp->curr_offset == 34); +} + +TEST_CASE("Test tailfile delimited. Simple test", "[tailfileDelimitedSimpleTest]") { + + TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited); + const char * file = "./e.txt"; + const char * delimiter = ";"; + + //Write 8192 bytes to the file + FileManager fm(file); + fm.WriteNChars(34, 'a'); + fm.WriteNChars(1, ';'); + fm.WriteNChars(6, 'b'); + fm.WriteNChars(1, ';'); + fm.CloseStream(); + + auto pp = invoke_processor(mgr, file); + + //Test that two flow file records were created + REQUIRE(pp != NULL); + REQUIRE(pp->ff_list != NULL); + REQUIRE(pp->ff_list->ff_record != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 2); + + //Test that the current offset in the file is 42 bytes + REQUIRE(pp->curr_offset == 42); + + //Test the flow file sizes + const char * flowfile1_path = pp->ff_list->ff_record->contentLocation; + const char * flowfile2_path = pp->ff_list->next->ff_record->contentLocation; + + struct stat fstat; + stat(flowfile1_path, &fstat); + REQUIRE(fstat.st_size == 34); + + stat(flowfile2_path, &fstat); + REQUIRE(fstat.st_size == 6); + + REQUIRE(pp->ff_list->complete == 1); + REQUIRE(pp->ff_list->next->complete == 1); +} + +TEST_CASE("Test tailfile delimited. 
trailing non delimited string", "[tailfileNonDelimitedTest]") { + + TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited); + const char * file = "./e.txt"; + const char * delimiter = ";"; + + //Write 8192 bytes to the file + FileManager fm(file); + fm.WriteNChars(34, 'a'); + fm.WriteNChars(1, ';'); + fm.WriteNChars(32, 'b'); + fm.CloseStream(); + + auto pp = invoke_processor(mgr, file); + + //Test that two flow file records were created + REQUIRE(pp != NULL); + REQUIRE(pp->ff_list != NULL); + REQUIRE(pp->ff_list->ff_record != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 2); + + //Test that the current offset in the file is 35 bytes + REQUIRE(pp->curr_offset == 67); + REQUIRE(pp->ff_list->complete == 1); + REQUIRE(pp->ff_list->next->complete == 0); + struct stat fstat; + stat(pp->ff_list->ff_record->contentLocation, &fstat); + REQUIRE(fstat.st_size == 34); + + //Append a delimiter at the end of the file + fm.OpenStream(); + fm.WriteNChars(1, ';'); + fm.CloseStream(); + + pp = invoke_processor(mgr, file); + REQUIRE(pp != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 2); + + stat(pp->ff_list->next->ff_record->contentLocation, &fstat); + REQUIRE(fstat.st_size == 32); + REQUIRE(pp->ff_list->next->complete == 1); +} + +TEST_CASE("Test tailfile delimited 4096 chars non delimited", "[tailfileDelimitedSimpleTest]") { + + TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited); + const char * file = "./e.txt"; + const char * delimiter = ";"; + + //Write 4096 bytes to the file + FileManager fm(file); + fm.WriteNChars(4096, 'a'); + fm.CloseStream(); + + auto pp = invoke_processor(mgr, file); + + REQUIRE(pp != NULL); + REQUIRE(pp->ff_list != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 1); + REQUIRE(pp->ff_list->complete == 0); + //Test that the current offset in the file is 4096 bytes + REQUIRE(pp->curr_offset == 4096); + + //Write another 2048 characters + fm.OpenStream(); + fm.WriteNChars(2048, 'b'); + 
fm.CloseStream(); + + pp = invoke_processor(mgr, file); + + REQUIRE(pp->ff_list != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 1); + REQUIRE(pp->ff_list->complete == 0); + + //Test that the current offset in the file is (4096 + 2048) + REQUIRE(pp->curr_offset == 6144); + + //Write another 2048 characters + fm.OpenStream(); + fm.WriteNChars(2048, 'c'); + fm.CloseStream(); + + pp = invoke_processor(mgr, file); + + REQUIRE(pp->ff_list != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 1); + + //Test that the current offset in the file is 8192 bytes only + REQUIRE(pp->curr_offset == 8192); + + //Write a delimiter at the end and expect a flow file size of 8192 bytes + fm.OpenStream(); + fm.WriteNChars(1, ';'); + fm.CloseStream(); + + pp = invoke_processor(mgr, file); + + REQUIRE(pp->ff_list != NULL); + REQUIRE(pp->ff_list->ff_record != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 1); + REQUIRE(pp->ff_list->complete == 1); + const char * flowfile_path = pp->ff_list->ff_record->contentLocation; + struct stat fstat; + stat(flowfile_path, &fstat); + REQUIRE(fstat.st_size == 8192); +} + +TEST_CASE("Test tailfile delimited. 
string starting with delimiter", "[tailfileDelimiterStartStringTest]") { + + TailFileTestResourceManager mgr("TailFileDelimited", on_trigger_tailfiledelimited); + const char * file = "./e.txt"; + const char * delimiter = ";"; + + //Write 8192 bytes to the file + FileManager fm(file); + fm.WriteNChars(5, ';'); + fm.WriteNChars(34, 'a'); + fm.WriteNChars(4, ';'); + fm.WriteNChars(32, 'b'); + fm.CloseStream(); + + auto pp = invoke_processor(mgr, file); + + //Test that two flow file records were created + REQUIRE(pp != NULL); + REQUIRE(pp->ff_list != NULL); + REQUIRE(pp->ff_list->ff_record != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 2); + + //Test that the current offset in the file is 35 bytes + REQUIRE(pp->curr_offset == 75); + REQUIRE(pp->ff_list->complete == 1); + REQUIRE(pp->ff_list->next->complete == 0); + struct stat fstat; + stat(pp->ff_list->ff_record->contentLocation, &fstat); + REQUIRE(fstat.st_size == 34); + + //Append a delimiter at the end of the file + fm.OpenStream(); + fm.WriteNChars(1, ';'); + fm.CloseStream(); + + pp = invoke_processor(mgr, file); + REQUIRE(pp != NULL); + REQUIRE(flow_files_size(pp->ff_list) == 2); + + stat(pp->ff_list->next->ff_record->contentLocation, &fstat); + REQUIRE(fstat.st_size == 32); + REQUIRE(pp->ff_list->next->complete == 1); +} diff --git a/nanofi/tests/CTestsBase.h b/nanofi/tests/CTestsBase.h new file mode 100644 index 0000000..bcde416 --- /dev/null +++ b/nanofi/tests/CTestsBase.h @@ -0,0 +1,141 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _WIN32 +#ifndef NANOFI_TESTS_CTESTSBASE_H_ +#define NANOFI_TESTS_CTESTSBASE_H_ + +#include <vector> +#include <string> +#include <fstream> +#include <assert.h> +#include <sys/stat.h> + +#include "core/file_utils.h" +#include "api/ecu.h" +#include "api/nanofi.h" + +class FileManager { +public: + FileManager(const std::string& filePath) { + assert(!filePath.empty() && "filePath provided cannot be empty!"); + struct stat statbuff; + assert(!is_directory(filePath.c_str()) && "Provided file is not a filepath"); + filePath_ = filePath; + remove(filePath_.c_str()); + outputStream_.open(filePath_, std::ios::binary); + } + + ~FileManager() { + std::ifstream ifs(filePath_); + if (ifs.good()) { + remove(filePath_.c_str()); + } + } + + void Write(const std::string& str) { + outputStream_ << str; + } + + std::string WriteNChars(uint64_t n, char c) { + std::string s(n, c); + outputStream_ << s; + return s; + } + + std::string getFilePath() const { + return filePath_; + } + + void OpenStream() { + outputStream_.open(filePath_, std::ios::binary|std::ios::app); + } + + void CloseStream() { + outputStream_.flush(); + outputStream_.close(); + } + + uint64_t GetFileSize() { + CloseStream(); + struct stat buff; + if (stat(filePath_.c_str(), &buff) == 0) { + return buff.st_size; + } + return 0; + } + +private: + std::string filePath_; + std::ofstream outputStream_; +}; + +class TailFileTestResourceManager { +public: + TailFileTestResourceManager(const std::string& processor_name, void(*callback)(processor_session * ps, processor_context * ctx)) { + const 
char * port_str = "uuid"; + nifi_port port; + port.port_id = (char *)port_str; + const char * instance_str = "nifi"; + instance_ = create_instance(instance_str, &port); + add_custom_processor(processor_name.c_str(), callback); + processor_ = create_processor(processor_name.c_str(), instance_); + } + + ~TailFileTestResourceManager() { + remove_directory("./contentrepository"); + char uuid_str[37]; + get_proc_uuid_from_processor(processor_, uuid_str); + delete_all_flow_files_from_proc(uuid_str); + struct processor_params * tmp, * pp = NULL; + HASH_ITER(hh, procparams, pp, tmp) { + HASH_DEL(procparams, pp); + free(pp); + } + free_standalone_processor(processor_); + free_instance(instance_); + } + + standalone_processor * getProcessor() const { + return processor_; + } + + nifi_instance * getInstance() const { + return instance_; + } + +private: + nifi_instance * instance_; + standalone_processor * processor_; +}; + +struct processor_params * invoke_processor(TailFileTestResourceManager& mgr, const char * filePath) { + standalone_processor * proc = mgr.getProcessor(); + set_standalone_property(proc, "file_path", filePath); + set_standalone_property(proc, "delimiter", ";"); + + flow_file_record * new_ff = invoke(proc); + + char uuid_str[37]; + get_proc_uuid_from_processor(proc, uuid_str); + struct processor_params * pp = get_proc_params(uuid_str); + return pp; +} + +#endif /* NANOFI_TESTS_CTESTSBASE_H_ */ +#endif
