http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/utils/StringUtils.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h index 82459db..bf802e2 100644 --- a/libminifi/include/utils/StringUtils.h +++ b/libminifi/include/utils/StringUtils.h @@ -35,96 +35,97 @@ namespace utils { * Purpose: Houses many useful string utilities. */ class StringUtils { -public: - /** - * Converts a string to a boolean - * Better handles mixed case. - * @param input input string - * @param output output string. - */ - static bool StringToBool(std::string input, bool &output) { - - std::transform(input.begin(), input.end(), input.begin(), ::tolower); - std::istringstream(input) >> std::boolalpha >> output; - return output; - } - - // Trim String utils - - /** - * Trims a string left to right - * @param s incoming string - * @returns modified string - */ - static std::string trim(std::string s) { - return trimRight(trimLeft(s)); - } - - /** - * Trims left most part of a string - * @param s incoming string - * @returns modified string - */ - static inline std::string trimLeft(std::string s) { - s.erase(s.begin(), - std::find_if(s.begin(), s.end(), - std::not1( - std::pointer_to_unary_function<int, int>( - std::isspace)))); - return s; - } - - /** - * Trims a string on the right - * @param s incoming string - * @returns modified string - */ - - static inline std::string trimRight(std::string s) { - s.erase( - std::find_if(s.rbegin(), s.rend(), - std::not1( - std::pointer_to_unary_function<int, int>( - std::isspace))).base(), s.end()); - return s; - } - - /** - * Converts a string to a float - * @param input input string - * @param output output float - * @param cp failure policy - */ - static bool StringToFloat(std::string input, float &output, - FailurePolicy cp = RETURN) { - try { - output = std::stof(input); - } catch (const std::invalid_argument &ie) { - switch (cp) { - case RETURN: - case NOTHING: - return false; - case EXIT: - exit(1); - case EXCEPT: - throw ie; - } - } catch (const std::out_of_range &ofr) { - switch (cp) { - case RETURN: - case NOTHING: - return false; - case EXIT: - exit(1); - case EXCEPT: - throw ofr; - - } - } - - return true; - - } + public: + /** + * Converts a string to a boolean + * Better handles mixed case. + * @param input input string + * @param output output string. + */ + static bool StringToBool(std::string input, bool &output) { + + std::transform(input.begin(), input.end(), input.begin(), ::tolower); + std::istringstream(input) >> std::boolalpha >> output; + return output; + } + + // Trim String utils + + /** + * Trims a string left to right + * @param s incoming string + * @returns modified string + */ + static std::string trim(std::string s) { + return trimRight(trimLeft(s)); + } + + /** + * Trims left most part of a string + * @param s incoming string + * @returns modified string + */ + static inline std::string trimLeft(std::string s) { + s.erase( + s.begin(), + std::find_if( + s.begin(), s.end(), + std::not1(std::pointer_to_unary_function<int, int>(std::isspace)))); + return s; + } + + /** + * Trims a string on the right + * @param s incoming string + * @returns modified string + */ + + static inline std::string trimRight(std::string s) { + s.erase( + std::find_if( + s.rbegin(), s.rend(), + std::not1(std::pointer_to_unary_function<int, int>(std::isspace))) + .base(), + s.end()); + return s; + } + + /** + * Converts a string to a float + * @param input input string + * @param output output float + * @param cp failure policy + */ + static bool StringToFloat(std::string input, float &output, FailurePolicy cp = + RETURN) { + try { + output = std::stof(input); + } catch (const std::invalid_argument &ie) { + switch (cp) { + case RETURN: + case NOTHING: + return false; + case EXIT: + exit(1); + case EXCEPT: + throw ie; + } + } catch (const std::out_of_range &ofr) { + switch (cp) { + case RETURN: + case NOTHING: + return false; + case EXIT: + exit(1); + case EXCEPT: + throw ofr; + + } + } + + return true; + + } }; @@ -134,5 +135,4 @@ public: } /* namespace apache */ } /* namespace org */ - #endif /* LIBMINIFI_INCLUDE_IO_STRINGUTILS_H_ */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/include/utils/TimeUtil.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/TimeUtil.h b/libminifi/include/utils/TimeUtil.h index 76b334b..6805419 100644 --- a/libminifi/include/utils/TimeUtil.h +++ b/libminifi/include/utils/TimeUtil.h @@ -33,8 +33,8 @@ * @returns milliseconds since epoch */ inline uint64_t getTimeMillis() { - return std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now().time_since_epoch()).count(); + return std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()).count(); } /** @@ -43,8 +43,8 @@ inline uint64_t getTimeMillis() { */ inline uint64_t getTimeNano() { - return std::chrono::duration_cast<std::chrono::nanoseconds>( - std::chrono::system_clock::now().time_since_epoch()).count(); + return std::chrono::duration_cast<std::chrono::nanoseconds>( + std::chrono::system_clock::now().time_since_epoch()).count(); } @@ -54,21 +54,19 @@ inline uint64_t getTimeNano() { * @param msec milliseconds since epoch * @returns string representing the time */ -inline std::string getTimeStr(uint64_t msec, bool enforce_locale = false) -{ - char date[120]; - time_t second = (time_t) (msec/1000); - msec = msec % 1000; - strftime(date, sizeof(date) / sizeof(*date), TIME_FORMAT, - ( enforce_locale==true ? gmtime(&second) : localtime(&second))); +inline std::string getTimeStr(uint64_t msec, bool enforce_locale = false) { + char date[120]; + time_t second = (time_t) (msec / 1000); + msec = msec % 1000; + strftime(date, sizeof(date) / sizeof(*date), TIME_FORMAT, + (enforce_locale == true ? gmtime(&second) : localtime(&second))); - std::string ret = date; - date[0] = '\0'; - sprintf(date, ".%03llu", (unsigned long long) msec); + std::string ret = date; + date[0] = '\0'; + sprintf(date, ".%03llu", (unsigned long long) msec); - ret += date; - return ret; + ret += date; + return ret; } - #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/Configure.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index 96ed7c7..f70686d 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -16,8 +16,9 @@ * limitations under the License. */ #include "properties/Configure.h" +#include <string> #include "utils/StringUtils.h" -#include "core/core.h" +#include "core/Core.h" namespace org { namespace apache { @@ -25,147 +26,149 @@ namespace nifi { namespace minifi { Configure *Configure::configure_(NULL); -const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.file"; -const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration"; +const char *Configure::nifi_flow_configuration_file = + "nifi.flow.configuration.file"; +const char *Configure::nifi_administrative_yield_duration = + "nifi.administrative.yield.duration"; const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration"; -const char *Configure::nifi_graceful_shutdown_seconds = "nifi.flowcontroller.graceful.shutdown.period"; +const char *Configure::nifi_graceful_shutdown_seconds = + "nifi.flowcontroller.graceful.shutdown.period"; const char *Configure::nifi_log_level = "nifi.log.level"; const char *Configure::nifi_server_name = "nifi.server.name"; -const char *Configure::nifi_configuration_class_name = "nifi.flow.configuration.class.name"; -const char *Configure::nifi_flow_repository_class_name = "nifi.flow.repository.class.name"; -const char *Configure::nifi_provenance_repository_class_name = "nifi.provenance.repository.class.name"; +const char *Configure::nifi_configuration_class_name = + "nifi.flow.configuration.class.name"; +const char *Configure::nifi_flow_repository_class_name = + "nifi.flow.repository.class.name"; +const char *Configure::nifi_provenance_repository_class_name = + "nifi.provenance.repository.class.name"; const char *Configure::nifi_server_port = "nifi.server.port"; -const char *Configure::nifi_server_report_interval= "nifi.server.report.interval"; -const char *Configure::nifi_provenance_repository_max_storage_size = "nifi.provenance.repository.max.storage.size"; -const char *Configure::nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time"; -const char *Configure::nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default"; -const char *Configure::nifi_flowfile_repository_max_storage_size = "nifi.flowfile.repository.max.storage.size"; -const char *Configure::nifi_flowfile_repository_max_storage_time = "nifi.flowfile.repository.max.storage.time"; -const char *Configure::nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default"; +const char *Configure::nifi_server_report_interval = + "nifi.server.report.interval"; +const char *Configure::nifi_provenance_repository_max_storage_size = + "nifi.provenance.repository.max.storage.size"; +const char *Configure::nifi_provenance_repository_max_storage_time = + "nifi.provenance.repository.max.storage.time"; +const char *Configure::nifi_provenance_repository_directory_default = + "nifi.provenance.repository.directory.default"; +const char *Configure::nifi_flowfile_repository_max_storage_size = + "nifi.flowfile.repository.max.storage.size"; +const char *Configure::nifi_flowfile_repository_max_storage_time = + "nifi.flowfile.repository.max.storage.time"; +const char *Configure::nifi_flowfile_repository_directory_default = + "nifi.flowfile.repository.directory.default"; const char *Configure::nifi_remote_input_secure = "nifi.remote.input.secure"; -const char *Configure::nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth"; -const char *Configure::nifi_security_client_certificate = "nifi.security.client.certificate"; -const char *Configure::nifi_security_client_private_key = "nifi.security.client.private.key"; -const char *Configure::nifi_security_client_pass_phrase = "nifi.security.client.pass.phrase"; -const char *Configure::nifi_security_client_ca_certificate = "nifi.security.client.ca.certificate"; +const char *Configure::nifi_security_need_ClientAuth = + "nifi.security.need.ClientAuth"; +const char *Configure::nifi_security_client_certificate = + "nifi.security.client.certificate"; +const char *Configure::nifi_security_client_private_key = + "nifi.security.client.private.key"; +const char *Configure::nifi_security_client_pass_phrase = + "nifi.security.client.pass.phrase"; +const char *Configure::nifi_security_client_ca_certificate = + "nifi.security.client.ca.certificate"; + +#define BUFFER_SIZE 512 // Get the config value -bool Configure::get(std::string key, std::string &value) -{ - std::lock_guard<std::mutex> lock(mutex_); - auto it = properties_.find(key); - - if (it != properties_.end()) - { - value = it->second; - return true; - } - else - { - return false; - } +bool Configure::get(std::string key, std::string &value) { + std::lock_guard<std::mutex> lock(mutex_); + auto it = properties_.find(key); + + if (it != properties_.end()) { + value = it->second; + return true; + } else { + return false; + } } - // Parse one line in configure file like key=value -void Configure::parseConfigureFileLine(char *buf) -{ - char *line = buf; - - while ((line[0] == ' ') || (line[0] =='\t')) - ++line; - - char first = line[0]; - if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '=')) - { - return; - } - - char *equal = strchr(line, '='); - if (equal == NULL) - { - return; - } - - equal[0] = '\0'; - std::string key = line; - - equal++; - while ((equal[0] == ' ') || (equal[0] == '\t')) - ++equal; - - first = equal[0]; - if ((first == '\0') || (first == '\r') || (first== '\n')) - { - return; - } - - std::string value = equal; - key = org::apache::nifi::minifi::utils::StringUtils::trimRight(key); - value = org::apache::nifi::minifi::utils::StringUtils::trimRight(value); - set(key, value); +void Configure::parseConfigureFileLine(char *buf) { + char *line = buf; + + while ((line[0] == ' ') || (line[0] == '\t')) + ++line; + + char first = line[0]; + if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') + || (first == '=')) { + return; + } + + char *equal = strchr(line, '='); + if (equal == NULL) { + return; + } + + equal[0] = '\0'; + std::string key = line; + + equal++; + while ((equal[0] == ' ') || (equal[0] == '\t')) + ++equal; + + first = equal[0]; + if ((first == '\0') || (first == '\r') || (first == '\n')) { + return; + } + + std::string value = equal; + key = org::apache::nifi::minifi::utils::StringUtils::trimRight(key); + value = org::apache::nifi::minifi::utils::StringUtils::trimRight(value); + set(key, value); } // Load Configure File -void Configure::loadConfigureFile(const char *fileName) -{ - - std::string adjustedFilename; - if (fileName) - { - // perform a naive determination if this is a relative path - if (fileName[0] != '/') - { - adjustedFilename = adjustedFilename + configure_->getHome() + "/" + fileName; - } - else - { - adjustedFilename += fileName; - } - } - char *path = NULL; - char full_path[PATH_MAX]; - path = realpath(adjustedFilename.c_str(), full_path); - logger_->log_info("Using configuration file located at %s", path); - - std::ifstream file(path, std::ifstream::in); - if (!file.good()) - { - logger_->log_error("load configure file failed %s", path); - return; - } - this->clear(); - const unsigned int bufSize = 512; - char buf[bufSize]; - for (file.getline(buf, bufSize); file.good(); file.getline(buf, bufSize)) - { - parseConfigureFileLine(buf); +void Configure::loadConfigureFile(const char *fileName) { + std::string adjustedFilename; + if (fileName) { + // perform a naive determination if this is a relative path + if (fileName[0] != '/') { + adjustedFilename = adjustedFilename + configure_->getHome() + "/" + + fileName; + } else { + adjustedFilename += fileName; } + } + char *path = NULL; + char full_path[PATH_MAX]; + path = realpath(adjustedFilename.c_str(), full_path); + logger_->log_info("Using configuration file located at %s", path); + + std::ifstream file(path, std::ifstream::in); + if (!file.good()) { + logger_->log_error("load configure file failed %s", path); + return; + } + this->clear(); + + char buf[BUFFER_SIZE]; + for (file.getline(buf, BUFFER_SIZE); file.good(); + file.getline(buf, BUFFER_SIZE)) { + parseConfigureFileLine(buf); + } } // Parse Command Line -void Configure::parseCommandLine(int argc, char **argv) -{ - int i; - bool keyFound = false; - std::string key, value; - - for (i = 1; i < argc; i++) - { - if (argv[i][0] == '-' && argv[i][1] != '\0') - { - keyFound = true; - key = &argv[i][1]; - continue; - } - if (keyFound) - { - value = argv[i]; - set(key,value); - keyFound = false; - } - } - return; +void Configure::parseCommandLine(int argc, char **argv) { + int i; + bool keyFound = false; + std::string key, value; + + for (i = 1; i < argc; i++) { + if (argv[i][0] == '-' && argv[i][1] != '\0') { + keyFound = true; + key = &argv[i][1]; + continue; + } + if (keyFound) { + value = argv[i]; + set(key, value); + keyFound = false; + } + } + return; } } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/Connection.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp index 6f64ff3..ab300ba 100644 --- a/libminifi/src/Connection.cpp +++ b/libminifi/src/Connection.cpp @@ -17,16 +17,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "Connection.h" +#include <sys/time.h> +#include <time.h> #include <vector> #include <queue> +#include <memory> +#include <string> #include <map> #include <set> -#include <sys/time.h> -#include <time.h> #include <chrono> #include <thread> #include <iostream> - #include "core/FlowFile.h" #include "Connection.h" #include "core/Processor.h" @@ -95,7 +97,7 @@ void Connection::put(std::shared_ptr<core::FlowFile> flow) { if (!flow->isStored()) { // Save to the flowfile repo - FlowFileRecord event(flow_repository_,flow,this->uuidStr_); + FlowFileRecord event(flow_repository_, flow, this->uuidStr_); if (event.Serialize()) { flow->setStoredToRepository(true); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/EventDrivenSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/EventDrivenSchedulingAgent.cpp b/libminifi/src/EventDrivenSchedulingAgent.cpp index 0484139..cbb60ea 100644 --- a/libminifi/src/EventDrivenSchedulingAgent.cpp +++ b/libminifi/src/EventDrivenSchedulingAgent.cpp @@ -17,10 +17,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "EventDrivenSchedulingAgent.h" #include <chrono> +#include <memory> #include <thread> #include <iostream> -#include "EventDrivenSchedulingAgent.h" #include "core/Processor.h" #include "core/ProcessContext.h" #include "core/ProcessSessionFactory.h" @@ -31,7 +32,6 @@ namespace apache { namespace nifi { namespace minifi { - void EventDrivenSchedulingAgent::run( std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/FlowControlProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp index 50fc0e2..69f482f 100644 --- a/libminifi/src/FlowControlProtocol.cpp +++ b/libminifi/src/FlowControlProtocol.cpp @@ -17,17 +17,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "FlowControlProtocol.h" #include <sys/time.h> #include <stdio.h> #include <time.h> +#include <netinet/tcp.h> #include <chrono> #include <thread> +#include <string> #include <random> -#include <netinet/tcp.h> #include <iostream> #include "FlowController.h" -#include "FlowControlProtocol.h" -#include "core/core.h" +#include "core/Core.h" namespace org { namespace apache { namespace nifi { @@ -45,7 +46,7 @@ int FlowControlProtocol::connectServer(const char *host, uint16_t port) { int hh_errno; gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); #endif - memcpy((char *) &addr, h->h_addr_list[0], h->h_length); + memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length); sock = socket(AF_INET, SOCK_STREAM, 0); if (sock < 0) { logger_->log_error("Could not create socket to hostName %s", host); @@ -56,30 +57,26 @@ int FlowControlProtocol::connectServer(const char *host, uint16_t port) { int opt = 1; bool nagle_off = true; - if (nagle_off) - { - if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0) - { + if (nagle_off) { + if (setsockopt(sock, SOL_TCP, TCP_NODELAY, reinterpret_cast<void*>(&opt), sizeof(opt)) < 0) { logger_->log_error("setsockopt() TCP_NODELAY failed"); close(sock); return 0; } if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, - (char *)&opt, sizeof(opt)) < 0) - { - logger_->log_error("setsockopt() SO_REUSEADDR failed"); - close(sock); - return 0; - } - } + reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) { + logger_->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + return 0; + } + } - int sndsize = 256*1024; - if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0) - { - logger_->log_error("setsockopt() SO_SNDBUF failed"); - close(sock); - return 0; - } + int sndsize = 256*1024; + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&sndsize), sizeof(sndsize)) < 0) { + logger_->log_error("setsockopt() SO_SNDBUF failed"); + close(sock); + return 0; + } #endif struct sockaddr_in sa; @@ -123,7 +120,7 @@ int FlowControlProtocol::sendData(uint8_t *buf, int buflen) { while (bytes < buflen) { ret = send(_socket, buf + bytes, buflen - bytes, 0); - //check for errors + // check for errors if (ret == -1) { return ret; } @@ -232,8 +229,9 @@ void FlowControlProtocol::run(FlowControlProtocol *protocol) { if (!protocol->_registered) { // if it is not register yet protocol->sendRegisterReq(); - } else + } else { protocol->sendReportReq(); + } } return; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 433387a..1a163ea 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -17,23 +17,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include <vector> -#include <queue> -#include <map> -#include <set> +#include "FlowController.h" #include <sys/time.h> #include <time.h> -#include <chrono> -#include <thread> #include <sys/types.h> #include <sys/stat.h> #include <unistd.h> +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <chrono> #include <future> -#include "FlowController.h" +#include <thread> +#include <utility> +#include <memory> +#include <string> #include "core/ProcessContext.h" #include "core/ProcessGroup.h" #include "utils/StringUtils.h" -#include "core/core.h" +#include "core/Core.h" #include "core/repository/FlowFileRepository.h" namespace org { @@ -105,7 +108,6 @@ FlowController::FlowController( initializePaths(adjustedFilename); } - } void FlowController::initializePaths(const std::string &adjustedFilename) { @@ -125,12 +127,12 @@ void FlowController::initializePaths(const std::string &adjustedFilename) { // Create the content repo directory if needed struct stat contentDirStat; - if (stat(ResourceClaim::default_directory_path.c_str(), &contentDirStat) + if (stat(ResourceClaim::default_directory_path, &contentDirStat) != -1&& S_ISDIR(contentDirStat.st_mode)) { - path = realpath(ResourceClaim::default_directory_path.c_str(), full_path); + path = realpath(ResourceClaim::default_directory_path, full_path); logger_->log_info("FlowController content directory %s", full_path); } else { - if (mkdir(ResourceClaim::default_directory_path.c_str(), 0777) == -1) { + if (mkdir(ResourceClaim::default_directory_path, 0777) == -1) { logger_->log_error("FlowController content directory creation failed"); exit(1); } @@ -144,7 +146,6 @@ void FlowController::initializePaths(const std::string &adjustedFilename) { full_path); exit(1); } - } FlowController::~FlowController() { @@ -154,7 +155,6 @@ FlowController::~FlowController() { delete protocol_; flow_file_repo_ = nullptr; provenance_repo_ = nullptr; - } void FlowController::stop(bool force) { @@ -173,7 +173,6 @@ void FlowController::stop(bool force) { if (this->root_) this->root_->stopProcessing(&this->_timerScheduler, &this->_eventScheduler); - } } @@ -200,7 +199,6 @@ void FlowController::waitUnload(const uint64_t timeToWaitMs) { if (std::future_status::ready == unload_task.wait_until(wait_time)) { running_ = false; } - } } @@ -278,7 +276,6 @@ bool FlowController::start() { "Can not start Flow Controller because it has not been initialized"); return false; } else { - if (!running_) { logger_->log_info("Starting Flow Controller"); this->_timerScheduler.start(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/FlowFileRecord.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp index 562a685..de682b0 100644 --- a/libminifi/src/FlowFileRecord.cpp +++ b/libminifi/src/FlowFileRecord.cpp @@ -17,16 +17,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "FlowFileRecord.h" +#include <sys/time.h> +#include <time.h> +#include <cstdio> #include <vector> #include <queue> #include <map> -#include <sys/time.h> -#include <time.h> +#include <memory> +#include <string> #include <iostream> #include <fstream> -#include <cstdio> - -#include "FlowFileRecord.h" #include "core/logging/Logger.h" #include "core/Relationship.h" #include "core/Repository.h" @@ -73,18 +74,18 @@ FlowFileRecord::FlowFileRecord( : FlowFile(), snapshot_(""), flow_repository_(flow_repository) { - entry_date_ = event->getEntryDate(); - lineage_start_date_ = event->getlineageStartDate(); - lineage_Identifiers_ = event->getlineageIdentifiers(); - uuid_str_ = event->getUUIDStr(); - attributes_ = event->getAttributes(); - size_ = event->getSize(); - offset_ = event->getOffset(); - event->getUUID(uuid_); - uuid_connection_ = uuidConnection; - if (event->getResourceClaim()) { - content_full_fath_ = event->getResourceClaim()->getContentFullPath(); - } + entry_date_ = event->getEntryDate(); + lineage_start_date_ = event->getlineageStartDate(); + lineage_Identifiers_ = event->getlineageIdentifiers(); + uuid_str_ = event->getUUIDStr(); + attributes_ = event->getAttributes(); + size_ = event->getSize(); + offset_ = event->getOffset(); + event->getUUID(uuid_); + uuid_connection_ = uuidConnection; + if (event->getResourceClaim()) { + content_full_fath_ = event->getResourceClaim()->getContentFullPath(); + } } FlowFileRecord::FlowFileRecord( @@ -94,7 +95,6 @@ FlowFileRecord::FlowFileRecord( uuid_connection_(""), snapshot_(""), flow_repository_(flow_repository) { - } FlowFileRecord::~FlowFileRecord() { @@ -175,10 +175,10 @@ bool FlowFileRecord::DeSerialize(std::string key) { logger_->log_error("NiFi FlowFile Store event %s can not found", key.c_str()); return false; - } else + } else { logger_->log_debug("NiFi FlowFile Read event %s length %d", key.c_str(), value.length()); - + } io::DataStream stream((const uint8_t*) value.data(), value.length()); ret = DeSerialize(stream); @@ -197,14 +197,12 @@ bool FlowFileRecord::DeSerialize(std::string key) { } bool FlowFileRecord::Serialize() { - io::DataStream outStream; int ret; ret = write(this->event_time_, &outStream); if (ret != 8) { - return false; } @@ -215,57 +213,48 @@ bool FlowFileRecord::Serialize() { ret = write(this->lineage_start_date_, &outStream); if (ret != 8) { - return false; } ret = writeUTF(this->uuid_str_, &outStream); if (ret <= 0) { - return false; } ret = writeUTF(this->uuid_connection_, &outStream); if (ret <= 0) { - return false; } // write flow attributes uint32_t numAttributes = this->attributes_.size(); ret = write(numAttributes, &outStream); if (ret != 4) { - return false; } for (auto itAttribute : attributes_) { ret = writeUTF(itAttribute.first, &outStream, true); if (ret <= 0) { - return false; } ret = writeUTF(itAttribute.second, &outStream, true); if (ret <= 0) { - return false; } } ret = writeUTF(this->content_full_fath_, &outStream); if (ret <= 0) { - return false; } ret = write(this->size_, &outStream); if (ret != 8) { - return false; } ret = write(this->offset_, &outStream); if (ret != 8) { - return false; } @@ -281,13 +270,10 @@ bool FlowFileRecord::Serialize() { return false; } - // cleanup - return true; } bool FlowFileRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) { - int ret; io::DataStream outStream(buffer, bufferSize); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 9790256..33f0cb2 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -17,18 +17,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "RemoteProcessorGroupPort.h" +#include <sys/time.h> +#include <string.h> +#include <time.h> #include <vector> #include <queue> #include <map> #include <set> -#include <sys/time.h> -#include <time.h> +#include <string> +#include <utility> +#include <memory> #include <sstream> -#include <string.h> #include <iostream> - -#include "RemoteProcessorGroupPort.h" - #include "../include/io/StreamFactory.h" #include "io/ClientSocket.h" #include "utils/TimeUtil.h" @@ -48,13 +49,13 @@ core::Property RemoteProcessorGroupPort::hostName("Host Name", core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999"); core::Relationship RemoteProcessorGroupPort::relation; - std::unique_ptr<Site2SiteClientProtocol> RemoteProcessorGroupPort::getNextProtocol() { std::lock_guard<std::mutex> protocol_lock_(protocol_mutex_); if (available_protocols_.empty()) return nullptr; - std::unique_ptr<Site2SiteClientProtocol> return_pointer = std::move(available_protocols_.top()); + std::unique_ptr<Site2SiteClientProtocol> return_pointer = std::move( + available_protocols_.top()); available_protocols_.pop(); return std::move(return_pointer); } @@ -66,7 +67,6 @@ void RemoteProcessorGroupPort::returnProtocol( } void RemoteProcessorGroupPort::initialize() { - // Set the supported properties std::set<core::Property> properties; properties.insert(hostName); @@ -76,7 +76,6 @@ void RemoteProcessorGroupPort::initialize() { std::set<core::Relationship> relationships; relationships.insert(relation); setSupportedRelationships(relationships); - } void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, @@ -90,7 +89,6 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, // Peer Connection if (protocol_ == nullptr) { - protocol_ = std::unique_ptr<Site2SiteClientProtocol>( new Site2SiteClientProtocol(0)); protocol_->setPortId(protocol_uuid_); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/ResourceClaim.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ResourceClaim.cpp b/libminifi/src/ResourceClaim.cpp index 826ca1d..cbe7712 100644 --- a/libminifi/src/ResourceClaim.cpp +++ b/libminifi/src/ResourceClaim.cpp @@ -20,6 +20,7 @@ #include <vector> #include <queue> #include <map> +#include <string> #include "ResourceClaim.h" @@ -30,7 +31,7 @@ namespace minifi { std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0); -std::string ResourceClaim::default_directory_path = DEFAULT_CONTENT_DIRECTORY; +char *ResourceClaim::default_directory_path = const_cast<char*>(DEFAULT_CONTENT_DIRECTORY); ResourceClaim::ResourceClaim(const std::string contentDirectory) : _id(_localResourceClaimNumber.load()), http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/SchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index 8cb88e0..d69ba00 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -17,11 +17,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "SchedulingAgent.h" #include <chrono> #include <thread> +#include <memory> #include <iostream> #include "Exception.h" -#include "SchedulingAgent.h" #include "core/Processor.h" namespace org { @@ -29,8 +30,7 @@ namespace apache { namespace nifi { namespace minifi { -bool SchedulingAgent::hasWorkToDo( - std::shared_ptr<core::Processor> processor) { +bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) { // Whether it has work to do if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections() || processor->flowFilesQueued()) @@ -44,10 +44,9 @@ bool SchedulingAgent::hasTooMuchOutGoing( return processor->flowFilesOutGoingFull(); } -bool SchedulingAgent::onTrigger( - std::shared_ptr<core::Processor> processor, - core::ProcessContext *processContext, - core::ProcessSessionFactory *sessionFactory) { +bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor, + core::ProcessContext *processContext, + core::ProcessSessionFactory *sessionFactory) { if (processor->isYield()) return false; @@ -62,8 +61,6 @@ bool SchedulingAgent::onTrigger( // need to apply backpressure return true; - //TODO runDuration - processor->incrementActiveTasks(); try { processor->onTrigger(processContext, sessionFactory); @@ -85,7 +82,6 @@ bool SchedulingAgent::onTrigger( return false; } - } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/Site2SiteClientProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp index e0265bb..52a0a02 100644 --- a/libminifi/src/Site2SiteClientProtocol.cpp +++ b/libminifi/src/Site2SiteClientProtocol.cpp @@ -21,9 +21,11 @@ #include <stdio.h> #include <time.h> #include <chrono> +#include <map> +#include <string> +#include <memory> #include <thread> #include <random> -#include <netinet/tcp.h> #include <iostream> #include "io/CRCStream.h" #include "Site2SitePeer.h" @@ -356,7 +358,7 @@ int Site2SiteClientProtocol::readRequestType(RequestType &type) { if (ret <= 0) return ret; - for (int i = (int) NEGOTIATE_FLOWFILE_CODEC; i <= (int) SHUTDOWN; i++) { + for (int i = NEGOTIATE_FLOWFILE_CODEC; i <= SHUTDOWN; i++) { if (RequestTypeStr[i] == requestTypeStr) { type = (RequestType) i; return ret; @@ -426,12 +428,14 @@ int Site2SiteClientProtocol::writeRespond(RespondCode code, if (resCode->hasDescription) { ret = peer_->writeUTF(message); - if (ret > 0) + if (ret > 0) { return (3 + ret); - else + } else { return ret; - } else + } + } else { return 3; + } } bool Site2SiteClientProtocol::negotiateCodec() { @@ -518,7 +522,8 @@ Transaction* Site2SiteClientProtocol::createTransaction( return NULL; } - org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(peer_.get()); + org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream( + peer_.get()); switch (code) { case MORE_DATA: dataAvailable = true; @@ -553,7 +558,8 @@ Transaction* Site2SiteClientProtocol::createTransaction( // tearDown(); return NULL; } else { - org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(peer_.get()); + org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream( + peer_.get()); transaction = new Transaction(direction, crcstream); _transactionMap[transaction->getUUIDStr()] = transaction; transactionID = transaction->getUUIDStr(); @@ -680,9 +686,10 @@ bool Site2SiteClientProtocol::receive(std::string transactionID, return true; } -bool Site2SiteClientProtocol::send( - std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, - core::ProcessSession *session) { +bool Site2SiteClientProtocol::send(std::string transactionID, + DataPacket *packet, + std::shared_ptr<FlowFileRecord> flowFile, + core::ProcessSession *session) { int ret; Transaction *transaction = NULL; @@ -772,9 +779,8 @@ bool Site2SiteClientProtocol::send( return true; } -void Site2SiteClientProtocol::receiveFlowFiles( - core::ProcessContext *context, - core::ProcessSession *session) { +void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, + core::ProcessSession *session) { uint64_t bytes = 0; int transfers = 0; Transaction *transaction = NULL; @@ -817,7 +823,9 @@ void Site2SiteClientProtocol::receiveFlowFiles( // transaction done break; } - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());; + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< + FlowFileRecord>(session->create()); + if (!flowFile) { throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed"); return; @@ -930,7 +938,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) { // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the // Critical Section involved in this transaction so that rather than the Critical Section being the // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. - long crcValue = transaction->getCRC(); + int64_t crcValue = transaction->getCRC(); std::string crc = std::to_string(crcValue); logger_->log_info("Site2Site Send confirm with CRC %d to transaction %s", transaction->getCRC(), transactionID.c_str()); @@ -978,7 +986,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) { "Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.c_str(), message.c_str()); if (this->_currentVersion > 3) { - long crcValue = transaction->getCRC(); + int64_t crcValue = transaction->getCRC(); std::string crc = std::to_string(crcValue); if (message == crc) { logger_->log_info("Site2Site transaction %s CRC matched", @@ -1113,9 +1121,9 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) { logger_->log_info("Site2Site transaction %s send finished", transactionID.c_str()); ret = this->writeRespond(TRANSACTION_FINISHED, "Finished"); - if (ret <= 0) + if (ret <= 0) { return false; - else { + } else { transaction->_state = TRANSACTION_COMPLETED; return true; } @@ -1143,10 +1151,11 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) { } } -void Site2SiteClientProtocol::transferFlowFiles( - core::ProcessContext *context, - core::ProcessSession *session) { - std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast<FlowFileRecord>(session->get());; +void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, + core::ProcessSession *session) { + std::shared_ptr<FlowFileRecord> flow = + std::static_pointer_cast<FlowFileRecord>(session->get()); + Transaction *transaction = NULL; if (!flow) @@ -1201,7 +1210,8 @@ void Site2SiteClientProtocol::transferFlowFiles( if (transferNanos > _batchSendNanos) break; - flow = std::static_pointer_cast<FlowFileRecord>(session->get());; + flow = std::static_pointer_cast<FlowFileRecord>(session->get()); + if (!flow) { continueTransaction = false; } @@ -1240,7 +1250,6 @@ void Site2SiteClientProtocol::transferFlowFiles( return; } - } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/Site2SitePeer.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Site2SitePeer.cpp b/libminifi/src/Site2SitePeer.cpp index 64732ac..551d466 100644 --- a/libminifi/src/Site2SitePeer.cpp +++ b/libminifi/src/Site2SitePeer.cpp @@ -17,6 +17,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "Site2SitePeer.h" #include <sys/time.h> #include <stdio.h> #include <time.h> @@ -24,43 +25,39 @@ #include <thread> #include <random> #include <memory> -#include <netinet/tcp.h> #include <iostream> #include "io/ClientSocket.h" #include "io/validation.h" -#include "Site2SitePeer.h" #include "FlowController.h" - namespace org { namespace apache { namespace nifi { namespace minifi { bool Site2SitePeer::Open() { + if (IsNullOrEmpty(host_)) + return false; - if (IsNullOrEmpty (host_)) - return false; + if (stream_->initialize() < 0) + return false; - if (stream_->initialize() < 0) - return false; + uint16_t data_size = sizeof MAGIC_BYTES; - uint16_t data_size = sizeof MAGIC_BYTES; + if (stream_->writeData( + reinterpret_cast<uint8_t *>(const_cast<char*>(MAGIC_BYTES)), data_size) + != data_size) { + return false; + } - if (stream_->writeData((uint8_t *) MAGIC_BYTES, data_size) != data_size) { - return false; - } - - return true; + return true; } void Site2SitePeer::Close() { - if (stream_ != nullptr) - stream_->closeStream(); + if (stream_ != nullptr) + stream_->closeStream(); } - - } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/ThreadedSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp index 6c04281..65e7531 100644 --- a/libminifi/src/ThreadedSchedulingAgent.cpp +++ b/libminifi/src/ThreadedSchedulingAgent.cpp @@ -17,11 +17,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "ThreadedSchedulingAgent.h" +#include <memory> +#include <string> +#include <vector> +#include <map> #include <thread> #include <iostream> - -#include "ThreadedSchedulingAgent.h" - #include "core/Connectable.h" #include "core/ProcessorNode.h" #include "core/ProcessContext.h" @@ -35,7 +37,7 @@ namespace minifi { void ThreadedSchedulingAgent::schedule( std::shared_ptr<core::Processor> processor) { - std::lock_guard < std::mutex > lock(mutex_); + std::lock_guard<std::mutex> lock(mutex_); _administrativeYieldDuration = 0; std::string yieldValue; @@ -43,10 +45,11 @@ void ThreadedSchedulingAgent::schedule( if (configure_->get(Configure::nifi_administrative_yield_duration, yieldValue)) { core::TimeUnit unit; - if (core::Property::StringToTime( - yieldValue, _administrativeYieldDuration, unit) - && core::Property::ConvertTimeUnitToMS( - _administrativeYieldDuration, unit, _administrativeYieldDuration)) { + if (core::Property::StringToTime(yieldValue, _administrativeYieldDuration, + unit) + && core::Property::ConvertTimeUnitToMS(_administrativeYieldDuration, + unit, + _administrativeYieldDuration)) { logger_->log_debug("nifi_administrative_yield_duration: [%d] ms", _administrativeYieldDuration); } @@ -55,10 +58,9 @@ void ThreadedSchedulingAgent::schedule( _boredYieldDuration = 0; if (configure_->get(Configure::nifi_bored_yield_duration, yieldValue)) { core::TimeUnit unit; - if (core::Property::StringToTime( - yieldValue, _boredYieldDuration, unit) - && core::Property::ConvertTimeUnitToMS( - _boredYieldDuration, unit, _boredYieldDuration)) { + if (core::Property::StringToTime(yieldValue, _boredYieldDuration, unit) + && core::Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, + _boredYieldDuration)) { logger_->log_debug("nifi_bored_yield_duration: [%d] ms", _boredYieldDuration); } @@ -80,11 +82,10 @@ void ThreadedSchedulingAgent::schedule( } core::ProcessorNode processor_node(processor); - auto processContext = std::make_shared - < core::ProcessContext > (processor_node,repo_); - auto sessionFactory = std::make_shared - < core::ProcessSessionFactory - > (processContext.get()); + auto processContext = std::make_shared<core::ProcessContext>(processor_node, + repo_); + auto sessionFactory = std::make_shared<core::ProcessSessionFactory>( + processContext.get()); processor->onSchedule(processContext.get(), sessionFactory.get()); @@ -105,9 +106,9 @@ void ThreadedSchedulingAgent::schedule( return; } -void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) { - std::lock_guard < std::mutex > lock(mutex_); - +void ThreadedSchedulingAgent::unschedule( + std::shared_ptr<core::Processor> processor) { + std::lock_guard<std::mutex> lock(mutex_); logger_->log_info("Shutting down threads for processor %s/%s", processor->getName().c_str(), processor->getUUIDStr().c_str()); @@ -139,7 +140,6 @@ void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> proces processor->clearActiveTask(); } - } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/TimerDrivenSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp index 3895e81..8d10658 100644 --- a/libminifi/src/TimerDrivenSchedulingAgent.cpp +++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp @@ -17,10 +17,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "TimerDrivenSchedulingAgent.h" #include <chrono> #include <thread> +#include <memory> #include <iostream> -#include "TimerDrivenSchedulingAgent.h" #include "core/Property.h" namespace org { @@ -35,7 +36,6 @@ void TimerDrivenSchedulingAgent::run( while (this->running_) { bool shouldYield = this->onTrigger(processor, processContext, sessionFactory); - if (processor->isYield()) { // Honor the yield std::this_thread::sleep_for( http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/ConfigurableComponent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp index 67a43dd..fa5ff7d 100644 --- a/libminifi/src/core/ConfigurableComponent.cpp +++ b/libminifi/src/core/ConfigurableComponent.cpp @@ -17,7 +17,10 @@ */ #include "core/ConfigurableComponent.h" - +#include <memory> +#include <utility> +#include <string> +#include <set> #include "core/Property.h" #include "core/logging/Logger.h" @@ -27,19 +30,17 @@ namespace nifi { namespace minifi { namespace core { -ConfigurableComponent::ConfigurableComponent(std::shared_ptr<logging::Logger> logger) +ConfigurableComponent::ConfigurableComponent( + std::shared_ptr<logging::Logger> logger) : my_logger_(logger) { - } ConfigurableComponent::ConfigurableComponent( const ConfigurableComponent &&other) : properties_(std::move(other.properties_)), my_logger_(std::move(other.my_logger_)) { - } ConfigurableComponent::~ConfigurableComponent() { - } /** @@ -58,7 +59,7 @@ bool ConfigurableComponent::getProperty(const std::string name, Property item = it->second; value = item.getValue(); my_logger_->log_info("Processor %s property name %s value %s", name.c_str(), - item.getName().c_str(), value.c_str()); + item.getName().c_str(), value.c_str()); return true; } else { return false; @@ -80,7 +81,7 @@ bool ConfigurableComponent::setProperty(const std::string name, item.setValue(value); properties_[item.getName()] = item; my_logger_->log_info("Component %s property name %s value %s", name.c_str(), - item.getName().c_str(), value.c_str()); + item.getName().c_str(), value.c_str()); return true; } else { return false; @@ -102,7 +103,7 @@ bool ConfigurableComponent::setProperty(Property &prop, std::string value) { item.setValue(value); properties_[item.getName()] = item; my_logger_->log_info("property name %s value %s", prop.getName().c_str(), - item.getName().c_str(), value.c_str()); + item.getName().c_str(), value.c_str()); return true; } else { Property newProp(prop); @@ -110,7 +111,6 @@ bool ConfigurableComponent::setProperty(Property &prop, std::string value) { properties_.insert( std::pair<std::string, Property>(prop.getName(), newProp)); return true; - } return false; } @@ -132,11 +132,10 @@ bool ConfigurableComponent::setSupportedProperties( for (auto item : properties) { properties_[item.getName()] = item; } - return true; } -} /* namespace components */ +} /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/ConfigurationFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp index 52bde69..e009b1b 100644 --- a/libminifi/src/core/ConfigurationFactory.cpp +++ b/libminifi/src/core/ConfigurationFactory.cpp @@ -15,10 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - +#include <type_traits> +#include <utility> +#include <string> +#include <memory> +#include <algorithm> +#include <set> #include "core/ConfigurationFactory.h" #include "core/FlowConfiguration.h" -#include <type_traits> + #ifdef YAML_SUPPORT #include "core/yaml/YamlConfiguration.h" #endif @@ -29,50 +34,49 @@ namespace nifi { namespace minifi { namespace core { #ifndef YAML_SUPPORT - class YamlConfiguration; +class YamlConfiguration; #endif - std::unique_ptr<core::FlowConfiguration> createFlowConfiguration( - std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_file_repo, - const std::string configuration_class_name, const std::string path, - bool fail_safe) { +std::unique_ptr<core::FlowConfiguration> createFlowConfiguration( + std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_file_repo, + const std::string configuration_class_name, const std::string path, + bool fail_safe) { - std::string class_name_lc = configuration_class_name; - std::transform(class_name_lc.begin(), class_name_lc.end(), - class_name_lc.begin(), ::tolower); - try { + std::string class_name_lc = configuration_class_name; + std::transform(class_name_lc.begin(), class_name_lc.end(), + class_name_lc.begin(), ::tolower); + try { + if (class_name_lc == "flowconfiguration") { + // load the base configuration. + return std::unique_ptr<core::FlowConfiguration>( + new core::FlowConfiguration(repo, flow_file_repo, path)); - if (class_name_lc == "flowconfiguration") { - // load the base configuration. - return std::unique_ptr<core::FlowConfiguration>( - new core::FlowConfiguration(repo, flow_file_repo, path)); - - } else if (class_name_lc == "yamlconfiguration") { - // only load if the class is defined. - return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(repo, flow_file_repo, path)); - + } else if (class_name_lc == "yamlconfiguration") { + // only load if the class is defined. + return std::unique_ptr<core::FlowConfiguration>( + instantiate<core::YamlConfiguration>(repo, flow_file_repo, path)); - } else { - if (fail_safe) { - return std::unique_ptr<core::FlowConfiguration>( - new core::FlowConfiguration(repo, flow_file_repo, path)); - } else { - throw std::runtime_error( - "Support for the provided configuration class could not be found"); - } - } - } catch (const std::runtime_error &r) { + } else { if (fail_safe) { return std::unique_ptr<core::FlowConfiguration>( new core::FlowConfiguration(repo, flow_file_repo, path)); + } else { + throw std::runtime_error( + "Support for the provided configuration class could not be found"); } } - - throw std::runtime_error( - "Support for the provided configuration class could not be found"); + } catch (const std::runtime_error &r) { + if (fail_safe) { + return std::unique_ptr<core::FlowConfiguration>( + new core::FlowConfiguration(repo, flow_file_repo, path)); + } } + throw std::runtime_error( + "Support for the provided configuration class could not be found"); +} + } /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/Connectable.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp index ac61568..e234a06 100644 --- a/libminifi/src/core/Connectable.cpp +++ b/libminifi/src/core/Connectable.cpp @@ -1,13 +1,26 @@ -/* - * Connectable.cpp +/** * - * Created on: Feb 27, 2017 - * Author: mparisi + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ - -#include "../../include/core/Connectable.h" - +#include "core/Connectable.h" #include <uuid/uuid.h> +#include <utility> +#include <memory> +#include <string> +#include <set> #include "core/logging/Logger.h" #include "core/Relationship.h" @@ -20,7 +33,6 @@ namespace core { Connectable::Connectable(std::string name, uuid_t uuid) : CoreComponent(name, uuid), max_concurrent_tasks_(1) { - } Connectable::Connectable(const Connectable &&other) @@ -31,7 +43,6 @@ Connectable::Connectable(const Connectable &&other) } Connectable::~Connectable() { - } bool Connectable::setSupportedRelationships( @@ -51,7 +62,6 @@ bool Connectable::setSupportedRelationships( logger_->log_info("Processor %s supported relationship name %s", name_.c_str(), item.getName().c_str()); } - return true; } @@ -89,7 +99,6 @@ bool Connectable::setAutoTerminatedRelationships( logger_->log_info("Processor %s auto terminated relationship name %s", name_.c_str(), item.getName().c_str()); } - return true; } @@ -118,7 +127,6 @@ void Connectable::waitForWork(uint64_t timeoutMs) { work_condition_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] {return has_work_.load();}); } - } void Connectable::notifyWork() { @@ -134,7 +142,6 @@ void Connectable::notifyWork() { work_condition_.notify_one(); } } - } std::set<std::shared_ptr<Connectable>> Connectable::getOutGoingConnections( @@ -167,7 +174,7 @@ std::shared_ptr<Connectable> Connectable::getNextIncomingConnection() { return ret; } -} /* namespace components */ +} /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/Core.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Core.cpp b/libminifi/src/core/Core.cpp index 39969f6..cc6445d 100644 --- a/libminifi/src/core/Core.cpp +++ b/libminifi/src/core/Core.cpp @@ -1,12 +1,23 @@ -/* - * Core.cpp +/** * - * Created on: Mar 10, 2017 - * Author: mparisi + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -#include "core/core.h" - +#include "core/Core.h" +#include <string> namespace org { namespace apache { namespace nifi { @@ -38,14 +49,13 @@ unsigned const char *CoreComponent::getUUID() { // Set Processor Name void CoreComponent::setName(const std::string name) { name_ = name; - } // Get Process Name std::string CoreComponent::getName() { return name_; } -} -} -} -} -} +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/FlowConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index f2dda0d..68aaf5c 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -17,6 +17,8 @@ */ #include "core/FlowConfiguration.h" +#include <memory> +#include <string> namespace org { namespace apache { @@ -25,7 +27,6 @@ namespace minifi { namespace core { FlowConfiguration::~FlowConfiguration() { - } std::shared_ptr<core::Processor> FlowConfiguration::createProcessor( @@ -40,11 +41,6 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProcessor( processor = std::make_shared< org::apache::nifi::minifi::processors::LogAttribute>(name, uuid); } else if (name - == org::apache::nifi::minifi::processors::RealTimeDataCollector::ProcessorName) { - processor = std::make_shared< - org::apache::nifi::minifi::processors::RealTimeDataCollector>(name, - uuid); - } else if (name == org::apache::nifi::minifi::processors::GetFile::ProcessorName) { processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>(name, http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/FlowFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp new file mode 100644 index 0000000..b3ab741 --- /dev/null +++ b/libminifi/src/core/FlowFile.cpp @@ -0,0 +1,223 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/FlowFile.h" +#include <memory> +#include <string> +#include <set> +#include "core/logging/Logger.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace core { + +FlowFile::FlowFile() + : size_(0), + id_(0), + stored(false), + offset_(0), + last_queue_date_(0), + penaltyExpiration_ms_(0), + event_time_(0), + claim_(nullptr), + marked_delete_(false), + connection_(nullptr), + original_connection_() { + entry_date_ = getTimeMillis(); + lineage_start_date_ = entry_date_; + + char uuidStr[37]; + + // Generate the global UUID for the flow record + uuid_generate(uuid_); + + uuid_unparse_lower(uuid_, uuidStr); + uuid_str_ = uuidStr; + + logger_ = logging::Logger::getLogger(); +} + +FlowFile::~FlowFile() { +} + +FlowFile& FlowFile::operator=(const FlowFile& other) { + uuid_copy(uuid_, other.uuid_); + stored = other.stored; + marked_delete_ = other.marked_delete_; + entry_date_ = other.entry_date_; + lineage_start_date_ = other.lineage_start_date_; + lineage_Identifiers_ = other.lineage_Identifiers_; + last_queue_date_ = other.last_queue_date_; + size_ = other.size_; + penaltyExpiration_ms_ = other.penaltyExpiration_ms_; + attributes_ = other.attributes_; + claim_ = other.claim_; + if (claim_ != nullptr) + this->claim_->increaseFlowFileRecordOwnedCount(); + uuid_str_ = other.uuid_str_; + connection_ = other.connection_; + original_connection_ = other.original_connection_; + return *this; +} + +/** + * Returns whether or not this flow file record + * is marked as deleted. + * @return marked deleted + */ +bool FlowFile::isDeleted() { + return marked_delete_; +} + +/** + * Sets whether to mark this flow file record + * as deleted + * @param deleted deleted flag + */ +void FlowFile::setDeleted(const bool deleted) { + marked_delete_ = deleted; +} + +std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() { + return claim_; +} + +void FlowFile::clearResourceClaim() { + claim_ = nullptr; +} +void FlowFile::setResourceClaim(std::shared_ptr<ResourceClaim> &claim) { + claim_ = claim; +} + +// ! Get Entry Date +uint64_t FlowFile::getEntryDate() { + return entry_date_; +} +uint64_t FlowFile::getEventTime() { + return event_time_; +} +// ! Get Lineage Start Date +uint64_t FlowFile::getlineageStartDate() { + return lineage_start_date_; +} + +std::set<std::string> &FlowFile::getlineageIdentifiers() { + return lineage_Identifiers_; +} + +bool FlowFile::getAttribute(std::string key, std::string &value) { + auto it = attributes_.find(key); + if (it != attributes_.end()) { + value = it->second; + return true; + } else { + return false; + } +} + +// Get Size +uint64_t FlowFile::getSize() { + return size_; +} +// ! Get Offset +uint64_t FlowFile::getOffset() { + return offset_; +} + +bool FlowFile::removeAttribute(const std::string key) { + auto it = attributes_.find(key); + if (it != attributes_.end()) { + attributes_.erase(key); + return true; + } else { + return false; + } +} + +bool FlowFile::updateAttribute(const std::string key, const std::string value) { + auto it = attributes_.find(key); + if (it != attributes_.end()) { + attributes_[key] = value; + return true; + } else { + return false; + } +} + +bool FlowFile::addAttribute(const std::string &key, const std::string &value) { + auto it = attributes_.find(key); + if (it != attributes_.end()) { + // attribute already there in the map + return false; + } else { + attributes_[key] = value; + return true; + } +} + +void FlowFile::setLineageStartDate(const uint64_t date) { + lineage_start_date_ = date; +} + +/** + * Sets the original connection with a shared pointer. + * @param connection shared connection. + */ +void FlowFile::setOriginalConnection( + std::shared_ptr<core::Connectable> &connection) { + original_connection_ = connection; +} + +/** + * Sets the connection with a shared pointer. + * @param connection shared connection. + */ +void FlowFile::setConnection(std::shared_ptr<core::Connectable> &connection) { + connection_ = connection; +} + +/** + * Sets the connection with a shared pointer. + * @param connection shared connection. + */ +void FlowFile::setConnection(std::shared_ptr<core::Connectable> &&connection) { + connection_ = connection; +} + +/** + * Returns the connection referenced by this record. + * @return shared connection pointer. + */ +std::shared_ptr<core::Connectable> FlowFile::getConnection() { + return connection_; +} + +/** + * Returns the original connection referenced by this record. + * @return shared original connection pointer. + */ +std::shared_ptr<core::Connectable> FlowFile::getOriginalConnection() { + return original_connection_; +} + +} /* namespace core */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index baa3ebd..1a6e729 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -17,16 +17,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "core/ProcessGroup.h" +#include <sys/time.h> +#include <time.h> #include <vector> +#include <memory> +#include <string> #include <queue> #include <map> #include <set> -#include <sys/time.h> -#include <time.h> #include <chrono> #include <thread> - -#include "core/ProcessGroup.h" #include "core/Processor.h" namespace org { @@ -63,7 +64,6 @@ ProcessGroup::~ProcessGroup() { ProcessGroup *processGroup(*it); delete processGroup; } - } bool ProcessGroup::isRootProcessGroup() { @@ -176,16 +176,12 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, } std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) { - std::shared_ptr<Processor> ret = NULL; - // std::lock_guard<std::mutex> lock(mutex_); - for (auto processor : processors_) { logger_->log_info("find processor %s", processor->getName().c_str()); uuid_t processorUUID; if (processor->getUUID(processorUUID)) { - char uuid_str[37]; // ex. "1b4e28ba-2fa1-11d2-883f-0016d3cca427" + "\0" uuid_unparse_lower(processorUUID, uuid_str); std::string processorUUIDstr = uuid_str; @@ -195,37 +191,31 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) { return processor; } } - } for (auto processGroup : child_process_groups_) { - logger_->log_info("find processor child %s", processGroup->getName().c_str()); std::shared_ptr<Processor> processor = processGroup->findProcessor(uuid); if (processor) return processor; } - return ret; } std::shared_ptr<Processor> ProcessGroup::findProcessor( const std::string &processorName) { std::shared_ptr<Processor> ret = NULL; - for (auto processor : processors_) { logger_->log_debug("Current processor is %s", processor->getName().c_str()); if (processor->getName() == processorName) return processor; } - for (auto processGroup : child_process_groups_) { std::shared_ptr<Processor> processor = processGroup->findProcessor( processorName); if (processor) return processor; } - return ret; } @@ -233,18 +223,15 @@ void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) { std::lock_guard<std::mutex> lock(mutex_); - for (auto processor : processors_) { if (processor->getName() == processorName) { processor->setProperty(propertyName, propertyValue); } } - for (auto processGroup : child_process_groups_) { processGroup->updatePropertyValue(processorName, propertyName, propertyValue); } - return; } @@ -253,7 +240,6 @@ void ProcessGroup::getConnections( for (auto connection : connections_) { connectionMap[connection->getUUIDStr()] = connection; } - for (auto processGroup : child_process_groups_) { processGroup->getConnections(connectionMap); } @@ -305,7 +291,7 @@ void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) { } } -} /* namespace processor */ +} /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/ProcessSession.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 09c3fa3..70de3f6 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -17,18 +17,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "core/ProcessSession.h" +#include <sys/time.h> +#include <time.h> #include <vector> #include <queue> #include <map> +#include <memory> +#include <string> #include <set> -#include <sys/time.h> -#include <time.h> #include <chrono> #include <thread> #include <iostream> -#include "core/ProcessSession.h" - namespace org { namespace apache { namespace nifi { @@ -37,8 +38,8 @@ namespace core { std::shared_ptr<core::FlowFile> ProcessSession::create() { std::map<std::string, std::string> empty; - std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), - empty); + std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>( + process_context_->getProvenanceRepository(), empty); _addedFlowFiles[record->getUUIDStr()] = record; logger_->log_debug("Create FlowFile with UUID %s", @@ -50,10 +51,11 @@ std::shared_ptr<core::FlowFile> ProcessSession::create() { return record; } -std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::FlowFile> &&parent) { +std::shared_ptr<core::FlowFile> ProcessSession::create( + std::shared_ptr<core::FlowFile> &&parent) { std::map<std::string, std::string> empty; - std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), - empty); + std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>( + process_context_->getProvenanceRepository(), empty); if (record) { _addedFlowFiles[record->getUUIDStr()] = record; @@ -77,7 +79,6 @@ std::shared_ptr<core::FlowFile> ProcessSession::create(std::shared_ptr<core::Flo record->setLineageStartDate(parent->getlineageStartDate()); record->setLineageIdentifiers(parent->getlineageIdentifiers()); parent->getlineageIdentifiers().insert(parent->getUUIDStr()); - } return record; } @@ -93,7 +94,6 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone( record->setOffset(parent->getOffset()); record->setSize(parent->getSize()); record->getResourceClaim()->increaseFlowFileRecordOwnedCount(); - ; } provenance_report_->clone(parent, record); } @@ -103,8 +103,8 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone( std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer( std::shared_ptr<core::FlowFile> &parent) { std::map<std::string, std::string> empty; - std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(), - empty); + std::shared_ptr<core::FlowFile> record = std::make_shared<FlowFileRecord>( + process_context_->getProvenanceRepository(), empty); if (record) { this->_clonedFlowFiles[record->getUUIDStr()] = record; @@ -134,7 +134,6 @@ std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer( record->setOffset(parent->getOffset()); record->setSize(parent->getSize()); record->getResourceClaim()->increaseFlowFileRecordOwnedCount(); - ; } provenance_report_->clone(parent, record); } @@ -143,12 +142,11 @@ std::shared_ptr<core::FlowFile> ProcessSession::cloneDuringTransfer( } std::shared_ptr<core::FlowFile> ProcessSession::clone( - std::shared_ptr<core::FlowFile> &parent, long offset, long size) { + std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size) { std::shared_ptr<core::FlowFile> record = this->create(parent); if (record) { - if (parent->getResourceClaim()) { - if ((offset + size) > (long) parent->getSize()) { + if ((offset + size) > parent->getSize()) { // Set offset and size logger_->log_error("clone offset %d and size %d exceed parent size %d", offset, size, parent->getSize()); @@ -165,7 +163,6 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone( std::shared_ptr<ResourceClaim> parent_claim = parent->getResourceClaim(); record->setResourceClaim(parent_claim); if (parent_claim != nullptr) { - record->getResourceClaim()->increaseFlowFileRecordOwnedCount(); } } @@ -531,13 +528,13 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, } /** - * Imports a file from the data stream - * @param stream incoming data stream that contains the data to store into a file - * @param flow flow file - * - */ + * Imports a file from the data stream + * @param stream incoming data stream that contains the data to store into a file + * @param flow flow file + * + */ void ProcessSession::importFrom(io::DataStream &stream, - std::shared_ptr<core::FlowFile> &&flow) { + std::shared_ptr<core::FlowFile> &&flow) { std::shared_ptr<ResourceClaim> claim = std::make_shared<ResourceClaim>(); int max_read = getpagesize(); @@ -550,27 +547,21 @@ void ProcessSession::importFrom(io::DataStream &stream, fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); - - if (fs.is_open() ) { - + if (fs.is_open()) { size_t position = 0; const size_t max_size = stream.getSize(); size_t read_size = max_read; - while(position < max_size) - { - if ((max_size - position) > max_read) - { + while (position < max_size) { + if ((max_size - position) > max_read) { read_size = max_read; - } - else - { + } else { read_size = max_size - position; } charBuffer.clear(); - stream.readData(charBuffer,read_size); + stream.readData(charBuffer, read_size); - fs.write((const char*)charBuffer.data(),read_size); - position+=read_size; + fs.write((const char*) charBuffer.data(), read_size); + position += read_size; } // Open the source file and stream to the flow file @@ -603,7 +594,6 @@ void ProcessSession::importFrom(io::DataStream &stream, } else { throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); } - } catch (std::exception &exception) { if (flow && flow->getResourceClaim() == claim) { flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); @@ -730,7 +720,6 @@ void ProcessSession::import(std::string source, fs.write(buf, input.gcount()); } - if (fs.good() && fs.tellp() >= 0) { flow->setSize(fs.tellp()); flow->setOffset(0); @@ -786,7 +775,6 @@ void ProcessSession::import(std::string source, } void ProcessSession::commit() { - try { // First we clone the flow record based on the transfered relationship for updated flow record for (auto && it : _updatedFlowFiles) { @@ -962,7 +950,6 @@ void ProcessSession::rollback() { flowf->setSnapShot(false); connection->put(record); } - } _originalFlowFiles.clear(); @@ -1010,7 +997,8 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() { _updatedFlowFiles[ret->getUUIDStr()] = ret; std::map<std::string, std::string> empty; std::shared_ptr<core::FlowFile> snapshot = - std::make_shared<FlowFileRecord>(process_context_->getProvenanceRepository(),empty); + std::make_shared<FlowFileRecord>( + process_context_->getProvenanceRepository(), empty); logger_->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str()); snapshot = ret; @@ -1026,7 +1014,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() { return NULL; } -} /* namespace processor */ +} /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/ProcessSessionFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessSessionFactory.cpp b/libminifi/src/core/ProcessSessionFactory.cpp index 445ca58..31b7481 100644 --- a/libminifi/src/core/ProcessSessionFactory.cpp +++ b/libminifi/src/core/ProcessSessionFactory.cpp @@ -19,23 +19,19 @@ */ #include "core/ProcessSessionFactory.h" - #include <memory> - namespace org { namespace apache { namespace nifi { namespace minifi { namespace core { -std::unique_ptr<ProcessSession> ProcessSessionFactory::createSession() -{ - return std::unique_ptr<ProcessSession>(new ProcessSession(process_context_)); +std::unique_ptr<ProcessSession> ProcessSessionFactory::createSession() { + return std::unique_ptr<ProcessSession>(new ProcessSession(process_context_)); } - -} /* namespace processor */ +} /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/Processor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index ba52c28..9a0898a 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -17,19 +17,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "core/Processor.h" +#include <sys/time.h> +#include <time.h> #include <vector> #include <queue> #include <map> #include <set> -#include <sys/time.h> -#include <time.h> #include <chrono> +#include <string> #include <thread> #include <memory> #include <functional> - -#include "core/Processor.h" - #include "Connection.h" #include "core/Connectable.h" #include "core/ProcessContext.h" @@ -45,7 +44,6 @@ namespace core { Processor::Processor(std::string name, uuid_t uuid) : Connectable(name, uuid), ConfigurableComponent(logging::Logger::getLogger()) { - has_work_.store(false); // Setup the default values state_ = DISABLED; @@ -74,7 +72,6 @@ void Processor::setScheduledState(ScheduledState state) { } bool Processor::addConnection(std::shared_ptr<Connectable> conn) { - bool ret = false; if (isRunning()) { @@ -82,7 +79,8 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) { name_.c_str()); return false; } - std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>( + conn); std::lock_guard<std::mutex> lock(mutex_); uuid_t srcUUID; @@ -128,7 +126,6 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) { ret = true; } } else { - // We do not have any outgoing connection for this relationship yet std::set<std::shared_ptr<Connectable>> newConnection; newConnection.insert(connection); @@ -156,8 +153,9 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) { uuid_t srcUUID; uuid_t destUUID; - - std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); + + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>( + conn); connection->getSourceUUID(srcUUID); connection->getDestinationUUID(destUUID); @@ -193,8 +191,6 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) { } } - - bool Processor::flowFilesQueued() { std::lock_guard<std::mutex> lock(mutex_); @@ -202,7 +198,8 @@ bool Processor::flowFilesQueued() { return false; for (auto &&conn : _incomingConnections) { - std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); + std::shared_ptr<Connection> connection = + std::static_pointer_cast<Connection>(conn); if (connection->getQueueSize() > 0) return true; } @@ -217,7 +214,8 @@ bool Processor::flowFilesOutGoingFull() { // We already has connection for this relationship std::set<std::shared_ptr<Connectable>> existedConnection = connection.second; for (const auto conn : existedConnection) { - std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast< + Connection>(conn); if (connection->isFull()) return true; } @@ -251,7 +249,8 @@ bool Processor::isWorkAvailable() { try { for (const auto &conn : _incomingConnections) { - std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast< + Connection>(conn); if (connection->getQueueSize() > 0) { hasWork = true; break; @@ -259,13 +258,14 @@ bool Processor::isWorkAvailable() { } } catch (...) { logger_->log_error( - "Caught an exception while checking if work is available; unless it was positively determined that work is available, assuming NO work is available!"); + "Caught an exception while checking if work is available;" + " unless it was positively determined that work is available, assuming NO work is available!"); } return hasWork; } -} /* namespace processor */ +} /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/ProcessorNode.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessorNode.cpp b/libminifi/src/core/ProcessorNode.cpp index 44491d3..8979e32 100644 --- a/libminifi/src/core/ProcessorNode.cpp +++ b/libminifi/src/core/ProcessorNode.cpp @@ -16,7 +16,7 @@ */ #include "core/ProcessorNode.h" - +#include <memory> namespace org { namespace apache { namespace nifi { @@ -25,29 +25,23 @@ namespace core { ProcessorNode::ProcessorNode(const std::shared_ptr<Connectable> processor) : processor_(processor), - Connectable(processor->getName(),0), + Connectable(processor->getName(), 0), ConfigurableComponent(logging::Logger::getLogger()) { - - uuid_t copy; - processor->getUUID(copy); - setUUID( copy ); - - + uuid_t copy; + processor->getUUID(copy); + setUUID(copy); } ProcessorNode::ProcessorNode(const ProcessorNode &other) : processor_(other.processor_), Connectable(other.getName(), 0), ConfigurableComponent(logging::Logger::getLogger()) { - - uuid_t copy; - processor_->getUUID(copy); - setUUID( copy ); - + uuid_t copy; + processor_->getUUID(copy); + setUUID(copy); } ProcessorNode::~ProcessorNode() { - } bool ProcessorNode::isWorkAvailable() { @@ -58,7 +52,7 @@ bool ProcessorNode::isRunning() { return processor_->isRunning(); } -} /* namespace processor */ +} /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/Property.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Property.cpp b/libminifi/src/core/Property.cpp index 287b7ec..aa002af 100644 --- a/libminifi/src/core/Property.cpp +++ b/libminifi/src/core/Property.cpp @@ -17,10 +17,11 @@ */ #include "core/Property.h" - +#include <string> namespace org { namespace apache { -namespace nifi {namespace minifi { +namespace nifi { +namespace minifi { namespace core { // Get Name for the property
