http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/Record.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Record.cpp b/libminifi/src/core/Record.cpp deleted file mode 100644 index 6f33300..0000000 --- a/libminifi/src/core/Record.cpp +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Copyright 2017 <copyright holder> <email> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "core/FlowFile.h" -#include "core/logging/Logger.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace core { - -FlowFile::FlowFile() - : size_(0), - id_(0), - stored(false), - offset_(0), - last_queue_date_(0), - penaltyExpiration_ms_(0), - claim_(nullptr), - marked_delete_(false), - connection_(nullptr), - original_connection_() { - entry_date_ = getTimeMillis(); - lineage_start_date_ = entry_date_; - - char uuidStr[37]; - - // Generate the global UUID for the flow record - uuid_generate(uuid_); - - uuid_unparse_lower(uuid_, uuidStr); - uuid_str_ = uuidStr; - - logger_ = logging::Logger::getLogger(); - -} - -FlowFile::~FlowFile() { - -} - -FlowFile& FlowFile::operator=(const FlowFile& other) { - - uuid_copy(uuid_, other.uuid_); - stored = other.stored; - marked_delete_ = other.marked_delete_; - entry_date_ = other.entry_date_; - lineage_start_date_ = other.lineage_start_date_; - lineage_Identifiers_ = other.lineage_Identifiers_; - last_queue_date_ 
= other.last_queue_date_; - size_ = other.size_; - penaltyExpiration_ms_ = other.penaltyExpiration_ms_; - attributes_ = other.attributes_; - claim_ = other.claim_; - if (claim_ != nullptr) - this->claim_->increaseFlowFileRecordOwnedCount(); - uuid_str_ = other.uuid_str_; - connection_ = other.connection_; - original_connection_ = other.original_connection_; - - return *this; -} - -/** - * Returns whether or not this flow file record - * is marked as deleted. - * @return marked deleted - */ -bool FlowFile::isDeleted() { - return marked_delete_; -} - -/** - * Sets whether to mark this flow file record - * as deleted - * @param deleted deleted flag - */ -void FlowFile::setDeleted(const bool deleted) { - marked_delete_ = deleted; -} - -std::shared_ptr<ResourceClaim> FlowFile::getResourceClaim() { - return claim_; -} - -void FlowFile::clearResourceClaim() { - claim_ = nullptr; -} -void FlowFile::setResourceClaim(std::shared_ptr<ResourceClaim> &claim) { - claim_ = claim; -} - -// ! Get Entry Date -uint64_t FlowFile::getEntryDate() { - return entry_date_; -} -uint64_t FlowFile::getEventTime() { - return event_time_; -} -// ! Get Lineage Start Date -uint64_t FlowFile::getlineageStartDate() { - return lineage_start_date_; -} - -std::set<std::string> &FlowFile::getlineageIdentifiers() { - return lineage_Identifiers_; -} - -bool FlowFile::getAttribute(std::string key, std::string &value) { - auto it = attributes_.find(key); - if (it != attributes_.end()) { - value = it->second; - return true; - } else { - return false; - } -} - -// Get Size -uint64_t FlowFile::getSize() { - return size_; -} -// ! 
Get Offset -uint64_t FlowFile::getOffset() { - return offset_; -} - -bool FlowFile::removeAttribute(const std::string key) { - auto it = attributes_.find(key); - if (it != attributes_.end()) { - attributes_.erase(key); - return true; - } else { - return false; - } -} - -bool FlowFile::updateAttribute(const std::string key, const std::string value) { - auto it = attributes_.find(key); - if (it != attributes_.end()) { - attributes_[key] = value; - return true; - } else { - return false; - } -} - -bool FlowFile::addAttribute(const std::string &key, const std::string &value) { - - - auto it = attributes_.find(key); - if (it != attributes_.end()) { - // attribute already there in the map - return false; - } else { - attributes_[key] = value; - return true; - } -} - -void FlowFile::setLineageStartDate(const uint64_t date) { - lineage_start_date_ = date; -} - -/** - * Sets the original connection with a shared pointer. - * @param connection shared connection. - */ -void FlowFile::setOriginalConnection( - std::shared_ptr<core::Connectable> &connection) { - original_connection_ = connection; -} - -/** - * Sets the connection with a shared pointer. - * @param connection shared connection. - */ -void FlowFile::setConnection(std::shared_ptr<core::Connectable> &connection) { - connection_ = connection; -} - -/** - * Sets the connection with a shared pointer. - * @param connection shared connection. - */ -void FlowFile::setConnection(std::shared_ptr<core::Connectable> &&connection) { - connection_ = connection; -} - -/** - * Returns the connection referenced by this record. - * @return shared connection pointer. - */ -std::shared_ptr<core::Connectable> FlowFile::getConnection() { - return connection_; -} - -/** - * Returns the original connection referenced by this record. - * @return shared original connection pointer. - */ -std::shared_ptr<core::Connectable> FlowFile::getOriginalConnection() { - return original_connection_; -} - -} -} -} -} -}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/Repository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp index 9a27785..50e8cd2 100644 --- a/libminifi/src/core/Repository.cpp +++ b/libminifi/src/core/Repository.cpp @@ -15,15 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "core/Repository.h" +#include <arpa/inet.h> #include <cstdint> #include <vector> -#include <arpa/inet.h> #include "io/DataStream.h" #include "io/Serializable.h" #include "core/Relationship.h" #include "core/logging/Logger.h" #include "FlowController.h" -#include "core/Repository.h" #include "provenance/Provenance.h" #include "core/repository/FlowFileRepository.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/RepositoryFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp index 9bdc7c3..c24a2af 100644 --- a/libminifi/src/core/RepositoryFactory.cpp +++ b/libminifi/src/core/RepositoryFactory.cpp @@ -1,6 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "core/RepositoryFactory.h" +#include <memory> +#include <string> +#include <algorithm> #include "core/Repository.h" -#include "core/Repository.h" - #ifdef LEVELDB_SUPPORT #include "core/repository/FlowFileRepository.h" #include "provenance/ProvenanceRepository.h" @@ -11,57 +29,51 @@ namespace apache { namespace nifi { namespace minifi { #ifndef LEVELDB_SUPPORT - namespace provenance{ - class ProvenanceRepository; - } +namespace provenance { +class ProvenanceRepository; +} #endif namespace core { #ifndef LEVELDB_SUPPORT - class FlowFileRepository; +class FlowFileRepository; #endif - - std::shared_ptr<core::Repository> createRepository( - const std::string configuration_class_name, bool fail_safe = false) { - - std::string class_name_lc = configuration_class_name; - std::transform(class_name_lc.begin(), class_name_lc.end(), - class_name_lc.begin(), ::tolower); - try { - std::shared_ptr<core::Repository> return_obj = nullptr; - if (class_name_lc == "flowfilerepository") { - - return_obj = instantiate<core::repository::FlowFileRepository>(); - } else if (class_name_lc == "provenancerepository") { - - - return_obj = instantiate<provenance::ProvenanceRepository>();//std::shared_ptr<core::Repository>((core::Repository*)instantiate<provenance::ProvenanceRepository>()); - - } - - if (return_obj){ - return return_obj; - } - if (fail_safe) { - return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, - 1, 1); - } else { - throw std::runtime_error( - "Support for the provided configuration class could not be found"); - } - } catch (const std::runtime_error &r) { - if (fail_safe) { - return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, - 1, 1); - } +std::shared_ptr<core::Repository> createRepository( + const std::string configuration_class_name, bool fail_safe) { + std::shared_ptr<core::Repository> return_obj = nullptr; + 
std::string class_name_lc = configuration_class_name; + std::transform(class_name_lc.begin(), class_name_lc.end(), + class_name_lc.begin(), ::tolower); + try { + std::shared_ptr<core::Repository> return_obj = nullptr; + if (class_name_lc == "flowfilerepository") { + return_obj = instantiate<core::repository::FlowFileRepository>(); + } else if (class_name_lc == "provenancerepository") { + return_obj = instantiate<provenance::ProvenanceRepository>(); } - throw std::runtime_error( - "Support for the provided configuration class could not be found"); + if (return_obj) { + return return_obj; + } + if (fail_safe) { + return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, + 1); + } else { + throw std::runtime_error( + "Support for the provided configuration class could not be found"); + } + } catch (const std::runtime_error &r) { + if (fail_safe) { + return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, + 1); + } } - + throw std::runtime_error( + "Support for the provided configuration class could not be found"); +} + } /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/logging/BaseLogger.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/logging/BaseLogger.cpp b/libminifi/src/core/logging/BaseLogger.cpp index a6b43a8..d4774df 100644 --- a/libminifi/src/core/logging/BaseLogger.cpp +++ b/libminifi/src/core/logging/BaseLogger.cpp @@ -17,6 +17,10 @@ */ #include "core/logging/BaseLogger.h" +#include <utility> +#include <memory> +#include <algorithm> +#include <string> namespace org { namespace apache { @@ -68,7 +72,6 @@ void BaseLogger::log_info(const char * const format, ...) { * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match */ void BaseLogger::log_debug(const char * const format, ...) 
{ - if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::debug)) return; FILL_BUFFER @@ -80,7 +83,6 @@ void BaseLogger::log_debug(const char * const format, ...) { * @warning does not check @p log or @p format for null. Caller must ensure parameters and format string lengths match */ void BaseLogger::log_trace(const char * const format, ...) { - if (logger_ == NULL || !logger_->should_log(spdlog::level::level_enum::trace)) return; FILL_BUFFER @@ -122,7 +124,6 @@ void BaseLogger::log_str(LOG_LEVEL_E level, const std::string &buffer) { logger_->info(buffer); break; } - } void BaseLogger::setLogLevel(const std::string &level, http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/logging/LogAppenders.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/logging/LogAppenders.cpp b/libminifi/src/core/logging/LogAppenders.cpp index 5d92334..1918a0d 100644 --- a/libminifi/src/core/logging/LogAppenders.cpp +++ b/libminifi/src/core/logging/LogAppenders.cpp @@ -24,12 +24,15 @@ namespace minifi { namespace core { namespace logging { -const char *OutputStreamAppender::nifi_log_output_stream_error_stderr="nifi.log.outputstream.appender.error.stderr"; - -const char *RollingAppender::nifi_log_rolling_apender_file = "nifi.log.rolling.appender.file"; -const char *RollingAppender::nifi_log_rolling_appender_max_files = "nifi.log.rolling.appender.max.files"; -const char *RollingAppender::nifi_log_rolling_appender_max_file_size = "nifi.log.rolling.appender.max.file_size"; +const char *OutputStreamAppender::nifi_log_output_stream_error_stderr = + "nifi.log.outputstream.appender.error.stderr"; +const char *RollingAppender::nifi_log_rolling_apender_file = + "nifi.log.rolling.appender.file"; +const char *RollingAppender::nifi_log_rolling_appender_max_files = + "nifi.log.rolling.appender.max.files"; +const char *RollingAppender::nifi_log_rolling_appender_max_file_size = + 
"nifi.log.rolling.appender.max.file_size"; } /* namespace logging */ } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/logging/Logger.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/logging/Logger.cpp b/libminifi/src/core/logging/Logger.cpp index d8cadf3..5c08fa8 100644 --- a/libminifi/src/core/logging/Logger.cpp +++ b/libminifi/src/core/logging/Logger.cpp @@ -21,6 +21,7 @@ #include <vector> #include <queue> +#include <memory> #include <map> namespace org { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/repository/FlowFileRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp index 8f13f39..5f62f83 100644 --- a/libminifi/src/core/repository/FlowFileRepository.cpp +++ b/libminifi/src/core/repository/FlowFileRepository.cpp @@ -1,4 +1,24 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ #include "core/repository/FlowFileRepository.h" +#include <memory> +#include <string> +#include <vector> #include "FlowFileRecord.h" namespace org { @@ -20,10 +40,12 @@ void FlowFileRepository::run() { leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { - std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this()); + std::shared_ptr<FlowFileRecord> eventRead = std::make_shared< + FlowFileRecord>(shared_from_this()); std::string key = it->key().ToString(); - if (eventRead->DeSerialize((uint8_t *) it->value().data(), - (int) it->value().size())) { + if (eventRead->DeSerialize( + reinterpret_cast<const uint8_t *>(it->value().data()), + it->value().size())) { if ((curTime - eventRead->getEventTime()) > max_partition_millis_) purgeList.push_back(key); } else { @@ -47,54 +69,44 @@ void FlowFileRepository::run() { return; } -void FlowFileRepository::loadComponent() - { - +void FlowFileRepository::loadComponent() { std::vector<std::string> purgeList; - leveldb::Iterator* it = db_->NewIterator( - leveldb::ReadOptions()); + leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); - for (it->SeekToFirst(); it->Valid(); it->Next()) - { - std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this()); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::shared_ptr<FlowFileRecord> eventRead = + std::make_shared<FlowFileRecord>(shared_from_this()); std::string key = it->key().ToString(); - if (eventRead->DeSerialize((uint8_t *) it->value().data(), - (int) it->value().size())) - { + if (eventRead->DeSerialize( + reinterpret_cast<const uint8_t *>(it->value().data()), + it->value().size())) { auto search = connectionMap.find(eventRead->getConnectionUuid()); - if (search != connectionMap.end()) - { + if (search != connectionMap.end()) { // we find the connection for the persistent flowfile, create the flowfile and enqueue that - 
std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead); - std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(),flow_file_ref); + std::shared_ptr<core::FlowFile> flow_file_ref = + std::static_pointer_cast<core::FlowFile>(eventRead); + std::shared_ptr<FlowFileRecord> record = + std::make_shared<FlowFileRecord>(shared_from_this(), flow_file_ref); // set store to repo to true so that we do need to persistent again in enqueue record->setStoredToRepository(true); search->second->put(record); - } - else - { - if (eventRead->getContentFullPath().length() > 0) - { + } else { + if (eventRead->getContentFullPath().length() > 0) { std::remove(eventRead->getContentFullPath().c_str()); } purgeList.push_back(key); } - } - else - { + } else { purgeList.push_back(key); } } delete it; std::vector<std::string>::iterator itPurge; - for (itPurge = purgeList.begin(); itPurge != purgeList.end(); - itPurge++) - { + for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) { std::string eventId = *itPurge; - logger_->log_info("Repository Repo %s Purge %s", - name_.c_str(), - eventId.c_str()); + logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), + eventId.c_str()); Delete(eventId); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index 8e59363..4e736f8 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -17,7 +17,10 @@ */ #include "core/yaml/YamlConfiguration.h" - +#include <memory> +#include <string> +#include <vector> +#include <set> namespace org { namespace apache { namespace nifi { @@ -29,18 +32,17 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml( uuid_t 
uuid; std::string flowName = rootFlowNode["name"].as<std::string>(); - std::string id ; - + std::string id; + try { rootFlowNode["id"].as<std::string>(); uuid_parse(id.c_str(), uuid); - }catch(...) - { + } catch (...) { logger_->log_warn("Generating random ID for root node"); uuid_generate(uuid); char uuid_str[37]; - uuid_unparse(uuid,uuid_str); + uuid_unparse(uuid, uuid_str); id = uuid_str; } @@ -69,7 +71,6 @@ void YamlConfiguration::parseProcessorNodeYaml( } if (processorsNode) { - if (processorsNode.IsSequence()) { // Evaluate sequence of processors int numProcessors = processorsNode.size(); @@ -196,7 +197,6 @@ void YamlConfiguration::parseProcessorNodeYaml( processor->setSchedulingStrategy(core::CRON_DRIVEN); logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy.c_str()); - } int64_t maxConcurrentTasks; @@ -324,7 +324,6 @@ void YamlConfiguration::parseRemoteProcessGroupYaml( this->parsePortYaml(&currPort, group, RECEIVE); } // for node } - } } } @@ -341,11 +340,9 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, } if (connectionsNode) { - if (connectionsNode->IsSequence()) { for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) { - YAML::Node connectionNode = iter->as<YAML::Node>(); std::string name = connectionNode["name"].as<std::string>(); @@ -461,7 +458,6 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); processor->setMaxConcurrentTasks(maxConcurrentTasks); - } void YamlConfiguration::parsePropertiesNodeYaml( http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/BaseStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/BaseStream.cpp b/libminifi/src/io/BaseStream.cpp index 1400a1d..8070c38 100644 --- a/libminifi/src/io/BaseStream.cpp +++ 
b/libminifi/src/io/BaseStream.cpp @@ -16,9 +16,9 @@ * limitations under the License. */ #include "io/BaseStream.h" +#include <string> #include "io/Serializable.h" - namespace org { namespace apache { namespace nifi { @@ -32,7 +32,8 @@ namespace io { * @return resulting write size **/ int BaseStream::write(uint32_t base_value, bool is_little_endian) { - return Serializable::write(base_value, (DataStream*) this, is_little_endian); + return Serializable::write(base_value, reinterpret_cast<DataStream*>(this), + is_little_endian); } /** @@ -43,7 +44,8 @@ int BaseStream::write(uint32_t base_value, bool is_little_endian) { * @return resulting write size **/ int BaseStream::write(uint16_t base_value, bool is_little_endian) { - return Serializable::write(base_value, (DataStream*) this, is_little_endian); + return Serializable::write(base_value, reinterpret_cast<DataStream*>(this), + is_little_endian); } /** @@ -54,7 +56,7 @@ int BaseStream::write(uint16_t base_value, bool is_little_endian) { * @return resulting write size **/ int BaseStream::write(uint8_t *value, int len) { - return Serializable::write(value, len, (DataStream*) this); + return Serializable::write(value, len, reinterpret_cast<DataStream*>(this)); } /** @@ -65,7 +67,8 @@ int BaseStream::write(uint8_t *value, int len) { * @return resulting write size **/ int BaseStream::write(uint64_t base_value, bool is_little_endian) { - return Serializable::write(base_value, (DataStream*) this, is_little_endian); + return Serializable::write(base_value, reinterpret_cast<DataStream*>(this), + is_little_endian); } /** @@ -74,8 +77,8 @@ int BaseStream::write(uint64_t base_value, bool is_little_endian) { * @return resulting write size **/ int BaseStream::write(bool value) { - uint8_t v = value; - return Serializable::write(v); + uint8_t v = value; + return Serializable::write(v); } /** @@ -84,7 +87,7 @@ int BaseStream::write(bool value) { * @return resulting write size **/ int BaseStream::writeUTF(std::string str, bool widen) { 
- return Serializable::writeUTF(str, (DataStream*) this, widen); + return Serializable::writeUTF(str, reinterpret_cast<DataStream*>(this), widen); } /** @@ -94,7 +97,7 @@ int BaseStream::writeUTF(std::string str, bool widen) { * @return resulting read size **/ int BaseStream::read(uint8_t &value) { - return Serializable::read(value, (DataStream*) this); + return Serializable::read(value, reinterpret_cast<DataStream*>(this)); } /** @@ -104,7 +107,7 @@ int BaseStream::read(uint8_t &value) { * @return resulting read size **/ int BaseStream::read(uint16_t &base_value, bool is_little_endian) { - return Serializable::read(base_value, (DataStream*) this); + return Serializable::read(base_value, reinterpret_cast<DataStream*>(this)); } /** @@ -114,7 +117,7 @@ int BaseStream::read(uint16_t &base_value, bool is_little_endian) { * @return resulting read size **/ int BaseStream::read(char &value) { - return Serializable::read(value, (DataStream*) this); + return Serializable::read(value, reinterpret_cast<DataStream*>(this)); } /** @@ -125,7 +128,7 @@ int BaseStream::read(char &value) { * @return resulting read size **/ int BaseStream::read(uint8_t *value, int len) { - return Serializable::read(value, len, (DataStream*) this); + return Serializable::read(value, len, reinterpret_cast<DataStream*>(this)); } /** @@ -135,7 +138,8 @@ int BaseStream::read(uint8_t *value, int len) { * @return resulting read size **/ int BaseStream::read(uint32_t &value, bool is_little_endian) { - return Serializable::read(value, (DataStream*) this, is_little_endian); + return Serializable::read(value, reinterpret_cast<DataStream*>(this), + is_little_endian); } /** @@ -145,7 +149,8 @@ int BaseStream::read(uint32_t &value, bool is_little_endian) { * @return resulting read size **/ int BaseStream::read(uint64_t &value, bool is_little_endian) { - return Serializable::read(value, (DataStream*) this, is_little_endian); + return Serializable::read(value, reinterpret_cast<DataStream*>(this), + 
is_little_endian); } /** @@ -155,10 +160,9 @@ int BaseStream::read(uint64_t &value, bool is_little_endian) { * @return resulting read size **/ int BaseStream::readUTF(std::string &str, bool widen) { - return Serializable::readUTF(str, (DataStream*) this, widen); + return Serializable::readUTF(str, reinterpret_cast<DataStream*>(this), widen); } - } /* namespace io */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/CRCStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/CRCStream.cpp b/libminifi/src/io/CRCStream.cpp index 47b45b5..e06a8f5 100644 --- a/libminifi/src/io/CRCStream.cpp +++ b/libminifi/src/io/CRCStream.cpp @@ -20,5 +20,3 @@ #include <memory> #include "io/CRCStream.h" - - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/ClientSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp index ad6b04d..e62d4f1 100644 --- a/libminifi/src/io/ClientSocket.cpp +++ b/libminifi/src/io/ClientSocket.cpp @@ -15,20 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +#include "io/ClientSocket.h" #include <netinet/tcp.h> #include <sys/types.h> +#include <netdb.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <cstdio> +#include <utility> +#include <vector> #include <cerrno> -#include <netdb.h> #include <iostream> #include <string> - #include "io/validation.h" -#include "io/ClientSocket.h" namespace org { namespace apache { @@ -36,7 +37,7 @@ namespace nifi { namespace minifi { namespace io { -std::string Socket::HOSTNAME = Socket::getMyHostName(0); +char *Socket::HOSTNAME = const_cast<char*>(Socket::getMyHostName(0).c_str()); Socket::Socket(const std::string &hostname, const uint16_t port, const uint16_t listeners = -1) @@ -50,12 +51,10 @@ Socket::Socket(const std::string &hostname, const uint16_t port, logger_ = logging::Logger::getLogger(); FD_ZERO(&total_list_); FD_ZERO(&read_fds_); - } Socket::Socket(const std::string &hostname, const uint16_t port) : Socket(hostname, port, 0) { - } Socket::Socket(const Socket &&other) @@ -69,7 +68,6 @@ Socket::Socket(const Socket &&other) read_fds_(other.read_fds_), canonical_hostname_(std::move(other.canonical_hostname_)) { logger_ = logging::Logger::getLogger(); - } Socket::~Socket() { @@ -81,7 +79,6 @@ void Socket::closeStream() { freeaddrinfo(addr_info_); addr_info_ = 0; } - if (socket_file_descriptor_ >= 0) { close(socket_file_descriptor_); socket_file_descriptor_ = -1; @@ -98,12 +95,10 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) { setSocketOptions(socket_file_descriptor_); if (listeners_ > 0) { - struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr; sa_loc->sin_family = AF_INET; sa_loc->sin_port = htons(port_); sa_loc->sin_addr.s_addr = htonl(INADDR_ANY); - if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) { logger_->log_error("Could not bind to socket", strerror(errno)); return -1; @@ -113,7 +108,6 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) { if 
(listeners_ <= 0) { struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr; sa_loc->sin_family = AF_INET; - //sa_loc->sin_port = htons(port); sa_loc->sin_port = htons(port_); // use any address if you are connecting to the local machine for testing // otherwise we must use the requested hostname @@ -129,7 +123,6 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) { logger_->log_warn("Could not connect to socket, error:%s", strerror(errno)); return -1; - } } } @@ -140,7 +133,6 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) { logger_->log_warn("attempted connection, saw %s", strerror(errno)); return -1; } - } // add the listener to the total set FD_SET(socket_file_descriptor_, &total_list_); @@ -148,8 +140,7 @@ int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) { return 0; } -short Socket::initialize() { - +int16_t Socket::initialize() { struct sockaddr_in servAddr; addrinfo hints = { sizeof(addrinfo) }; @@ -159,7 +150,6 @@ short Socket::initialize() { hints.ai_flags = AI_CANONNAME; if (listeners_ > 0) hints.ai_flags |= AI_PASSIVE; - hints.ai_protocol = 0; /* any protocol */ int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints, @@ -188,8 +178,7 @@ short Socket::initialize() { int hh_errno; gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); #endif - - memcpy((char *) &addr, h->h_addr_list[0], h->h_length); + memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length); auto p = addr_info_; for (; p != NULL; p = p->ai_next) { @@ -197,8 +186,7 @@ short Socket::initialize() { if (!IsNullOrEmpty(p) && !IsNullOrEmpty(p->ai_canonname)) canonical_hostname_ = p->ai_canonname; } - - //we've successfully connected + // we've successfully connected if (port_ > 0 && createConnection(p, addr) >= 0) { return 0; break; @@ -206,10 +194,9 @@ short Socket::initialize() { } return -1; - } -short Socket::select_descriptor(const uint16_t msec) { +int16_t Socket::select_descriptor(const 
uint16_t msec) { struct timeval tv; int retval; @@ -233,7 +220,6 @@ short Socket::select_descriptor(const uint16_t msec) { for (int i = 0; i <= socket_max_; i++) { if (FD_ISSET(i, &read_fds_)) { - if (i == socket_file_descriptor_) { if (listeners_ > 0) { struct sockaddr_storage remoteaddr; // client address @@ -255,24 +241,23 @@ short Socket::select_descriptor(const uint16_t msec) { return i; } } - } return -1; } -short Socket::setSocketOptions(const int sock) { +int16_t Socket::setSocketOptions(const int sock) { int opt = 1; bool nagle_off = true; #ifndef __MACH__ if (nagle_off) { - if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *) &opt, sizeof(opt)) + if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&opt), sizeof(opt)) < 0) { logger_->log_error("setsockopt() TCP_NODELAY failed"); close(sock); return -1; } - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) { logger_->log_error("setsockopt() SO_REUSEADDR failed"); close(sock); @@ -281,8 +266,8 @@ short Socket::setSocketOptions(const int sock) { } int sndsize = 256 * 1024; - if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &sndsize, - (int) sizeof(sndsize)) < 0) { + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char *>( &sndsize), + sizeof(sndsize)) < 0) { logger_->log_error("setsockopt() SO_SNDBUF failed"); close(sock); return -1; @@ -291,8 +276,8 @@ short Socket::setSocketOptions(const int sock) { #else if (listeners_ > 0) { // lose the pesky "address already in use" error message - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &opt, sizeof(opt)) - < 0) { + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + reinterpret_cast<char *>(&opt), sizeof(opt)) < 0) { logger_->log_error("setsockopt() SO_REUSEADDR failed"); close(sock); return -1; @@ -307,22 +292,19 @@ std::string Socket::getHostname() const { } int Socket::writeData(std::vector<uint8_t> &buf, int buflen) { 
- if (buf.capacity() < buflen) return -1; - return writeData((uint8_t*) &buf[0], buflen); + return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen); } // data stream overrides int Socket::writeData(uint8_t *value, int size) { - int ret = 0, bytes = 0; while (bytes < size) { - ret = send(socket_file_descriptor_, value + bytes, size - bytes, 0); - //check for errors + // check for errors if (ret <= 0) { close(socket_file_descriptor_); logger_->log_error("Could not send to %d, error: %s", @@ -330,27 +312,23 @@ int Socket::writeData(uint8_t *value, int size) { return ret; } bytes += ret; - } if (ret) logger_->log_trace("Send data size %d over socket %d", size, socket_file_descriptor_); - return bytes; - } template<typename T> inline std::vector<uint8_t> Socket::readBuffer(const T& t) { std::vector<uint8_t> buf; buf.resize(sizeof t); - readData((uint8_t*) &buf[0], sizeof(t)); + readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t)); return buf; } int Socket::write(uint64_t base_value, bool is_little_endian) { - return Serializable::write(base_value, this, is_little_endian); } @@ -363,7 +341,6 @@ int Socket::write(uint16_t base_value, bool is_little_endian) { } int Socket::read(uint64_t &value, bool is_little_endian) { - auto buf = readBuffer(value); if (is_little_endian) { @@ -381,68 +358,57 @@ int Socket::read(uint64_t &value, bool is_little_endian) { } int Socket::read(uint32_t &value, bool is_little_endian) { - auto buf = readBuffer(value); if (is_little_endian) { value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; } else { value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24; - } - return sizeof(value); } int Socket::read(uint16_t &value, bool is_little_endian) { - auto buf = readBuffer(value); if (is_little_endian) { value = (buf[0] << 8) | buf[1]; } else { value = buf[0] | buf[1] << 8; - } return sizeof(value); } int Socket::readData(std::vector<uint8_t> &buf, int buflen) { - if (buf.capacity() < buflen) { buf.resize(buflen); } - 
return readData((uint8_t*) &buf[0], buflen); + return readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen); } int Socket::readData(uint8_t *buf, int buflen) { - - int total_read = 0; + int32_t total_read = 0; while (buflen) { - short fd = select_descriptor(1000); + int16_t fd = select_descriptor(1000); if (fd < 0) { - logger_->log_info("fd close %i", buflen); close(socket_file_descriptor_); return -1; } - int bytes_read = recv(fd, buf, buflen, 0); if (bytes_read <= 0) { - if (bytes_read == 0) + if (bytes_read == 0) { logger_->log_info("Other side hung up on %d", fd); - else { + } else { logger_->log_error("Could not recv on %d, error: %s", fd, strerror(errno)); } return -1; } - buflen -= bytes_read; buf += bytes_read; total_read += bytes_read; } - return total_read; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/DataStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/DataStream.cpp b/libminifi/src/io/DataStream.cpp index 7a10bd9..9e0dfce 100644 --- a/libminifi/src/io/DataStream.cpp +++ b/libminifi/src/io/DataStream.cpp @@ -15,17 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - - +#include "io/DataStream.h" +#include <arpa/inet.h> #include <vector> #include <iostream> #include <cstdint> #include <cstdio> #include <cstring> #include <string> -#include <arpa/inet.h> #include <algorithm> -#include "io/DataStream.h" namespace org { namespace apache { @@ -33,106 +31,92 @@ namespace nifi { namespace minifi { namespace io { - int DataStream::writeData(uint8_t *value, int size) { - - std::copy(value,value+size,std::back_inserter(buffer)); - - return size; + std::copy(value, value + size, std::back_inserter(buffer)); + return size; } int DataStream::read(uint64_t &value, bool is_little_endian) { - if ((8 + readBuffer) > buffer.size()) { - // if read exceed - return -1; - } - uint8_t *buf = &buffer[readBuffer]; - - if (is_little_endian) { - value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) - | ((uint64_t) (buf[2] & 255) << 40) - | ((uint64_t) (buf[3] & 255) << 32) - | ((uint64_t) (buf[4] & 255) << 24) - | ((uint64_t) (buf[5] & 255) << 16) - | ((uint64_t) (buf[6] & 255) << 8) - | ((uint64_t) (buf[7] & 255) << 0); - } else { - value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) - | ((uint64_t) (buf[2] & 255) << 16) - | ((uint64_t) (buf[3] & 255) << 24) - | ((uint64_t) (buf[4] & 255) << 32) - | ((uint64_t) (buf[5] & 255) << 40) - | ((uint64_t) (buf[6] & 255) << 48) - | ((uint64_t) (buf[7] & 255) << 56); - } - readBuffer += 8; - return 8; + if ((8 + readBuffer) > buffer.size()) { + // if read exceed + return -1; + } + uint8_t *buf = &buffer[readBuffer]; + + if (is_little_endian) { + value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) + | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) + | ((uint64_t) (buf[4] & 255) << 24) | ((uint64_t) (buf[5] & 255) << 16) + | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0); + } else { + value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) + | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 
255) << 24) + | ((uint64_t) (buf[4] & 255) << 32) | ((uint64_t) (buf[5] & 255) << 40) + | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56); + } + readBuffer += 8; + return 8; } int DataStream::read(uint32_t &value, bool is_little_endian) { - if ((4 + readBuffer) > buffer.size()) { - // if read exceed - return -1; - } - uint8_t *buf = &buffer[readBuffer]; - - if (is_little_endian) { - value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; - } else { - value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24; - - } - readBuffer += 4; - return 4; + if ((4 + readBuffer) > buffer.size()) { + // if read exceed + return -1; + } + uint8_t *buf = &buffer[readBuffer]; + + if (is_little_endian) { + value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; + } else { + value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24; + } + readBuffer += 4; + return 4; } int DataStream::read(uint16_t &value, bool is_little_endian) { - - if ((2 + readBuffer) > buffer.size()) { - // if read exceed - return -1; - } - uint8_t *buf = &buffer[readBuffer]; - - if (is_little_endian) { - value = (buf[0] << 8) | buf[1]; - } else { - value = buf[0] | buf[1] << 8; - - } - readBuffer += 2; - return 2; + if ((2 + readBuffer) > buffer.size()) { + // if read exceed + return -1; + } + uint8_t *buf = &buffer[readBuffer]; + + if (is_little_endian) { + value = (buf[0] << 8) | buf[1]; + } else { + value = buf[0] | buf[1] << 8; + } + readBuffer += 2; + return 2; } -int DataStream::readData(std::vector<uint8_t> &buf,int buflen) { - if ((buflen + readBuffer) > buffer.size()) { - // if read exceed - return -1; - } +int DataStream::readData(std::vector<uint8_t> &buf, int buflen) { + if ((buflen + readBuffer) > buffer.size()) { + // if read exceed + return -1; + } - if (buf.capacity() < buflen) - buf.resize(buflen); + if (buf.capacity() < buflen) + buf.resize(buflen); - buf.insert(buf.begin(),&buffer[readBuffer],&buffer[readBuffer+buflen]); + buf.insert(buf.begin(), 
&buffer[readBuffer], &buffer[readBuffer + buflen]); - readBuffer += buflen; - return buflen; + readBuffer += buflen; + return buflen; } +int DataStream::readData(uint8_t *buf, int buflen) { + if ((buflen + readBuffer) > buffer.size()) { + // if read exceed + return -1; + } -int DataStream::readData(uint8_t *buf,int buflen) { - if ((buflen + readBuffer) > buffer.size()) { - // if read exceed - return -1; - } - - std::copy(&buffer[readBuffer],&buffer[readBuffer+buflen],buf); + std::copy(&buffer[readBuffer], &buffer[readBuffer + buflen], buf); - readBuffer += buflen; - return buflen; + readBuffer += buflen; + return buflen; } - } /* namespace io */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/EndianCheck.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/EndianCheck.cpp b/libminifi/src/io/EndianCheck.cpp index 1b5020d..fd754c4 100644 --- a/libminifi/src/io/EndianCheck.cpp +++ b/libminifi/src/io/EndianCheck.cpp @@ -25,7 +25,6 @@ namespace minifi { namespace io { bool EndiannessCheck::IS_LITTLE = EndiannessCheck::is_little_endian(); - } /* namespace io */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/Serializable.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp index 7b7f2bd..c3f74c7 100644 --- a/libminifi/src/io/Serializable.cpp +++ b/libminifi/src/io/Serializable.cpp @@ -15,13 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +#include "io/Serializable.h" +#include <arpa/inet.h> +#include <cstdio> #include <iostream> #include <vector> #include <string> -#include <cstdio> -#include <arpa/inet.h> +#include <algorithm> #include "io/DataStream.h" -#include "io/Serializable.h" namespace org { namespace apache { namespace nifi { @@ -29,8 +30,7 @@ namespace minifi { namespace io { #define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32)) - -#define IS_ASCII(c) __builtin_expect(!!((c >= 1) && (c <= 127)),1) +#define IS_ASCII(c) __builtin_expect(!!((c >= 1) && (c <= 127)), 1) template<typename T> int Serializable::writeData(const T &t, DataStream *stream) { @@ -63,7 +63,7 @@ int Serializable::write(uint8_t value, DataStream *stream) { return stream->writeData(&value, 1); } int Serializable::write(char value, DataStream *stream) { - return stream->writeData((uint8_t *) &value, 1); + return stream->writeData(reinterpret_cast<uint8_t *>(&value), 1); } int Serializable::write(uint8_t *value, int len, DataStream *stream) { @@ -89,7 +89,7 @@ int Serializable::read(char &value, DataStream *stream) { int ret = stream->readData(&buf, 1); if (ret == 1) - value = (char) buf; + value = buf; return ret; } @@ -105,17 +105,14 @@ int Serializable::read(uint16_t &value, DataStream *stream, int Serializable::read(uint32_t &value, DataStream *stream, bool is_little_endian) { return stream->read(value, is_little_endian); - } int Serializable::read(uint64_t &value, DataStream *stream, bool is_little_endian) { return stream->read(value, is_little_endian); - } int Serializable::write(uint32_t base_value, DataStream *stream, bool is_little_endian) { - const uint32_t value = is_little_endian ? htonl(base_value) : base_value; return writeData(value, stream); @@ -123,7 +120,6 @@ int Serializable::write(uint32_t base_value, DataStream *stream, int Serializable::write(uint64_t base_value, DataStream *stream, bool is_little_endian) { - const uint64_t value = is_little_endian == 1 ? 
htonll_r(base_value) : base_value; return writeData(value, stream); @@ -131,7 +127,6 @@ int Serializable::write(uint64_t base_value, DataStream *stream, int Serializable::write(uint16_t base_value, DataStream *stream, bool is_little_endian) { - const uint16_t value = is_little_endian == 1 ? htons(base_value) : base_value; return writeData(value, stream); @@ -150,7 +145,6 @@ int Serializable::readUTF(std::string &str, DataStream *stream, bool widen) { } else { uint32_t len; ret = read(len, stream); - if (ret <= 0) return ret; utflen = len; @@ -166,7 +160,6 @@ int Serializable::readUTF(std::string &str, DataStream *stream, bool widen) { // The number of chars produced may be less than utflen str = std::string((const char*) &buf[0], utflen); - return utflen; } @@ -181,7 +174,6 @@ int Serializable::writeUTF(std::string str, DataStream *stream, bool widen) { return -1; if (utflen == 0) { - if (!widen) { uint16_t shortLen = utflen; write(shortLen, stream); @@ -207,12 +199,10 @@ int Serializable::writeUTF(std::string str, DataStream *stream, bool widen) { int ret; if (!widen) { - uint16_t short_length = utflen; write(short_length, stream); ret = stream->writeData(utf_to_write.data(), utflen); } else { - //utflen += 4; write(utflen, stream); ret = stream->writeData(utf_to_write.data(), utflen); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/StreamFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/StreamFactory.cpp b/libminifi/src/io/StreamFactory.cpp index e3aa290..1cf419e 100644 --- a/libminifi/src/io/StreamFactory.cpp +++ b/libminifi/src/io/StreamFactory.cpp @@ -15,11 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#include "../../include/io/StreamFactory.h" - +#include "io/StreamFactory.h" #include <atomic> -#include <mutex> - +#include <mutex> + namespace org { namespace apache { namespace nifi { @@ -29,7 +28,6 @@ namespace io { std::atomic<StreamFactory*> StreamFactory::context_instance_; std::mutex StreamFactory::context_mutex_; - } /* namespace io */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/io/tls/TLSSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp index b2df394..1499840 100644 --- a/libminifi/src/io/tls/TLSSocket.cpp +++ b/libminifi/src/io/tls/TLSSocket.cpp @@ -15,10 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "properties/Configure.h" #include "io/tls/TLSSocket.h" +#include <openssl/ssl.h> +#include <openssl/err.h> +#include <utility> +#include <string> +#include <vector> +#include "properties/Configure.h" #include "utils/StringUtils.h" - #include "core/Property.h" namespace org { @@ -27,116 +31,111 @@ namespace nifi { namespace minifi { namespace io { -#include <openssl/ssl.h> -#include <openssl/err.h> - std::atomic<TLSContext*> TLSContext::context_instance; std::mutex TLSContext::context_mutex; -TLSContext::TLSContext() : - error_value(0), ctx(0), logger_(logging::Logger::getLogger()), configuration( - Configure::getConfigure()) { - +TLSContext::TLSContext() + : error_value(0), + ctx(0), + logger_(logging::Logger::getLogger()), + configuration(Configure::getConfigure()) { } - /** * The memory barrier is defined by the singleton */ -short TLSContext::initialize() { - if (ctx != 0) { - return error_value; - } - std::string clientAuthStr; - bool needClientCert = true; - if (!(configuration->get(Configure::nifi_security_need_ClientAuth, - clientAuthStr) - && 
org::apache::nifi::minifi::utils::StringUtils::StringToBool(clientAuthStr, needClientCert))) { - needClientCert = true; - } - - SSL_library_init(); - const SSL_METHOD *method; - - OpenSSL_add_all_algorithms(); - SSL_load_error_strings(); - method = TLSv1_2_client_method(); - ctx = SSL_CTX_new(method); - if (ctx == NULL) { - logger_->log_error("Could not create SSL context, error: %s.", - std::strerror(errno)); - error_value = TLS_ERROR_CONTEXT; - return error_value; - } - if (needClientCert) { - std::string certificate; - std::string privatekey; - std::string passphrase; - std::string caCertificate; - - if (!(configuration->get(Configure::nifi_security_client_certificate, - certificate) - && configuration->get( - Configure::nifi_security_client_private_key, privatekey))) { - logger_->log_error( - "Certificate and Private Key PEM file not configured, error: %s.", - std::strerror(errno)); - error_value = TLS_ERROR_PEM_MISSING; - return error_value; - } - // load certificates and private key in PEM format - if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), - SSL_FILETYPE_PEM) <= 0) { - logger_->log_error("Could not create load certificate, error : %s", - std::strerror(errno)); - error_value = TLS_ERROR_CERT_MISSING; - return error_value; - - } - if (configuration->get(Configure::nifi_security_client_pass_phrase, - passphrase)) { - // if the private key has passphase - SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb); - } - - - int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(), - SSL_FILETYPE_PEM); - if (retp != 1) { - logger_->log_error("Could not create load private key,%i on %s error : %s", - retp,privatekey.c_str(),std::strerror(errno)); - error_value = TLS_ERROR_KEY_ERROR; - return error_value; - } - // verify private key - if (!SSL_CTX_check_private_key(ctx)) { - logger_->log_error( - "Private key does not match the public certificate, error : %s", - std::strerror(errno)); - error_value = TLS_ERROR_KEY_ERROR; - return error_value; - } - // 
load CA certificates - if (configuration->get(Configure::nifi_security_client_ca_certificate, - caCertificate)) { - retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0); - if (retp==0) { - logger_->log_error( - "Can not load CA certificate, Exiting, error : %s", - std::strerror(errno)); - error_value = TLS_ERROR_CERT_ERROR; - return error_value; - } - } - - logger_->log_info("Load/Verify Client Certificate OK."); - } - return 0; +int16_t TLSContext::initialize() { + if (ctx != 0) { + return error_value; + } + std::string clientAuthStr; + bool needClientCert = true; + if (!(configuration->get(Configure::nifi_security_need_ClientAuth, + clientAuthStr) + && org::apache::nifi::minifi::utils::StringUtils::StringToBool( + clientAuthStr, needClientCert))) { + needClientCert = true; + } + + SSL_library_init(); + const SSL_METHOD *method; + + OpenSSL_add_all_algorithms(); + SSL_load_error_strings(); + method = TLSv1_2_client_method(); + ctx = SSL_CTX_new(method); + if (ctx == NULL) { + logger_->log_error("Could not create SSL context, error: %s.", + std::strerror(errno)); + error_value = TLS_ERROR_CONTEXT; + return error_value; + } + if (needClientCert) { + std::string certificate; + std::string privatekey; + std::string passphrase; + std::string caCertificate; + + if (!(configuration->get(Configure::nifi_security_client_certificate, + certificate) + && configuration->get(Configure::nifi_security_client_private_key, + privatekey))) { + logger_->log_error( + "Certificate and Private Key PEM file not configured, error: %s.", + std::strerror(errno)); + error_value = TLS_ERROR_PEM_MISSING; + return error_value; + } + // load certificates and private key in PEM format + if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) + <= 0) { + logger_->log_error("Could not create load certificate, error : %s", + std::strerror(errno)); + error_value = TLS_ERROR_CERT_MISSING; + return error_value; + } + if 
(configuration->get(Configure::nifi_security_client_pass_phrase, + passphrase)) { + // if the private key has passphase + SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb); + } + + int retp = SSL_CTX_use_PrivateKey_file(ctx, privatekey.c_str(), + SSL_FILETYPE_PEM); + if (retp != 1) { + logger_->log_error( + "Could not create load private key,%i on %s error : %s", retp, + privatekey.c_str(), std::strerror(errno)); + error_value = TLS_ERROR_KEY_ERROR; + return error_value; + } + // verify private key + if (!SSL_CTX_check_private_key(ctx)) { + logger_->log_error( + "Private key does not match the public certificate, error : %s", + std::strerror(errno)); + error_value = TLS_ERROR_KEY_ERROR; + return error_value; + } + // load CA certificates + if (configuration->get(Configure::nifi_security_client_ca_certificate, + caCertificate)) { + retp = SSL_CTX_load_verify_locations(ctx, caCertificate.c_str(), 0); + if (retp == 0) { + logger_->log_error("Can not load CA certificate, Exiting, error : %s", + std::strerror(errno)); + error_value = TLS_ERROR_CERT_ERROR; + return error_value; + } + } + + logger_->log_info("Load/Verify Client Certificate OK."); + } + return 0; } -TLSSocket::~TLSSocket() -{ - if (ssl != 0) - SSL_free(ssl); +TLSSocket::~TLSSocket() { + if (ssl != 0) + SSL_free(ssl); } /** * Constructor that accepts host name, port and listeners. 
With this @@ -146,102 +145,100 @@ TLSSocket::~TLSSocket() * @param listeners number of listeners in the queue */ TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port, - const uint16_t listeners) : - Socket(hostname, port, listeners), ssl(0) { + const uint16_t listeners) + : Socket(hostname, port, listeners), + ssl(0) { } -TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port) : - Socket(hostname, port, 0), ssl(0) { +TLSSocket::TLSSocket(const std::string &hostname, const uint16_t port) + : Socket(hostname, port, 0), + ssl(0) { } -TLSSocket::TLSSocket(const TLSSocket &&d) : - Socket(std::move(d)), ssl(0) { +TLSSocket::TLSSocket(const TLSSocket &&d) + : Socket(std::move(d)), + ssl(0) { } -short TLSSocket::initialize() { - TLSContext *context = TLSContext::getInstance(); - short ret = context->initialize(); - Socket::initialize(); - if (!ret) { - // we have s2s secure config - ssl = SSL_new(context->getContext()); - SSL_set_fd(ssl, socket_file_descriptor_); - if (SSL_connect(ssl) == -1) { - logger_->log_error("SSL socket connect failed to %s %d", - requested_hostname_.c_str(), port_); - SSL_free(ssl); - ssl = NULL; - close(socket_file_descriptor_); - return -1; - } else { - logger_->log_info("SSL socket connect success to %s %d", - requested_hostname_.c_str(), port_); - return 0; - } - } - return ret; +int16_t TLSSocket::initialize() { + TLSContext *context = TLSContext::getInstance(); + int16_t ret = context->initialize(); + Socket::initialize(); + if (!ret) { + // we have s2s secure config + ssl = SSL_new(context->getContext()); + SSL_set_fd(ssl, socket_file_descriptor_); + if (SSL_connect(ssl) == -1) { + logger_->log_error("SSL socket connect failed to %s %d", + requested_hostname_.c_str(), port_); + SSL_free(ssl); + ssl = NULL; + close(socket_file_descriptor_); + return -1; + } else { + logger_->log_info("SSL socket connect success to %s %d", + requested_hostname_.c_str(), port_); + return 0; + } + } + return ret; } -short 
TLSSocket::select_descriptor(const uint16_t msec) { - if (ssl && SSL_pending(ssl)) - return 1; - return Socket::select_descriptor(msec); +int16_t TLSSocket::select_descriptor(const uint16_t msec) { + if (ssl && SSL_pending(ssl)) + return 1; + return Socket::select_descriptor(msec); } -int TLSSocket::writeData(std::vector< uint8_t>& buf, int buflen) -{ - return Socket::writeData(buf,buflen); +int TLSSocket::writeData(std::vector<uint8_t>& buf, int buflen) { + return Socket::writeData(buf, buflen); } int TLSSocket::writeData(uint8_t *value, int size) { - if (IsNullOrEmpty(ssl)) - return -1; - // for SSL, wait for the TLS IO is completed - int bytes = 0; - int sent = 0; - while (bytes < size) { - - sent = SSL_write(ssl, value + bytes, size - bytes); - //check for errors - if (sent < 0) { - logger_->log_error("Site2Site Peer socket %d send failed %s", - socket_file_descriptor_, strerror(errno)); - return sent; - } - bytes += sent; - } - return size; + if (IsNullOrEmpty(ssl)) + return -1; + // for SSL, wait for the TLS IO is completed + int bytes = 0; + int sent = 0; + while (bytes < size) { + sent = SSL_write(ssl, value + bytes, size - bytes); + // check for errors + if (sent < 0) { + logger_->log_error("Site2Site Peer socket %d send failed %s", + socket_file_descriptor_, strerror(errno)); + return sent; + } + bytes += sent; + } + return size; } int TLSSocket::readData(uint8_t *buf, int buflen) { - - if (IsNullOrEmpty(ssl)) - return -1; - int total_read = 0; - int status = 0; - while (buflen) { - short fd = select_descriptor(1000); - if (fd <= 0) { - - close(socket_file_descriptor_); - return -1; - } - - int sslStatus; - do { - status = SSL_read(ssl, buf, buflen); - sslStatus = SSL_get_error(ssl, status); - } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ); - - buflen -= status; - buf += status; - total_read += status; - } - - return total_read; + if (IsNullOrEmpty(ssl)) + return -1; + int total_read = 0; + int status = 0; + while (buflen) { + int16_t fd = 
select_descriptor(1000); + if (fd <= 0) { + close(socket_file_descriptor_); + return -1; + } + + int sslStatus; + do { + status = SSL_read(ssl, buf, buflen); + sslStatus = SSL_get_error(ssl, status); + } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ); + + buflen -= status; + buf += status; + total_read += status; + } + + return total_read; } - } /* namespace io */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/AppendHostInfo.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/AppendHostInfo.cpp b/libminifi/src/processors/AppendHostInfo.cpp index 24ccc9a..b3c76db 100644 --- a/libminifi/src/processors/AppendHostInfo.cpp +++ b/libminifi/src/processors/AppendHostInfo.cpp @@ -17,27 +17,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include <set> +#include "processors/AppendHostInfo.h" +#define __USE_POSIX +#include <limits.h> #include <sys/time.h> #include <string.h> -#include "processors/AppendHostInfo.h" -#include "core/ProcessContext.h" -#include "core/Property.h" -#include "core/ProcessSession.h" - #include <netdb.h> #include <netinet/in.h> #include <sys/socket.h> #include <sys/ioctl.h> #include <net/if.h> #include <arpa/inet.h> - -#include "../../include/core/FlowFile.h" +#include <memory> +#include <string> +#include <set> +#include "core/ProcessContext.h" +#include "core/Property.h" +#include "core/ProcessSession.h" +#include "core/FlowFile.h" #include "io/ClientSocket.h" - -#define __USE_POSIX -#include <limits.h> - namespace org { namespace apache { namespace nifi { @@ -48,7 +46,6 @@ namespace processors { #define HOST_NAME_MAX 255 #endif -const std::string AppendHostInfo::ProcessorName("AppendHostInfo"); core::Property AppendHostInfo::InterfaceName( "Network Interface Name", "Network interface from which to read an IP v4 address", 
"eth0"); @@ -77,31 +74,29 @@ void AppendHostInfo::initialize() { setSupportedRelationships(relationships); } -void AppendHostInfo::onTrigger( - core::ProcessContext *context, - core::ProcessSession *session) { - std::shared_ptr<core::FlowFile> flow = - session->get(); +void AppendHostInfo::onTrigger(core::ProcessContext *context, + core::ProcessSession *session) { + std::shared_ptr<core::FlowFile> flow = session->get(); if (!flow) return; - //Get Hostname + // Get Hostname std::string hostAttribute = ""; context->getProperty(HostAttribute.getName(), hostAttribute); flow->addAttribute(hostAttribute.c_str(), org::apache::nifi::minifi::io::Socket::getMyHostName()); - //Get IP address for the specified interface + // Get IP address for the specified interface std::string iface; context->getProperty(InterfaceName.getName(), iface); - //Confirm the specified interface name exists on this device + // Confirm the specified interface name exists on this device if (if_nametoindex(iface.c_str()) != 0) { struct ifreq ifr; int fd = socket(AF_INET, SOCK_DGRAM, 0); - //Type of address to retrieve - IPv4 IP address + // Type of address to retrieve - IPv4 IP address ifr.ifr_addr.sa_family = AF_INET; - //Copy the interface name in the ifreq structure + // Copy the interface name in the ifreq structure strncpy(ifr.ifr_name, iface.c_str(), IFNAMSIZ - 1); ioctl(fd, SIOCGIFADDR, &ifr); close(fd); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/ExecuteProcess.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp index 3cbbc1b..701c645 100644 --- a/libminifi/src/processors/ExecuteProcess.cpp +++ b/libminifi/src/processors/ExecuteProcess.cpp @@ -18,9 +18,12 @@ * limitations under the License. 
*/ #include "processors/ExecuteProcess.h" +#include <cstring> +#include <memory> +#include <string> +#include <set> #include "core/ProcessContext.h" #include "core/ProcessSession.h" -#include <cstring> #include "utils/StringUtils.h" #include "utils/TimeUtil.h" @@ -30,14 +33,15 @@ namespace nifi { namespace minifi { namespace processors { -const std::string ExecuteProcess::ProcessorName("ExecuteProcess"); core::Property ExecuteProcess::Command( "Command", - "Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.", + "Specifies the command to be executed; if just the name of an executable" + " is provided, it must be in the user's environment PATH.", ""); core::Property ExecuteProcess::CommandArguments( "Command Arguments", - "The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.", + "The arguments to supply to the executable delimited by white space. 
White " + "space can be escaped by enclosing it in double-quotes.", ""); core::Property ExecuteProcess::WorkingDir( "Working Directory", @@ -45,7 +49,8 @@ core::Property ExecuteProcess::WorkingDir( ""); core::Property ExecuteProcess::BatchDuration( "Batch Duration", - "If the process is expected to be long-running and produce textual output, a batch duration can be specified.", + "If the process is expected to be long-running and produce textual output, a " + "batch duration can be specified.", "0"); core::Property ExecuteProcess::RedirectErrorStream( "Redirect Error Stream", @@ -69,9 +74,8 @@ void ExecuteProcess::initialize() { setSupportedRelationships(relationships); } -void ExecuteProcess::onTrigger( - core::ProcessContext *context, - core::ProcessSession *session) { +void ExecuteProcess::onTrigger(core::ProcessContext *context, + core::ProcessSession *session) { std::string value; if (context->getProperty(Command.getName(), value)) { this->_command = value; @@ -84,12 +88,10 @@ void ExecuteProcess::onTrigger( } if (context->getProperty(BatchDuration.getName(), value)) { core::TimeUnit unit; - if (core::Property::StringToTime(value, - _batchDuration, - unit) - && core::Property::ConvertTimeUnitToMS( - _batchDuration, unit, _batchDuration)) { - + if (core::Property::StringToTime(value, _batchDuration, unit) + && core::Property::ConvertTimeUnitToMS(_batchDuration, unit, + _batchDuration)) { + logger_->log_info("Setting _batchDuration"); } } if (context->getProperty(RedirectErrorStream.getName(), value)) { @@ -112,9 +114,7 @@ void ExecuteProcess::onTrigger( } logger_->log_info("Execute Command %s", _fullCommand.c_str()); // split the command into array - char cstr[_fullCommand.length() + 1]; - std::strcpy(cstr, _fullCommand.c_str()); - char *p = std::strtok(cstr, " "); + char *p = std::strtok(const_cast<char*>(_fullCommand.c_str()), " "); int argc = 0; char *argv[64]; while (p != 0 && argc < 64) { 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/GenerateFlowFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GenerateFlowFile.cpp b/libminifi/src/processors/GenerateFlowFile.cpp index ebdaaa3..34c0ae2 100644 --- a/libminifi/src/processors/GenerateFlowFile.cpp +++ b/libminifi/src/processors/GenerateFlowFile.cpp @@ -17,18 +17,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "processors/GenerateFlowFile.h" +#include <sys/time.h> +#include <time.h> #include <vector> #include <queue> #include <map> +#include <memory> +#include <string> #include <set> -#include <sys/time.h> -#include <time.h> #include <chrono> #include <thread> #include <random> #include "utils/StringUtils.h" - -#include "processors/GenerateFlowFile.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" @@ -39,104 +40,101 @@ namespace minifi { namespace processors { const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary"; const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text"; -const std::string GenerateFlowFile::ProcessorName("GenerateFlowFile"); -core::Property GenerateFlowFile::FileSize("File Size", "The size of the file that will be used", "1 kB"); -core::Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to be transferred in each invocation", "1"); -core::Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY); -core::Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles", - "If true, each FlowFile that is generated will be unique. 
If false, a random value will be generated and all FlowFiles", "true"); -core::Relationship GenerateFlowFile::Success("success", "success operational on the flow record"); +core::Property GenerateFlowFile::FileSize( + "File Size", "The size of the file that will be used", "1 kB"); +core::Property GenerateFlowFile::BatchSize( + "Batch Size", + "The number of FlowFiles to be transferred in each invocation", "1"); +core::Property GenerateFlowFile::DataFormat( + "Data Format", "Specifies whether the data should be Text or Binary", + GenerateFlowFile::DATA_FORMAT_BINARY); +core::Property GenerateFlowFile::UniqueFlowFiles( + "Unique FlowFiles", + "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", + "true"); +core::Relationship GenerateFlowFile::Success( + "success", "success operational on the flow record"); -void GenerateFlowFile::initialize() -{ - // Set the supported properties - std::set<core::Property> properties; - properties.insert(FileSize); - properties.insert(BatchSize); - properties.insert(DataFormat); - properties.insert(UniqueFlowFiles); - setSupportedProperties(properties); - // Set the supported relationships - std::set<core::Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); +void GenerateFlowFile::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(FileSize); + properties.insert(BatchSize); + properties.insert(DataFormat); + properties.insert(UniqueFlowFiles); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); } -void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) -{ - int64_t batchSize = 1; - bool uniqueFlowFile = true; - int64_t fileSize = 1024; +void 
GenerateFlowFile::onTrigger(core::ProcessContext *context, + core::ProcessSession *session) { + int64_t batchSize = 1; + bool uniqueFlowFile = true; + int64_t fileSize = 1024; - std::string value; - if (context->getProperty(FileSize.getName(), value)) - { - core::Property::StringToInt(value, fileSize); - } - if (context->getProperty(BatchSize.getName(), value)) - { - core::Property::StringToInt(value, batchSize); - } - if (context->getProperty(UniqueFlowFiles.getName(), value)) - { - org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, uniqueFlowFile); - } + std::string value; + if (context->getProperty(FileSize.getName(), value)) { + core::Property::StringToInt(value, fileSize); + } + if (context->getProperty(BatchSize.getName(), value)) { + core::Property::StringToInt(value, batchSize); + } + if (context->getProperty(UniqueFlowFiles.getName(), value)) { + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, + uniqueFlowFile); + } - if (!uniqueFlowFile) - { - char *data; - data = new char[fileSize]; - if (!data) - return; - uint64_t dataSize = fileSize; - GenerateFlowFile::WriteCallback callback(data, dataSize); - char *current = data; - for (int i = 0; i < fileSize; i+= sizeof(int)) - { - int randValue = random(); - *((int *) current) = randValue; - current += sizeof(int); - } - for (int i = 0; i < batchSize; i++) - { - // For each batch - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); - if (!flowFile) - return; - if (fileSize > 0) - session->write(flowFile, &callback); - session->transfer(flowFile, Success); - } - delete[] data; - } - else - { - if (!_data) - { - // We have not create the unique data yet - _data = new char[fileSize]; - _dataSize = fileSize; - char *current = _data; - for (int i = 0; i < fileSize; i+= sizeof(int)) - { - int randValue = random(); - *((int *) current) = randValue; - // *((int *) current) = (0xFFFFFFFF & i); - current += sizeof(int); - } - } - 
GenerateFlowFile::WriteCallback callback(_data, _dataSize); - for (int i = 0; i < batchSize; i++) - { - // For each batch - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); - if (!flowFile) - return; - if (fileSize > 0) - session->write(flowFile, &callback); - session->transfer(flowFile, Success); - } - } + if (!uniqueFlowFile) { + char *data; + data = new char[fileSize]; + if (!data) + return; + uint64_t dataSize = fileSize; + GenerateFlowFile::WriteCallback callback(data, dataSize); + char *current = data; + for (int i = 0; i < fileSize; i += sizeof(int)) { + int randValue = random(); + *(reinterpret_cast<int*>(current)) = randValue; + current += sizeof(int); + } + for (int i = 0; i < batchSize; i++) { + // For each batch + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< + FlowFileRecord>(session->create()); + if (!flowFile) + return; + if (fileSize > 0) + session->write(flowFile, &callback); + session->transfer(flowFile, Success); + } + delete[] data; + } else { + if (!_data) { + // We have not create the unique data yet + _data = new char[fileSize]; + _dataSize = fileSize; + char *current = _data; + for (int i = 0; i < fileSize; i += sizeof(int)) { + int randValue = random(); + *(reinterpret_cast<int*>(current)) = randValue; + current += sizeof(int); + } + } + GenerateFlowFile::WriteCallback callback(_data, _dataSize); + for (int i = 0; i < batchSize; i++) { + // For each batch + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< + FlowFileRecord>(session->create()); + if (!flowFile) + return; + if (fileSize > 0) + session->write(flowFile, &callback); + session->transfer(flowFile, Success); + } + } } } /* namespace processors */ } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/GetFile.cpp ---------------------------------------------------------------------- diff --git 
a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp index 652caf7..2740793 100644 --- a/libminifi/src/processors/GetFile.cpp +++ b/libminifi/src/processors/GetFile.cpp @@ -15,30 +15,32 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include <vector> -#include <queue> -#include <map> -#include <set> +#include "processors/GetFile.h" #include <sys/time.h> #include <sys/types.h> #include <sys/stat.h> #include <time.h> -#include <sstream> #include <stdio.h> -#include <string> -#include <iostream> #include <dirent.h> #include <limits.h> #include <unistd.h> -#if (__GNUC__ >= 4) +#if (__GNUC__ >= 4) #if (__GNUC_MINOR__ < 9) #include <regex.h> +#else +#include <regex> #endif #endif +#include <vector> +#include <queue> +#include <map> +#include <memory> +#include <set> +#include <sstream> +#include <string> +#include <iostream> #include "utils/StringUtils.h" -#include <regex> #include "utils/TimeUtil.h" -#include "processors/GetFile.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" @@ -47,7 +49,6 @@ namespace apache { namespace nifi { namespace minifi { namespace processors { -const std::string GetFile::ProcessorName("GetFile"); core::Property GetFile::BatchSize( "Batch Size", "The maximum number of files to pull in each iteration", "10"); @@ -62,11 +63,13 @@ core::Property GetFile::KeepSourceFile( "false"); core::Property GetFile::MaxAge( "Maximum File Age", - "The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored", + "The minimum age that a file must be in order to be pulled;" + " any file younger than this amount of time (according to last modification date) will be ignored", "0 sec"); core::Property GetFile::MinAge( "Minimum File Age", - "The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last 
modification date) will be ignored", + "The maximum age that a file must be in order to be pulled; any file" + " older than this amount of time (according to last modification date) will be ignored", "0 sec"); core::Property GetFile::MaxSize( "Maximum File Size", @@ -113,7 +116,6 @@ void GetFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { std::string value; - logger_->log_info("onTrigger GetFile"); if (context->getProperty(Directory.getName(), value)) { request_.directory = value; } @@ -129,13 +131,12 @@ void GetFile::onSchedule(core::ProcessContext *context, value, request_.keepSourceFile); } - logger_->log_info("onTrigger GetFile"); if (context->getProperty(MaxAge.getName(), value)) { core::TimeUnit unit; if (core::Property::StringToTime(value, request_.maxAge, unit) && core::Property::ConvertTimeUnitToMS(request_.maxAge, unit, request_.maxAge)) { - + logger_->log_debug("successfully applied _maxAge"); } } if (context->getProperty(MinAge.getName(), value)) { @@ -143,7 +144,7 @@ void GetFile::onSchedule(core::ProcessContext *context, if (core::Property::StringToTime(value, request_.minAge, unit) && core::Property::ConvertTimeUnitToMS(request_.minAge, unit, request_.minAge)) { - + logger_->log_debug("successfully applied _minAge"); } } if (context->getProperty(MaxSize.getName(), value)) { @@ -157,7 +158,7 @@ void GetFile::onSchedule(core::ProcessContext *context, if (core::Property::StringToTime(value, request_.pollInterval, unit) && core::Property::ConvertTimeUnitToMS(request_.pollInterval, unit, request_.pollInterval)) { - + logger_->log_debug("successfully applied _pollInterval"); } } if (context->getProperty(Recurse.getName(), value)) { @@ -172,11 +173,9 @@ void GetFile::onSchedule(core::ProcessContext *context, void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - // Perform directory list logger_->log_info("Is listing empty %i", isListingEmpty()); if (isListingEmpty()) { - if 
(request_.pollInterval == 0 || (getTimeMillis() - last_listing_time_) > request_.pollInterval) { performListing(request_.directory, request_); @@ -190,7 +189,6 @@ void GetFile::onTrigger(core::ProcessContext *context, std::queue<std::string> list; pollListing(list, request_); while (!list.empty()) { - std::string fileName = list.front(); list.pop(); logger_->log_info("GetFile process %s", fileName.c_str()); @@ -214,7 +212,6 @@ void GetFile::onTrigger(core::ProcessContext *context, throw; } } - } bool GetFile::isListingEmpty() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/ListenHTTP.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ListenHTTP.cpp b/libminifi/src/processors/ListenHTTP.cpp index 36c743e..5c4a6bb 100644 --- a/libminifi/src/processors/ListenHTTP.cpp +++ b/libminifi/src/processors/ListenHTTP.cpp @@ -1,5 +1,6 @@ /** * @file ListenHTTP.cpp + * ListenHTTP class implementation * * Licensed to the Apache Software Foundation (ASF) under one or more @@ -17,17 +18,18 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#include <sstream> +#include "processors/ListenHTTP.h" +#include <uuid/uuid.h> +#include <CivetServer.h> #include <stdio.h> +#include <sstream> +#include <utility> +#include <memory> #include <string> #include <iostream> #include <fstream> -#include <uuid/uuid.h> - -#include <CivetServer.h> - -#include "processors/ListenHTTP.h" - +#include <set> +#include <vector> #include "utils/TimeUtil.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" @@ -39,15 +41,15 @@ namespace nifi { namespace minifi { namespace processors { -const std::string ListenHTTP::ProcessorName("ListenHTTP"); - -core::Property ListenHTTP::BasePath( - "Base Path", "Base path for incoming connections", "contentListener"); +core::Property ListenHTTP::BasePath("Base Path", + "Base path for incoming connections", + "contentListener"); core::Property ListenHTTP::Port( "Listening Port", "The Port to listen on for incoming connections", ""); core::Property ListenHTTP::AuthorizedDNPattern( "Authorized DN Pattern", - "A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.", + "A Regular Expression to apply against the Distinguished Name of incoming" + " connections. 
If the Pattern does not match the DN, the connection will be refused.", ".*"); core::Property ListenHTTP::SSLCertificate( "SSL Certificate", @@ -65,17 +67,18 @@ core::Property ListenHTTP::SSLMinimumVersion( "SSL2"); core::Property ListenHTTP::HeadersAsAttributesRegex( "HTTP Headers to receive as Attributes (Regex)", - "Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes", + "Specifies the Regular Expression that determines the names of HTTP Headers that" + " should be passed along as FlowFile attributes", ""); -core::Relationship ListenHTTP::Success( - "success", "All files are routed to success"); +core::Relationship ListenHTTP::Success("success", + "All files are routed to success"); void ListenHTTP::initialize() { _logger->log_info("Initializing ListenHTTP"); // Set the supported properties - std::set < core::Property > properties; + std::set<core::Property> properties; properties.insert(BasePath); properties.insert(Port); properties.insert(AuthorizedDNPattern); @@ -84,17 +87,15 @@ void ListenHTTP::initialize() { properties.insert(SSLVerifyPeer); properties.insert(SSLMinimumVersion); properties.insert(HeadersAsAttributesRegex); - setSupportedProperties (properties); + setSupportedProperties(properties); // Set the supported relationships - std::set < core::Relationship > relationships; + std::set<core::Relationship> relationships; relationships.insert(Success); - setSupportedRelationships (relationships); + setSupportedRelationships(relationships); } -void ListenHTTP::onSchedule( - core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory) { - +void ListenHTTP::onSchedule(core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory) { std::string basePath; if (!context->getProperty(BasePath.getName(), basePath)) { @@ -178,7 +179,7 @@ void ListenHTTP::onSchedule( listeningPort.c_str(), basePath.c_str(), numThreads); // Initialize web server - std::vector < 
std::string > options; + std::vector<std::string> options; options.push_back("enable_keep_alive"); options.push_back("yes"); options.push_back("keep_alive_timeout_ms"); @@ -238,12 +239,10 @@ void ListenHTTP::onSchedule( ListenHTTP::~ListenHTTP() { } -void ListenHTTP::onTrigger( - core::ProcessContext *context, - core::ProcessSession *session) { - - std::shared_ptr < FlowFileRecord > flowFile = std::static_pointer_cast - < FlowFileRecord > (session->get()); +void ListenHTTP::onTrigger(core::ProcessContext *context, + core::ProcessSession *session) { + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< + FlowFileRecord>(session->get()); // Do nothing if there are no incoming files if (!flowFile) { @@ -251,10 +250,10 @@ void ListenHTTP::onTrigger( } } -ListenHTTP::Handler::Handler( - core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory, - std::string &&authDNPattern, std::string &&headersAsAttributesPattern) +ListenHTTP::Handler::Handler(core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory, + std::string &&authDNPattern, + std::string &&headersAsAttributesPattern) : _authDNRegex(std::move(authDNPattern)), _headersAsAttributesRegex(std::move(headersAsAttributesPattern)) { _processContext = context; @@ -292,8 +291,7 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server, auto session = _processSessionFactory->createSession(); ListenHTTP::WriteCallback callback(conn, req_info); - auto flowFile = std::static_pointer_cast < FlowFileRecord - > (session->create()); + auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); if (!flowFile) { sendErrorResponse(conn); @@ -347,9 +345,9 @@ ListenHTTP::WriteCallback::WriteCallback( } void ListenHTTP::WriteCallback::process(std::ofstream *stream) { - long long rlen; - long long nlen = 0; - long long tlen = _reqInfo->content_length; + int64_t rlen; + int64_t nlen = 0; + int64_t tlen = _reqInfo->content_length; char buf[16384]; while (nlen < 
tlen) {
