http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/TailFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp new file mode 100644 index 0000000..859daa6 --- /dev/null +++ b/libminifi/src/processors/TailFile.cpp @@ -0,0 +1,271 @@ +/** + * @file TailFile.cpp + * TailFile class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <time.h> +#include <sstream> +#include <stdio.h> +#include <string> +#include <iostream> +#include <dirent.h> +#include <limits.h> +#include <unistd.h> + +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "processors/TailFile.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +const std::string TailFile::ProcessorName("TailFile"); +core::Property TailFile::FileName( + "File to Tail", + "Fully-qualified filename of the file that should be tailed", ""); +core::Property TailFile::StateFile( + "State File", + "Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off", + "TailFileState"); +core::Relationship TailFile::Success( + "success", "All files are routed to success"); + +void TailFile::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(FileName); + properties.insert(StateFile); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +std::string TailFile::trimLeft(const std::string& s) { + return org::apache::nifi::minifi::utils::StringUtils::trimLeft(s); +} + +std::string TailFile::trimRight(const std::string& s) { + return org::apache::nifi::minifi::utils::StringUtils::trimRight(s); +} + +void TailFile::parseStateFileLine(char *buf) { + char *line = buf; + + while ((line[0] == ' ') || (line[0] == '\t')) + ++line; + + char first = line[0]; + if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') + || (first == '=')) { + return; + } + + char *equal = strchr(line, '='); + if (equal == NULL) { + return; + } + + equal[0] = '\0'; + std::string key = line; + + equal++; + while ((equal[0] == ' ') || (equal[0] == '\t')) + ++equal; + + first = equal[0]; + if ((first == '\0') || (first == '\r') || (first == '\n')) { + return; + } + + std::string value = equal; + key = trimRight(key); + value = trimRight(value); + + if (key == "FILENAME") + this->_currentTailFileName = value; + if (key == "POSITION") + this->_currentTailFilePosition = std::stoi(value); + + return; +} + +void TailFile::recoverState() { + std::ifstream file(_stateFile.c_str(), std::ifstream::in); + if (!file.good()) { + logger_->log_error("load state file failed %s", _stateFile.c_str()); + return; + } + const unsigned int bufSize = 512; + char buf[bufSize]; + for (file.getline(buf, bufSize); file.good(); file.getline(buf, bufSize)) { + parseStateFileLine(buf); + } +} + +void TailFile::storeState() { + std::ofstream file(_stateFile.c_str()); + if (!file.is_open()) { + logger_->log_error("store state file failed %s", _stateFile.c_str()); + return; + } + file << "FILENAME=" << this->_currentTailFileName << "\n"; + file << "POSITION=" << this->_currentTailFilePosition << "\n"; + file.close(); +} + +static bool sortTailMatchedFileItem(TailMatchedFileItem i, + TailMatchedFileItem j) { + return (i.modifiedTime < j.modifiedTime); +} +void TailFile::checkRollOver() { + struct stat statbuf; + std::vector<TailMatchedFileItem> matchedFiles; + std::string fullPath = this->_fileLocation + "/" + _currentTailFileName; + + if (stat(fullPath.c_str(), &statbuf) == 0) { + if (statbuf.st_size > this->_currentTailFilePosition) + // there are new input for the current tail file + return; + + uint64_t modifiedTimeCurrentTailFile = + ((uint64_t) (statbuf.st_mtime) * 1000); + std::string pattern = _fileName; + std::size_t found = _fileName.find_last_of("."); + if (found != std::string::npos) + pattern = _fileName.substr(0, found); + DIR *d; + d = opendir(this->_fileLocation.c_str()); + if (!d) + return; + while (1) { + struct dirent *entry; + entry = readdir(d); + if (!entry) + break; + std::string d_name = entry->d_name; + if (!(entry->d_type & DT_DIR)) { + std::string fileName = d_name; + std::string fileFullName = this->_fileLocation + "/" + d_name; + if (fileFullName.find(pattern) != std::string::npos + && stat(fileFullName.c_str(), &statbuf) == 0) { + if (((uint64_t) (statbuf.st_mtime) * 1000) + >= modifiedTimeCurrentTailFile) { + TailMatchedFileItem item; + item.fileName = fileName; + item.modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000); + matchedFiles.push_back(item); + } + } + } + } + closedir(d); + + // Sort the list based on modified time + std::sort(matchedFiles.begin(), matchedFiles.end(), + sortTailMatchedFileItem); + for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin(); + it != matchedFiles.end(); ++it) { + TailMatchedFileItem item = *it; + if (item.fileName == _currentTailFileName) { + ++it; + if (it != matchedFiles.end()) { + TailMatchedFileItem nextItem = *it; + logger_->log_info("TailFile File Roll Over from %s to %s", + _currentTailFileName.c_str(), + nextItem.fileName.c_str()); + _currentTailFileName = nextItem.fileName; + _currentTailFilePosition = 0; + storeState(); + } + break; + } + } + } else + return; +} + +void TailFile::onTrigger( + core::ProcessContext *context, + core::ProcessSession *session) { + std::string value; + if (context->getProperty(FileName.getName(), value)) { + std::size_t found = value.find_last_of("/\\"); + this->_fileLocation = value.substr(0, found); + this->_fileName = value.substr(found + 1); + } + if (context->getProperty(StateFile.getName(), value)) { + _stateFile = value + "." + getUUIDStr(); + } + if (!this->_stateRecovered) { + _stateRecovered = true; + this->_currentTailFileName = _fileName; + this->_currentTailFilePosition = 0; + // recover the state if we have not done so + this->recoverState(); + } + checkRollOver(); + std::string fullPath = this->_fileLocation + "/" + _currentTailFileName; + struct stat statbuf; + if (stat(fullPath.c_str(), &statbuf) == 0) { + if (statbuf.st_size <= this->_currentTailFilePosition) + // there are no new input for the current tail file + { + context->yield(); + return; + } + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());; + if (!flowFile) + return; + std::size_t found = _currentTailFileName.find_last_of("."); + std::string baseName = _currentTailFileName.substr(0, found); + std::string extension = _currentTailFileName.substr(found + 1); + flowFile->updateKeyedAttribute(PATH, _fileLocation); + flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath); + session->import(fullPath, flowFile, true, this->_currentTailFilePosition); + session->transfer(flowFile, Success); + logger_->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(), + flowFile->getSize()); + std::string logName = baseName + "." + + std::to_string(_currentTailFilePosition) + "-" + + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + + extension; + flowFile->updateKeyedAttribute(FILENAME, logName); + this->_currentTailFilePosition += flowFile->getSize(); + storeState(); + } +} + + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/provenance/Provenance.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp new file mode 100644 index 0000000..a90e182 --- /dev/null +++ b/libminifi/src/provenance/Provenance.cpp @@ -0,0 +1,578 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <cstdint> +#include <vector> +#include <arpa/inet.h> +#include "io/DataStream.h" +#include "io/Serializable.h" +#include "provenance/Provenance.h" + +#include "core/logging/Logger.h" +#include "core/Relationship.h" +#include "FlowController.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace provenance { + +// DeSerialize +bool ProvenanceEventRecord::DeSerialize( + const std::shared_ptr<core::Repository> &repo, std::string key) { + std::string value; + bool ret; + + ret = repo->Get(key, value); + + if (!ret) { + logger_->log_error("NiFi Provenance Store event %s can not found", + key.c_str()); + return false; + } else + logger_->log_debug("NiFi Provenance Read event %s length %d", key.c_str(), + value.length()); + + org::apache::nifi::minifi::io::DataStream stream( + (const uint8_t*) value.data(), value.length()); + + ret = DeSerialize(stream); + + if (ret) { + logger_->log_debug( + "NiFi Provenance retrieve event %s size %d eventType %d success", + _eventIdStr.c_str(), stream.getSize(), _eventType); + } else { + logger_->log_debug( + "NiFi Provenance retrieve event %s size %d eventType %d fail", + _eventIdStr.c_str(), stream.getSize(), _eventType); + } + + return ret; +} + +bool ProvenanceEventRecord::Serialize( + const std::shared_ptr<core::Repository> &repo) { + + org::apache::nifi::minifi::io::DataStream outStream; + + int ret; + + ret = writeUTF(this->_eventIdStr, &outStream); + if (ret <= 0) { + + return false; + } + + uint32_t eventType = this->_eventType; + ret = write(eventType, &outStream); + if (ret != 4) { + + return false; + } + + ret = write(this->_eventTime, &outStream); + if (ret != 8) { + + return false; + } + + ret = write(this->_entryDate, &outStream); + if (ret != 8) { + return false; + } + + ret = write(this->_eventDuration, &outStream); + if (ret != 8) { + + return false; + } + + ret = write(this->_lineageStartDate, &outStream); + if (ret != 8) { + + return false; + } + + ret = writeUTF(this->_componentId, &outStream); + if (ret <= 0) { + + return false; + } + + ret = writeUTF(this->_componentType, &outStream); + if (ret <= 0) { + + return false; + } + + ret = writeUTF(this->uuid_, &outStream); + if (ret <= 0) { + + return false; + } + + ret = writeUTF(this->_details, &outStream); + if (ret <= 0) { + + return false; + } + + // write flow attributes + uint32_t numAttributes = this->_attributes.size(); + ret = write(numAttributes, &outStream); + if (ret != 4) { + + return false; + } + + for (auto itAttribute : _attributes) { + ret = writeUTF(itAttribute.first, &outStream, true); + if (ret <= 0) { + + return false; + } + ret = writeUTF(itAttribute.second, &outStream, true); + if (ret <= 0) { + + return false; + } + } + + ret = writeUTF(this->_contentFullPath, &outStream); + if (ret <= 0) { + + return false; + } + + ret = write(this->_size, &outStream); + if (ret != 8) { + + return false; + } + + ret = write(this->_offset, &outStream); + if (ret != 8) { + + return false; + } + + ret = writeUTF(this->_sourceQueueIdentifier, &outStream); + if (ret <= 0) { + + return false; + } + + if (this->_eventType == ProvenanceEventRecord::FORK + || this->_eventType == ProvenanceEventRecord::CLONE + || this->_eventType == ProvenanceEventRecord::JOIN) { + // write UUIDs + uint32_t number = this->_parentUuids.size(); + ret = write(number, &outStream); + if (ret != 4) { + + return false; + } + for (auto parentUUID : _parentUuids) { + ret = writeUTF(parentUUID, &outStream); + if (ret <= 0) { + + return false; + } + } + number = this->_childrenUuids.size(); + ret = write(number, &outStream); + if (ret != 4) { + return false; + } + for (auto childUUID : _childrenUuids) { + ret = writeUTF(childUUID, &outStream); + if (ret <= 0) { + + return false; + } + } + } else if (this->_eventType == ProvenanceEventRecord::SEND + || this->_eventType == ProvenanceEventRecord::FETCH) { + ret = writeUTF(this->_transitUri, &outStream); + if (ret <= 0) { + + return false; + } + } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { + ret = writeUTF(this->_transitUri, &outStream); + if (ret <= 0) { + + return false; + } + ret = writeUTF(this->_sourceSystemFlowFileIdentifier, &outStream); + if (ret <= 0) { + + return false; + } + } + + // Persistent to the DB + + if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), + outStream.getSize())) { + logger_->log_debug("NiFi Provenance Store event %s size %d success", + _eventIdStr.c_str(), outStream.getSize()); + } else { + logger_->log_error("NiFi Provenance Store event %s size %d fail", + _eventIdStr.c_str(), outStream.getSize()); + } + + // cleanup + + return true; +} + +bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, + const int bufferSize) { + + int ret; + + org::apache::nifi::minifi::io::DataStream outStream(buffer, bufferSize); + + ret = readUTF(this->_eventIdStr, &outStream); + + if (ret <= 0) { + return false; + } + + uint32_t eventType; + ret = read(eventType, &outStream); + if (ret != 4) { + return false; + } + this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType; + + ret = read(this->_eventTime, &outStream); + if (ret != 8) { + return false; + } + + ret = read(this->_entryDate, &outStream); + if (ret != 8) { + return false; + } + + ret = read(this->_eventDuration, &outStream); + if (ret != 8) { + return false; + } + + ret = read(this->_lineageStartDate, &outStream); + if (ret != 8) { + return false; + } + + ret = readUTF(this->_componentId, &outStream); + if (ret <= 0) { + return false; + } + + ret = readUTF(this->_componentType, &outStream); + if (ret <= 0) { + return false; + } + + ret = readUTF(this->uuid_, &outStream); + if (ret <= 0) { + return false; + } + + ret = readUTF(this->_details, &outStream); + + if (ret <= 0) { + return false; + } + + // read flow attributes + uint32_t numAttributes = 0; + ret = read(numAttributes, &outStream); + if (ret != 4) { + return false; + } + + for (uint32_t i = 0; i < numAttributes; i++) { + std::string key; + ret = readUTF(key, &outStream, true); + if (ret <= 0) { + return false; + } + std::string value; + ret = readUTF(value, &outStream, true); + if (ret <= 0) { + return false; + } + this->_attributes[key] = value; + } + + ret = readUTF(this->_contentFullPath, &outStream); + if (ret <= 0) { + return false; + } + + ret = read(this->_size, &outStream); + if (ret != 8) { + return false; + } + + ret = read(this->_offset, &outStream); + if (ret != 8) { + return false; + } + + ret = readUTF(this->_sourceQueueIdentifier, &outStream); + if (ret <= 0) { + return false; + } + + if (this->_eventType == ProvenanceEventRecord::FORK + || this->_eventType == ProvenanceEventRecord::CLONE + || this->_eventType == ProvenanceEventRecord::JOIN) { + // read UUIDs + uint32_t number = 0; + ret = read(number, &outStream); + if (ret != 4) { + return false; + } + + for (uint32_t i = 0; i < number; i++) { + std::string parentUUID; + ret = readUTF(parentUUID, &outStream); + if (ret <= 0) { + return false; + } + this->addParentUuid(parentUUID); + } + number = 0; + ret = read(number, &outStream); + if (ret != 4) { + return false; + } + for (uint32_t i = 0; i < number; i++) { + std::string childUUID; + ret = readUTF(childUUID, &outStream); + if (ret <= 0) { + return false; + } + this->addChildUuid(childUUID); + } + } else if (this->_eventType == ProvenanceEventRecord::SEND + || this->_eventType == ProvenanceEventRecord::FETCH) { + ret = readUTF(this->_transitUri, &outStream); + if (ret <= 0) { + return false; + } + } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { + ret = readUTF(this->_transitUri, &outStream); + if (ret <= 0) { + return false; + } + ret = readUTF(this->_sourceSystemFlowFileIdentifier, &outStream); + if (ret <= 0) { + return false; + } + } + + return true; +} + +void ProvenanceReporter::commit() { + for (auto event : _events) { + if (!repo_->isFull()) { + event->Serialize(repo_); + } else { + logger_->log_debug("Provenance Repository is full"); + } + } +} + +void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow, + std::string detail) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CREATE, flow); + + if (event) { + event->setDetails(detail); + add(event); + } +} + +void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow, + core::Relationship relation, std::string detail, + uint64_t processingDuration) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::ROUTE, flow); + + if (event) { + event->setDetails(detail); + event->setRelationship(relation.getName()); + event->setEventDuration(processingDuration); + add(event); + } +} + +void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow, + std::string detail) { + ProvenanceEventRecord *event = allocate( + ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow); + + if (event) { + event->setDetails(detail); + add(event); + } +} + +void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow, + std::string detail, + uint64_t processingDuration) { + ProvenanceEventRecord *event = allocate( + ProvenanceEventRecord::CONTENT_MODIFIED, flow); + + if (event) { + event->setDetails(detail); + event->setEventDuration(processingDuration); + add(event); + } +} + +void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent, + std::shared_ptr<core::FlowFile> child) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::CLONE, parent); + + if (event) { + event->addChildFlowFile(child); + event->addParentFlowFile(parent); + add(event); + } +} + +void ProvenanceReporter::join( + std::vector<std::shared_ptr<core::FlowFile> > parents, + std::shared_ptr<core::FlowFile> child, std::string detail, + uint64_t processingDuration) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::JOIN, child); + + if (event) { + event->addChildFlowFile(child); + std::vector<std::shared_ptr<core::FlowFile> >::iterator it; + for (it = parents.begin(); it != parents.end(); it++) { + std::shared_ptr<core::FlowFile> record = *it; + event->addParentFlowFile(record); + } + event->setDetails(detail); + event->setEventDuration(processingDuration); + add(event); + } +} + +void ProvenanceReporter::fork( + std::vector<std::shared_ptr<core::FlowFile> > child, + std::shared_ptr<core::FlowFile> parent, std::string detail, + uint64_t processingDuration) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FORK, parent); + + if (event) { + event->addParentFlowFile(parent); + std::vector<std::shared_ptr<core::FlowFile> >::iterator it; + for (it = child.begin(); it != child.end(); it++) { + std::shared_ptr<core::FlowFile> record = *it; + event->addChildFlowFile(record); + } + event->setDetails(detail); + event->setEventDuration(processingDuration); + add(event); + } +} + +void ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow, + std::string detail) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::EXPIRE, flow); + + if (event) { + event->setDetails(detail); + add(event); + } +} + +void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow, + std::string reason) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::DROP, flow); + + if (event) { + std::string dropReason = "Discard reason: " + reason; + event->setDetails(dropReason); + add(event); + } +} + +void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow, + std::string transitUri, std::string detail, + uint64_t processingDuration, bool force) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::SEND, flow); + + if (event) { + event->setTransitUri(transitUri); + event->setDetails(detail); + event->setEventDuration(processingDuration); + if (!force) { + add(event); + } else { + if (!repo_->isFull()) + event->Serialize(repo_); + delete event; + } + } +} + +void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow, + std::string transitUri, + std::string sourceSystemFlowFileIdentifier, + std::string detail, + uint64_t processingDuration) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::RECEIVE, flow); + + if (event) { + event->setTransitUri(transitUri); + event->setDetails(detail); + event->setEventDuration(processingDuration); + event->setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier); + add(event); + } +} + +void ProvenanceReporter::fetch(std::shared_ptr<core::FlowFile> flow, + std::string transitUri, std::string detail, + uint64_t processingDuration) { + ProvenanceEventRecord *event = allocate(ProvenanceEventRecord::FETCH, flow); + + if (event) { + event->setTransitUri(transitUri); + event->setDetails(detail); + event->setEventDuration(processingDuration); + add(event); + } +} + +} /* namespace provenance */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/provenance/ProvenanceRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp b/libminifi/src/provenance/ProvenanceRepository.cpp new file mode 100644 index 0000000..88455be --- /dev/null +++ b/libminifi/src/provenance/ProvenanceRepository.cpp @@ -0,0 +1,75 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "provenance/Provenance.h" +#include "provenance/ProvenanceRepository.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace provenance{ + + + +void ProvenanceRepository::run() { + // threshold for purge + uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4; + while (running_) { + std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); + uint64_t curTime = getTimeMillis(); + uint64_t size = repoSize(); + if (size >= purgeThreshold) { + std::vector<std::string> purgeList; + leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + ProvenanceEventRecord eventRead; + std::string key = it->key().ToString(); + if (eventRead.DeSerialize((uint8_t *) it->value().data(), + (int) it->value().size())) { + if ((curTime - eventRead.getEventTime()) + > max_partition_millis_) + purgeList.push_back(key); + } else { + logger_->log_debug("NiFi Provenance retrieve event %s fail", + key.c_str()); + purgeList.push_back(key); + } + } + delete it; + std::vector<std::string>::iterator itPurge; + for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) { + std::string eventId = *itPurge; + logger_->log_info("ProvenanceRepository Repo Purge %s", + eventId.c_str()); + Delete(eventId); + } + } + if (size > max_partition_bytes_) + repo_full_ = true; + else + repo_full_ = false; + } + return; +} +} /* namespace provenance */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/test/Server.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/Server.cpp b/libminifi/test/Server.cpp index 65245f6..9428ee0 100644 --- a/libminifi/test/Server.cpp +++ b/libminifi/test/Server.cpp @@ -22,7 +22,7 @@ #define DEFAULT_REPORT_INTERVAL 1000 // 1 sec #define MAX_READ_TIMEOUT 30000 // 30 seconds -//! FlowControl Protocol Msg Type +// FlowControl Protocol Msg Type typedef enum { REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow YAML version REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.yml from server ask device to apply and also device report interval @@ -31,7 +31,7 @@ typedef enum { MAX_FLOW_CONTROL_MSG_TYPE } FlowControlMsgType; -//! FlowControl Protocol Msg Type String +// FlowControl Protocol Msg Type String static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = { "REGISTER_REQ", @@ -40,7 +40,7 @@ static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = "REPORT_RESP" }; -//! Flow Control Msg Type to String +// Flow Control Msg Type to String inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) { if (type < MAX_FLOW_CONTROL_MSG_TYPE) @@ -49,7 +49,7 @@ inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) return NULL; } -//! FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV) +// FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV) typedef enum { //Fix length 8 bytes: client to server in register request, required field FLOW_SERIAL_NUMBER, @@ -70,7 +70,7 @@ typedef enum { MAX_FLOW_MSG_ID } FlowControlMsgID; -//! FlowControl Protocol Msg ID String +// FlowControl Protocol Msg ID String static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = { "FLOW_SERIAL_NUMBER", @@ -86,7 +86,7 @@ static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = #define TYPE_HDR_LEN 4 // Fix Hdr Type #define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes -//! FlowControl Protocol Msg Len +// FlowControl Protocol Msg Len inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen) { if (id == FLOW_SERIAL_NUMBER) @@ -99,7 +99,7 @@ inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen) return -1; } -//! Flow Control Msg Id to String +// Flow Control Msg Id to String inline const char *FlowControlMsgIdToStr(FlowControlMsgID id) { if (id < MAX_FLOW_MSG_ID) @@ -108,7 +108,7 @@ inline const char *FlowControlMsgIdToStr(FlowControlMsgID id) return NULL; } -//! Flow Control Respond status code +// Flow Control Respond status code typedef enum { RESP_SUCCESS, RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register @@ -118,7 +118,7 @@ typedef enum { MAX_RESP_CODE } FlowControlRespCode; -//! FlowControl Resp Code str +// FlowControl Resp Code str static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = { "RESP_SUCCESS", @@ -128,7 +128,7 @@ static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = "RESP_FAILURE" }; -//! Flow Control Resp Code to String +// Flow Control Resp Code to String inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) { if (code < MAX_RESP_CODE) @@ -137,16 +137,16 @@ inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) return NULL; } -//! Common FlowControlProtocol Header +// Common FlowControlProtocol Header typedef struct { - uint32_t msgType; //! Msg Type - uint32_t seqNumber; //! Seq Number to match Req with Resp - uint32_t status; //! Resp Code, see FlowControlRespCode - uint32_t payloadLen; //! Msg Payload length + uint32_t msgType; // Msg Type + uint32_t seqNumber; // Seq Number to match Req with Resp + uint32_t status; // Resp Code, see FlowControlRespCode + uint32_t payloadLen; // Msg Payload length } FlowControlProtocolHeader; -//! encode uint32_t +// encode uint32_t uint8_t *encode(uint8_t *buf, uint32_t value) { *buf++ = (value & 0xFF000000) >> 24; @@ -156,14 +156,14 @@ uint8_t *encode(uint8_t *buf, uint32_t value) return buf; } -//! encode uint32_t +// encode uint32_t uint8_t *decode(uint8_t *buf, uint32_t &value) { value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3])); return (buf + 4); } -//! encode byte array +// encode byte array uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size) { memcpy(buf, bufArray, size); @@ -171,7 +171,7 @@ uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size) return buf; } -//! encode std::string +// encode std::string uint8_t *encode(uint8_t *buf, std::string value) { // add the \0 for size http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/test/TestBase.h ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index 97c32e6..f73174b 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -23,63 +23,52 @@ #include "ResourceClaim.h" #include "catch.hpp" #include <vector> -#include "Logger.h" +#include "core/logging/Logger.h" +#include "core/core.h" class LogTestController { -public: - LogTestController(const std::string level = "debug") { - Logger::getLogger()->setLogLevel(level); - } - - - void enableDebug() - { - Logger::getLogger()->setLogLevel("debug"); - } - - ~LogTestController() { - Logger::getLogger()->setLogLevel(LOG_LEVEL_E::info); - } + public: + LogTestController(const std::string level = "debug") { + logging::Logger::getLogger()->setLogLevel(level); + } + + void enableDebug() { + logging::Logger::getLogger()->setLogLevel("debug"); + } + + ~LogTestController() { + logging::Logger::getLogger()->setLogLevel(logging::LOG_LEVEL_E::info); + } }; -class TestController{ -public: - - +class TestController { + public: - TestController() : log("info") - { - ResourceClaim::default_directory_path = "./"; - } + TestController() + : log("info") { + minifi::ResourceClaim::default_directory_path = "./"; + } - ~TestController() - { - for(auto dir : directories) - { - rmdir(dir); - } - } + ~TestController() { + for (auto dir : directories) { + rmdir(dir); + } + } - void enableDebug() { - log.enableDebug(); - } + void enableDebug() { + log.enableDebug(); + } - char *createTempDirectory(char *format) - { - char *dir = mkdtemp(format); - return dir; - } - -protected: - LogTestController log; - std::vector<char*> directories; + char *createTempDirectory(char *format) { + char *dir = mkdtemp(format); + return dir; + } + protected: + LogTestController log; + std::vector<char*> directories; }; - - - - #endif /* LIBMINIFI_TEST_TESTBASE_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/test/nodefs/NoLevelDB.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/nodefs/NoLevelDB.cpp b/libminifi/test/nodefs/NoLevelDB.cpp new file mode 100644 index 0000000..00c9212 --- /dev/null +++ b/libminifi/test/nodefs/NoLevelDB.cpp @@ -0,0 +1,34 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../TestBase.h" + +#include "core/core.h" +#include "core/RepositoryFactory.h" + +TEST_CASE("NoLevelDBTest1", "[NoLevelDBTest]") { + std::shared_ptr<core::Repository> prov_repo = core::createRepository( + "provenancerepository", true); + REQUIRE(nullptr != prov_repo); +} + +TEST_CASE("NoLevelDBTest2", "[NoLevelDBTest]") { + std::shared_ptr<core::Repository> prov_repo = core::createRepository( + "flowfilerepository", true); + REQUIRE(nullptr != prov_repo); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/test/nodefs/NoYamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/nodefs/NoYamlConfiguration.cpp b/libminifi/test/nodefs/NoYamlConfiguration.cpp new file mode 100644 index 0000000..9a9b10e --- /dev/null +++ b/libminifi/test/nodefs/NoYamlConfiguration.cpp @@ -0,0 +1,38 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "core/core.h" +#include "core/RepositoryFactory.h" + + +#include "core/ConfigurationFactory.h" + +TEST_CASE("NoYamlSupport1", "[NoYamlSupport1]") { + std::shared_ptr<core::Repository> prov_repo = core::createRepository( + "provenancerepository", true); +REQUIRE(nullptr != prov_repo); +std::unique_ptr<core::FlowConfiguration> flow_configuration = std::move( + core::createFlowConfiguration(prov_repo, prov_repo, + "yamlconfiguration")); + + + REQUIRE(nullptr != flow_configuration); + +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/test/unit/CRCTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/CRCTests.cpp b/libminifi/test/unit/CRCTests.cpp index 197b5be..54f27bc 100644 --- a/libminifi/test/unit/CRCTests.cpp +++ b/libminifi/test/unit/CRCTests.cpp @@ -22,60 +22,58 @@ #include "io/DataStream.h" #include "../TestBase.h" - TEST_CASE("Test CRC1", "[testcrc1]") { - BaseStream base; - CRCStream<BaseStream> test(&base); - test.writeData((uint8_t*)"cow",3); - REQUIRE(2580823964 == test.getCRC()); - - + org::apache::nifi::minifi::io::BaseStream base; + org::apache::nifi::minifi::io::CRCStream< + org::apache::nifi::minifi::io::BaseStream> test(&base); + test.writeData((uint8_t*) "cow", 3); + REQUIRE(2580823964 == test.getCRC()); + + } TEST_CASE("Test CRC2", "[testcrc2]") { - BaseStream base; - CRCStream<BaseStream> test(&base); - std::string fox = "the quick brown fox jumped over the brown fox"; - std::vector<uint8_t> charvect(fox.begin(), fox.end()); - test.writeData(charvect,charvect.size()); - REQUIRE(1922388889 == test.getCRC()); - - -} + org::apache::nifi::minifi::io::BaseStream base; + org::apache::nifi::minifi::io::CRCStream< + org::apache::nifi::minifi::io::BaseStream> test(&base); + std::string fox = "the quick brown fox jumped over the brown fox"; + std::vector<uint8_t> charvect(fox.begin(), fox.end()); + test.writeData(charvect, charvect.size()); + REQUIRE(1922388889 == test.getCRC()); +} TEST_CASE("Test CRC3", "[testcrc3]") { - BaseStream base; - CRCStream<BaseStream> test(&base); - uint64_t number=7; - test.write(number); - REQUIRE(4215687882 == test.getCRC()); - - -} + org::apache::nifi::minifi::io::BaseStream base; + org::apache::nifi::minifi::io::CRCStream< + org::apache::nifi::minifi::io::BaseStream> test(&base); + uint64_t number = 7; + test.write(number); + REQUIRE(4215687882 == test.getCRC()); +} TEST_CASE("Test CRC4", "[testcrc4]") { - BaseStream base; - CRCStream<BaseStream> test(&base); - uint32_t number=7; - test.write(number); - REQUIRE(3206564543 == test.getCRC()); - - + org::apache::nifi::minifi::io::BaseStream base; + org::apache::nifi::minifi::io::CRCStream< + org::apache::nifi::minifi::io::BaseStream> test(&base); + uint32_t number = 7; + test.write(number); + REQUIRE(3206564543 == test.getCRC()); + } TEST_CASE("Test CRC5", "[testcrc5]") { - BaseStream base; - CRCStream<BaseStream> test(&base); - uint16_t number=7; - test.write(number); - REQUIRE(3753740124 == test.getCRC()); - - -} \ No newline at end of file + org::apache::nifi::minifi::io::BaseStream base; + org::apache::nifi::minifi::io::CRCStream< + org::apache::nifi::minifi::io::BaseStream> test(&base); + uint16_t number = 7; + test.write(number); + REQUIRE(3753740124 == test.getCRC()); + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/test/unit/LoggerTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/LoggerTests.cpp b/libminifi/test/unit/LoggerTests.cpp index 8359037..efa30bf 100644 --- a/libminifi/test/unit/LoggerTests.cpp +++ b/libminifi/test/unit/LoggerTests.cpp @@ -19,125 +19,191 @@ #include <memory> #include "../TestBase.h" -#include "../../include/LogAppenders.h" +#include "core/logging/LogAppenders.h" + +using namespace logging; bool contains(std::string stringA, std::string ending) { - return (ending.length() > 0 && stringA.find(ending) != std::string::npos); + return (ending.length() > 0 && stringA.find(ending) != std::string::npos); } TEST_CASE("Test log Levels", "[ttl1]") { - std::ostringstream oss; - - std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(new OutputStreamAppender(oss,0)); - std::shared_ptr<Logger> logger = Logger::getLogger(); - logger->updateLogger(std::move(outputLogger)); - logger->setLogLevel("trace"); - logger->log_info("hello world"); - - REQUIRE( true == contains(oss.str(),"[minifi log -- OutputStreamAppender] [info] hello world")); + std::ostringstream oss; + + std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>( + new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, + 0)); + std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); + logger->updateLogger(std::move(outputLogger)); + logger->setLogLevel("trace"); + logger->log_info("hello world"); + + REQUIRE( + true + == contains( + oss.str(), + "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [info] hello world")); + + std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>( + new org::apache::nifi::minifi::core::logging::NullAppender()); + + logger->updateLogger(std::move(nullAppender)); } TEST_CASE("Test log Levels debug", "[ttl2]") { - std::ostringstream oss; - - std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(new OutputStreamAppender(oss,0)); - std::shared_ptr<Logger> logger = Logger::getLogger(); - logger->updateLogger(std::move(outputLogger)); - logger->setLogLevel("trace"); - logger->log_debug("hello world"); - - REQUIRE( true == contains(oss.str(),"[minifi log -- OutputStreamAppender] [debug] hello world")); + std::ostringstream oss; + + std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>( + new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, + 0)); + std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); + logger->updateLogger(std::move(outputLogger)); + logger->setLogLevel("trace"); + logger->log_debug("hello world"); + + REQUIRE( + true + == contains( + oss.str(), + "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [debug] hello world")); + + std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>( + new org::apache::nifi::minifi::core::logging::NullAppender()); + + logger->updateLogger(std::move(nullAppender)); } TEST_CASE("Test log Levels trace", "[ttl3]") { - std::ostringstream oss; + std::ostringstream oss; + + std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>( + new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, + 0)); + std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); + logger->updateLogger(std::move(outputLogger)); + logger->setLogLevel("trace"); + + logger->log_trace("hello world"); - std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(new OutputStreamAppender(oss,0)); - std::shared_ptr<Logger> logger = Logger::getLogger(); - logger->updateLogger(std::move(outputLogger)); - logger->setLogLevel("trace"); + REQUIRE( + true + == contains( + oss.str(), + "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [trace] hello world")); - logger->log_trace("hello world"); + std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>( + new org::apache::nifi::minifi::core::logging::NullAppender()); - REQUIRE( true == contains(oss.str(),"[minifi log -- OutputStreamAppender] [trace] hello world")); + logger->updateLogger(std::move(nullAppender)); } TEST_CASE("Test log Levels error", "[ttl4]") { - std::ostringstream oss; + std::ostringstream oss; - std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(new OutputStreamAppender(oss,0)); - std::shared_ptr<Logger> logger = Logger::getLogger(); - logger->updateLogger(std::move(outputLogger)); - logger->setLogLevel("trace"); + std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>( + new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, + 0)); + std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); + logger->updateLogger(std::move(outputLogger)); + logger->setLogLevel("trace"); - logger->log_error("hello world"); + logger->log_error("hello world"); - REQUIRE( true == contains(oss.str(),"[minifi log -- OutputStreamAppender] [error] hello world")); + REQUIRE( + true + == contains( + oss.str(), + "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world")); + + std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>( + new org::apache::nifi::minifi::core::logging::NullAppender()); + + logger->updateLogger(std::move(nullAppender)); } TEST_CASE("Test log Levels change", "[ttl5]") { - std::ostringstream oss; + std::ostringstream oss; + + std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>( + new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, + 0)); + std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); + logger->updateLogger(std::move(outputLogger)); + logger->setLogLevel("trace"); - std::unique_ptr<BaseLogger> outputLogger = std::unique_ptr<BaseLogger>(new OutputStreamAppender(oss,0)); - std::shared_ptr<Logger> logger = Logger::getLogger(); - logger->updateLogger(std::move(outputLogger)); - logger->setLogLevel("trace"); + logger->log_error("hello world"); - logger->log_error("hello world"); + REQUIRE( + true + == contains( + oss.str(), + "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world")); + oss.str(""); + oss.clear(); + REQUIRE(0 == oss.str().length()); + logger->setLogLevel("off"); - REQUIRE( true == contains(oss.str(),"[minifi log -- OutputStreamAppender] [error] hello world")); - oss.str(""); - oss.clear(); - REQUIRE( 0 == oss.str().length() ); - logger->setLogLevel("off"); + logger->log_error("hello world"); - logger->log_error("hello world"); + REQUIRE(0 == oss.str().length()); + + std::unique_ptr<BaseLogger> nullAppender = std::unique_ptr<BaseLogger>( + new org::apache::nifi::minifi::core::logging::NullAppender()); + + logger->updateLogger(std::move(nullAppender)); - REQUIRE( 0 == oss.str().length() ); } TEST_CASE("Test log LevelsConfigured", "[ttl6]") { - std::ostringstream oss; + std::ostringstream oss; + + minifi::Configure *config = minifi::Configure::getConfigure(); - Configure *config = Configure::getConfigure(); + config->set(BaseLogger::nifi_log_appender, + "OutputStreamAppender"); + config->set( + org::apache::nifi::minifi::core::logging::OutputStreamAppender::nifi_log_output_stream_error_stderr, + "true"); - config->set(BaseLogger::nifi_log_appender,"outputstreamappender"); - config->set(OutputStreamAppender::nifi_log_output_stream_error_stderr,"true"); + std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); - std::shared_ptr<Logger> logger = Logger::getLogger(); + auto oldrdbuf = std::cerr.rdbuf(); + std::cerr.rdbuf(oss.rdbuf()); - auto oldrdbuf = std::cerr.rdbuf(); - std::cerr.rdbuf(oss.rdbuf()); + std::unique_ptr<BaseLogger> newLogger = LogInstance::getConfiguredLogger( + config); - std::unique_ptr<BaseLogger> newLogger =LogInstance::getConfiguredLogger(config); + logger->updateLogger(std::move(newLogger)); - logger->updateLogger(std::move(newLogger)); + logger->setLogLevel("trace"); - logger->setLogLevel("trace"); + // capture stderr + logger->log_error("hello world"); - // capture stderr - logger->log_error("hello world"); + REQUIRE( + true + == contains( + oss.str(), + "[minifi log -- org::apache::nifi::minifi::core::logging::OutputStreamAppender] [error] hello world")); - REQUIRE( true == contains(oss.str(),"[minifi log -- OutputStreamAppender] [error] hello world")); + std::cerr.rdbuf(oldrdbuf); - std::cerr.rdbuf(oldrdbuf); + config->set(BaseLogger::nifi_log_appender, "nullappender"); - config->set(BaseLogger::nifi_log_appender,"nullappender"); + newLogger = LogInstance::getConfiguredLogger(config); - newLogger =LogInstance::getConfiguredLogger(config); + logger->updateLogger(std::move(newLogger)); - logger->updateLogger(std::move(newLogger)); + oss.str(""); + oss.clear(); + REQUIRE(0 == oss.str().length()); - oss.str(""); - oss.clear(); - REQUIRE( 0 == oss.str().length() ); + // should have nothing from the null appender + logger->log_info("hello world"); + logger->log_debug("hello world"); + logger->log_trace("hello world"); - // should have nothing from the null appender - logger->log_info("hello world"); - logger->log_debug("hello world"); - logger->log_trace("hello world"); -// logger->log_error("hello world"); + REQUIRE(0 == oss.str().length()); - REQUIRE( 0 == oss.str().length() ); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/test/unit/ProcessorTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp index ae5f7e5..4f08d5d 100644 --- a/libminifi/test/unit/ProcessorTests.cpp +++ b/libminifi/test/unit/ProcessorTests.cpp @@ -20,157 +20,255 @@ #include "FlowController.h" #include "ProvenanceTestHelper.h" #include "../TestBase.h" -#include <memory> -#include "../../include/LogAppenders.h" -#include "GetFile.h" +#include "core/logging/LogAppenders.h" +#include "core/logging/BaseLogger.h" +#include "processors/GetFile.h" +#include "core/core.h" +#include "../../include/core/FlowFile.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessorNode.h" + +TEST_CASE("Test Creation of GetFile", "[getfileCreate]") { + org::apache::nifi::minifi::processors::GetFile processor("processorname"); + REQUIRE(processor.getName() == "processorname"); +} +TEST_CASE("Test Find file", "[getfileCreate2]") { -TEST_CASE("Test Creation of GetFile", "[getfileCreate]"){ - GetFile processor("processorname"); - REQUIRE( processor.getName() == "processorname"); -} + TestController testController; + + testController.enableDebug(); + -TEST_CASE("Test Find file", "[getfileCreate2]"){ + std::shared_ptr<core::Processor> processor = std::make_shared< + org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); - TestController testController; + std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); + + std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); + std::shared_ptr<minifi::FlowController> controller = std::make_shared<TestFlowController>(test_repo, test_repo); - Configure *config = Configure::getConfigure(); + - config->set(BaseLogger::nifi_log_appender,"rollingappender"); - config->set(OutputStreamAppender::nifi_log_output_stream_error_stderr,"true"); - std::shared_ptr<Logger> logger = Logger::getLogger(); - std::unique_ptr<BaseLogger> newLogger =LogInstance::getConfiguredLogger(config); - logger->updateLogger(std::move(newLogger)); - logger->setLogLevel("debug"); + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); - ProvenanceTestRepository provenanceRepo; - FlowTestRepository flowRepo; - TestFlowController controller(provenanceRepo, flowRepo); - FlowControllerFactory::getFlowController( dynamic_cast<FlowController*>(&controller)); + uuid_t processoruuid; + REQUIRE(true == processor->getUUID(processoruuid)); - GetFile processor("getfileCreate2"); - char format[] ="/tmp/gt.XXXXXX"; - char *dir = testController.createTempDirectory(format); + std::shared_ptr<minifi::Connection> connection = std::make_shared< + minifi::Connection>(test_repo,"getfileCreate2Connection"); + connection->setRelationship(core::Relationship("success", "description")); + // link the connections so that we can test results at the end for this + connection->setSource(processor); + connection->setDestination(processor); - uuid_t processoruuid; - REQUIRE( true == processor.getUUID(processoruuid) ); + connection->setSourceUUID(processoruuid); + connection->setDestinationUUID(processoruuid); - Connection connection("getfileCreate2Connection"); - connection.setRelationship(Relationship("success","description")); + processor->addConnection(connection); + REQUIRE(dir != NULL); - // link the connections so that we can test results at the end for this + core::ProcessorNode node(processor); - connection.setSourceProcessor(&processor); - connection.setDestinationProcessor(&processor); + core::ProcessContext context(node, test_repo); + context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, + dir); + core::ProcessSession session(&context); - connection.setSourceProcessorUUID(processoruuid); - connection.setDestinationProcessorUUID(processoruuid); - processor.addConnection(&connection); - REQUIRE( dir != NULL ); + REQUIRE(processor->getName() == "getfileCreate2"); - ProcessContext context(&processor); - context.setProperty(GetFile::Directory,dir); - ProcessSession session(&context); + std::shared_ptr<core::FlowFile> record; + processor->setScheduledState(core::ScheduledState::RUNNING); + processor->onTrigger(&context, &session); - REQUIRE( processor.getName() == "getfileCreate2"); + provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); + std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); + record = session.get(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); - FlowFileRecord *record; - processor.setScheduledState(ScheduledState::RUNNING); - processor.onTrigger(&context,&session); + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); - ProvenanceReporter *reporter = session.getProvenanceReporter(); - std::set<ProvenanceEventRecord*> records = reporter->getEvents(); + processor->incrementActiveTasks(); + processor->setScheduledState(core::ScheduledState::RUNNING); + processor->onTrigger(&context, &session); + unlink(ss.str().c_str()); + rmdir(dir); + reporter = session.getProvenanceReporter(); - record = session.get(); - REQUIRE( record== 0 ); - REQUIRE( records.size() == 0 ); + REQUIRE( processor->getName() == "getfileCreate2"); - std::fstream file; - std::stringstream ss; - std::string fileName("tstFile.ext"); - ss << dir << "/" << fileName; - file.open(ss.str(),std::ios::out); - file << "tempFile"; - int64_t fileSize = file.tellp(); - file.close(); + records = reporter->getEvents(); - processor.incrementActiveTasks(); - processor.setScheduledState(ScheduledState::RUNNING); + for (provenance::ProvenanceEventRecord *provEventRecord : records) { + REQUIRE(provEventRecord->getComponentType() == processor->getName()); + } + session.commit(); + std::shared_ptr<core::FlowFile> ffr = session.get(); - processor.onTrigger(&context,&session); - unlink(ss.str().c_str()); - rmdir(dir); + ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + REQUIRE(2 == repo->getRepoMap().size()); - reporter = session.getProvenanceReporter(); + for (auto entry : repo->getRepoMap()) { + provenance::ProvenanceEventRecord newRecord; + newRecord.DeSerialize((uint8_t*) entry.second.data(), + entry.second.length()); - records = reporter->getEvents(); + bool found = false; + for (auto provRec : records) { + if (provRec->getEventId() == newRecord.getEventId()) { + REQUIRE(provRec->getEventId() == newRecord.getEventId()); + REQUIRE(provRec->getComponentId() == newRecord.getComponentId()); + REQUIRE(provRec->getComponentType() == newRecord.getComponentType()); + REQUIRE(provRec->getDetails() == newRecord.getDetails()); + REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration()); + found = true; + break; + } + } + if (!found) + throw std::runtime_error("Did not find record"); - for(ProvenanceEventRecord *provEventRecord : records) - { - REQUIRE (provEventRecord->getComponentType() == processor.getName()); - } - session.commit(); - // verify flow file repo - REQUIRE( 1 == flowRepo.getRepoMap().size() ); + } - for(auto entry: flowRepo.getRepoMap()) - { - FlowFileEventRecord newRecord; - newRecord.DeSerialize((uint8_t*)entry.second.data(),entry.second.length()); - REQUIRE (fileSize == newRecord.getFileSize()); - REQUIRE (0 == newRecord.getFileOffset()); - std::map<std::string, std::string> attrs = newRecord.getAttributes(); - std::string key = FlowAttributeKey(FILENAME); - REQUIRE (attrs[key] == fileName); - } +} - FlowFileRecord *ffr = session.get(); +TEST_CASE("LogAttributeTest", "[getfileCreate3]") { + std::ostringstream oss; + std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr< + logging::BaseLogger>( + new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, + 0)); + std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); + logger->updateLogger(std::move(outputLogger)); - ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + TestController testController; - delete ffr; + testController.enableDebug(); - std::set<FlowFileRecord*> expiredFlows; - REQUIRE( 2 == provenanceRepo.getRepoMap().size() ); + std::shared_ptr<core::Repository> repo = std::make_shared< + TestRepository>(); - for(auto entry: provenanceRepo.getRepoMap()) - { - ProvenanceEventRecord newRecord; - newRecord.DeSerialize((uint8_t*)entry.second.data(),entry.second.length()); + std::shared_ptr<core::Processor> processor = std::make_shared< + org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); - bool found = false; - for ( auto provRec : records) - { - if (provRec->getEventId() == newRecord.getEventId() ) - { - REQUIRE( provRec->getEventId() == newRecord.getEventId()); - REQUIRE( provRec->getComponentId() == newRecord.getComponentId()); - REQUIRE( provRec->getComponentType() == newRecord.getComponentType()); - REQUIRE( provRec->getDetails() == newRecord.getDetails()); - REQUIRE( provRec->getEventDuration() == newRecord.getEventDuration()); - found = true; - break; - } - } - if (!found) - throw std::runtime_error("Did not find record"); - } + std::shared_ptr<core::Processor> logAttribute = std::make_shared< + org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + uuid_t processoruuid; + REQUIRE(true == processor->getUUID(processoruuid)); + uuid_t logattribute_uuid; + REQUIRE(true == logAttribute->getUUID(logattribute_uuid)); + std::shared_ptr<minifi::Connection> connection = std::make_shared< + minifi::Connection>(repo,"getfileCreate2Connection"); + connection->setRelationship(core::Relationship("success", "description")); + std::shared_ptr<minifi::Connection> connection2 = std::make_shared< + minifi::Connection>(repo,"logattribute"); + connection2->setRelationship(core::Relationship("success", "description")); + // link the connections so that we can test results at the end for this + connection->setSource(processor); -} + // link the connections so that we can test results at the end for this + connection->setDestination(logAttribute); + + connection2->setSource(logAttribute); + + + connection2->setSourceUUID(logattribute_uuid); + connection->setSourceUUID(processoruuid); + connection->setDestinationUUID(logattribute_uuid); + + processor->addConnection(connection); + logAttribute->addConnection(connection); + logAttribute->addConnection(connection2); + REQUIRE(dir != NULL); + + core::ProcessorNode node(processor); + core::ProcessorNode node2(logAttribute); + + core::ProcessContext context(node, repo); + core::ProcessContext context2(node2, repo); + context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, + dir); + core::ProcessSession session(&context); + core::ProcessSession session2(&context2); + REQUIRE(processor->getName() == "getfileCreate2"); + std::shared_ptr<core::FlowFile> record; + processor->setScheduledState(core::ScheduledState::RUNNING); + processor->onTrigger(&context, &session); + logAttribute->incrementActiveTasks(); + logAttribute->setScheduledState(core::ScheduledState::RUNNING); + logAttribute->onTrigger(&context2, &session2); + + provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); + std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); + record = session.get(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + + processor->incrementActiveTasks(); + processor->setScheduledState(core::ScheduledState::RUNNING); + processor->onTrigger(&context, &session); + unlink(ss.str().c_str()); + rmdir(dir); + reporter = session.getProvenanceReporter(); + + records = reporter->getEvents(); + session.commit(); + oss.str(""); + oss.clear(); + + logAttribute->incrementActiveTasks(); + logAttribute->setScheduledState(core::ScheduledState::RUNNING); + logAttribute->onTrigger(&context2, &session2); + + //session2.commit(); + records = reporter->getEvents(); + + std::string log_attribute_output = oss.str(); + REQUIRE( + log_attribute_output.find("key:absolute.path value:" + ss.str()) + != std::string::npos); + REQUIRE(log_attribute_output.find("Size:8 Offset:0") != std::string::npos); + REQUIRE( + log_attribute_output.find("key:path value:" + std::string(dir)) + != std::string::npos); + + outputLogger = std::unique_ptr<logging::BaseLogger>( + new org::apache::nifi::minifi::core::logging::NullAppender()); + logger->updateLogger(std::move(outputLogger)); + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/test/unit/PropertyTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/PropertyTests.cpp b/libminifi/test/unit/PropertyTests.cpp index e9f1c19..dee809f 100644 --- a/libminifi/test/unit/PropertyTests.cpp +++ b/libminifi/test/unit/PropertyTests.cpp @@ -16,22 +16,23 @@ * limitations under the License. */ +#include "../../include/core/Property.h" #include "utils/StringUtils.h" #include "../TestBase.h" -#include "Property.h" + TEST_CASE("Test Boolean Conversion", "[testboolConversion]") { bool b; - REQUIRE(true == StringUtils::StringToBool("true",b)); - REQUIRE(true == StringUtils::StringToBool("True",b)); - REQUIRE(true == StringUtils::StringToBool("TRue",b)); - REQUIRE(true == StringUtils::StringToBool("tRUE",b)); + REQUIRE(true == org::apache::nifi::minifi::utils::StringUtils::StringToBool("true",b)); + REQUIRE(true == org::apache::nifi::minifi::utils::StringUtils::StringToBool("True",b)); + REQUIRE(true == org::apache::nifi::minifi::utils::StringUtils::StringToBool("TRue",b)); + REQUIRE(true == org::apache::nifi::minifi::utils::StringUtils::StringToBool("tRUE",b)); - REQUIRE(false == StringUtils::StringToBool("FALSE",b)); - REQUIRE(false == StringUtils::StringToBool("FALLSEY",b)); - REQUIRE(false == StringUtils::StringToBool("FaLSE",b)); - REQUIRE(false == StringUtils::StringToBool("false",b)); + REQUIRE(false == org::apache::nifi::minifi::utils::StringUtils::StringToBool("FALSE",b)); + REQUIRE(false == org::apache::nifi::minifi::utils::StringUtils::StringToBool("FALLSEY",b)); + REQUIRE(false == org::apache::nifi::minifi::utils::StringUtils::StringToBool("FaLSE",b)); + REQUIRE(false == org::apache::nifi::minifi::utils::StringUtils::StringToBool("false",b)); } @@ -41,7 +42,7 @@ TEST_CASE("Test Trimmer Right", "[testTrims]") { REQUIRE(test.c_str()[test.length() - 1] == '\n'); REQUIRE(test.c_str()[test.length() - 2] == '\t'); - test = StringUtils::trimRight(test); + test = org::apache::nifi::minifi::utils::StringUtils::trimRight(test); REQUIRE(test.c_str()[test.length() - 1] == 'd'); REQUIRE(test.c_str()[test.length() - 2] == 'a'); @@ -51,7 +52,7 @@ TEST_CASE("Test Trimmer Right", "[testTrims]") { REQUIRE(test.c_str()[test.length() - 1] == '\t'); REQUIRE(test.c_str()[test.length() - 2] == '\v'); - test = StringUtils::trimRight(test); + test = org::apache::nifi::minifi::utils::StringUtils::trimRight(test); REQUIRE(test.c_str()[test.length() - 1] == 'd'); REQUIRE(test.c_str()[test.length() - 2] == 'a'); @@ -61,7 +62,7 @@ TEST_CASE("Test Trimmer Right", "[testTrims]") { REQUIRE(test.c_str()[test.length() - 1] == '\f'); REQUIRE(test.c_str()[test.length() - 2] == ' '); - test = StringUtils::trimRight(test); + test = org::apache::nifi::minifi::utils::StringUtils::trimRight(test); REQUIRE(test.c_str()[test.length() - 1] == 'd'); @@ -74,7 +75,7 @@ TEST_CASE("Test Trimmer Left", "[testTrims]") { REQUIRE(test.c_str()[0] == '\t'); REQUIRE(test.c_str()[1] == '\n'); - test = StringUtils::trimLeft(test); + test = org::apache::nifi::minifi::utils::StringUtils::trimLeft(test); REQUIRE(test.c_str()[0] == 'a'); REQUIRE(test.c_str()[1] == ' '); @@ -84,7 +85,7 @@ TEST_CASE("Test Trimmer Left", "[testTrims]") { REQUIRE(test.c_str()[0] == '\v'); REQUIRE(test.c_str()[1] == '\t'); - test = StringUtils::trimLeft(test); + test = org::apache::nifi::minifi::utils::StringUtils::trimLeft(test); REQUIRE(test.c_str()[0] == 'a'); REQUIRE(test.c_str()[1] == ' '); @@ -94,7 +95,7 @@ TEST_CASE("Test Trimmer Left", "[testTrims]") { REQUIRE(test.c_str()[0] == ' '); REQUIRE(test.c_str()[1] == '\f'); - test = StringUtils::trimLeft(test); + test = org::apache::nifi::minifi::utils::StringUtils::trimLeft(test); REQUIRE(test.c_str()[0] == 'a'); REQUIRE(test.c_str()[1] == ' '); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/test/unit/ProvenanceTestHelper.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 1ee6a4c..cb8f520 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -18,177 +18,120 @@ #ifndef LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_ #define LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_ -#include "Provenance.h" +#include "provenance/Provenance.h" #include "FlowController.h" -#include "FlowFileRepository.h" - -/** - * Test repository - */ -class FlowTestRepository : public FlowFileRepository -{ -public: - FlowTestRepository() -{ -} - //! initialize - bool initialize() - { - return true; - } - - //! Destructor - virtual ~FlowTestRepository() { - - } - - bool Put(std::string key, uint8_t *buf, int bufLen) - { - repositoryResults.insert(std::pair<std::string,std::string>(key,std::string((const char*)buf,bufLen))); - return true; - } - //! Delete - bool Delete(std::string key) - { - repositoryResults.erase(key); - return true; - } - //! Get - bool Get(std::string key, std::string &value) - { - auto result = repositoryResults.find(key); - if (result != repositoryResults.end()) - { - value = result->second; - return true; - } - else - { - return false; - } - } - - const std::map<std::string,std::string> &getRepoMap() const - { - return repositoryResults; - } - -protected: - std::map<std::string,std::string> repositoryResults; -}; - +#include "core/Repository.h" +#include "core/core.h" /** * Test repository */ -class ProvenanceTestRepository : public ProvenanceRepository -{ -public: - ProvenanceTestRepository() -{ -} - //! initialize - bool initialize() - { - return true; - } - - //! Destructor - virtual ~ProvenanceTestRepository() { - - } - - bool Put(std::string key, uint8_t *buf, int bufLen) - { - repositoryResults.insert(std::pair<std::string,std::string>(key,std::string((const char*)buf,bufLen))); - return true; - } - //! Delete - bool Delete(std::string key) - { - repositoryResults.erase(key); - return true; - } - //! Get - bool Get(std::string key, std::string &value) - { - auto result = repositoryResults.find(key); - if (result != repositoryResults.end()) - { - value = result->second; - return true; - } - else - { - return false; - } - } - - const std::map<std::string,std::string> &getRepoMap() const - { - return repositoryResults; - } - -protected: - std::map<std::string,std::string> repositoryResults; +class TestRepository : public core::Repository { + public: + TestRepository() + : Repository("repo_name", "./dir", 1000, 100, 0) { + } + // initialize + bool initialize() { + return true; + } + + // Destructor + virtual ~TestRepository() { + + } + + bool Put(std::string key, uint8_t *buf, int bufLen) { + repositoryResults.insert( + std::pair<std::string, std::string>( + key, std::string((const char*) buf, bufLen))); + return true; + } + // Delete + bool Delete(std::string key) { + repositoryResults.erase(key); + return true; + } + // Get + bool Get(std::string key, std::string &value) { + auto result = repositoryResults.find(key); + if (result != repositoryResults.end()) { + value = result->second; + return true; + } else { + return false; + } + } + + const std::map<std::string, std::string> &getRepoMap() const { + return repositoryResults; + } + + void run() { + // do nothing + } + protected: + std::map<std::string, std::string> repositoryResults; }; - -class TestFlowController : public FlowController -{ - -public: - TestFlowController(ProvenanceTestRepository &provenanceRepo, FlowTestRepository &flowRepo) : ::FlowController() - { - _provenanceRepo = dynamic_cast<ProvenanceRepository*>(&provenanceRepo); - _flowfileRepo = dynamic_cast<FlowFileRepository*>(&flowRepo); - } - ~TestFlowController() - { - - } - void load(){ - - } - - bool start() - { - _running.store(true); - return true; - } - - void stop(bool force) - { - _running.store(false); - } - void waitUnload(const uint64_t timeToWaitMs) - { - stop(true); - } - - void unload() - { - stop(true); - } - - void reload(std::string file) - { - - } - - bool isRunning() - { - return true; - } - - - Processor *createProcessor(std::string name, uuid_t uuid){ return 0;} - - ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid){ return 0;} - - ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid){ return 0; } - - Connection *createConnection(std::string name, uuid_t uuid){ return 0; } +class TestFlowController : public minifi::FlowController { + + public: + TestFlowController(std::shared_ptr<core::Repository> repo, + std::shared_ptr<core::Repository> flow_file_repo) + : minifi::FlowController(repo, flow_file_repo, nullptr, "",true) { + } + ~TestFlowController() { + + } + void load() { + + } + + bool start() { + running_.store(true); + return true; + } + + void stop(bool force) { + running_.store(false); + } + void waitUnload(const uint64_t timeToWaitMs) { + stop(true); + } + + void unload() { + stop(true); + } + + void reload(std::string file) { + + } + + bool isRunning() { + return true; + } + + std::shared_ptr<core::Processor> createProcessor(std::string name, + uuid_t uuid) { + return 0; + } + + core::ProcessGroup *createRootProcessGroup(std::string name, uuid_t uuid) { + return 0; + } + + core::ProcessGroup *createRemoteProcessGroup(std::string name, uuid_t uuid) { + return 0; + } + + std::shared_ptr<minifi::Connection> createConnection(std::string name, + uuid_t uuid) { + return 0; + } + protected: + void initializePaths(const std::string &adjustedFilename) { + std::cout << "what" << std::endl; + } }; - #endif /* LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/test/unit/ProvenanceTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTests.cpp b/libminifi/test/unit/ProvenanceTests.cpp index d78de47..624601c 100644 --- a/libminifi/test/unit/ProvenanceTests.cpp +++ b/libminifi/test/unit/ProvenanceTests.cpp @@ -16,81 +16,81 @@ * limitations under the License. */ - #ifndef PROVENANCE_TESTS #define PROVENANCE_TESTS #include "../TestBase.h" #include "ProvenanceTestHelper.h" -#include "Provenance.h" +#include "provenance/Provenance.h" #include "FlowFileRecord.h" +#include "core/core.h" +#include "core/repository/FlowFileRepository.h" +TEST_CASE("Test Provenance record create", "[Testprovenance::ProvenanceEventRecord]") { - -TEST_CASE("Test Provenance record create", "[TestProvenanceEventRecord]"){ - - ProvenanceEventRecord record1(ProvenanceEventRecord::ProvenanceEventType::CREATE,"blah","blahblah"); - REQUIRE( record1.getAttributes().size() == 0); - REQUIRE( record1.getAlternateIdentifierUri().length() == 0); + provenance::ProvenanceEventRecord record1( + provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah", + "blahblah"); + REQUIRE(record1.getAttributes().size() == 0); + REQUIRE(record1.getAlternateIdentifierUri().length() == 0); } +TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") { -TEST_CASE("Test Provenance record serialization", "[TestProvenanceEventRecordSerializeDeser]"){ + provenance::ProvenanceEventRecord record1( + provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", + "componenttype"); - ProvenanceEventRecord record1(ProvenanceEventRecord::ProvenanceEventType::CREATE,"componentid","componenttype"); + std::string eventId = record1.getEventId(); - std::string eventId = record1.getEventId(); - - std::string smileyface = ":)" ; - record1.setDetails(smileyface); + std::string smileyface = ":)"; + record1.setDetails(smileyface); - ProvenanceTestRepository repo; - uint64_t sample = 65555; - ProvenanceRepository *testRepository = dynamic_cast<ProvenanceRepository*>(&repo); - record1.setEventDuration(sample); + uint64_t sample = 65555; + std::shared_ptr<core::Repository> testRepository =std::make_shared<TestRepository>(); + record1.setEventDuration(sample); - record1.Serialize(testRepository); - ProvenanceEventRecord record2; - REQUIRE( record2.DeSerialize(testRepository,eventId) == true); - REQUIRE( record2.getEventId() == record1.getEventId()); - REQUIRE( record2.getComponentId() == record1.getComponentId()); - REQUIRE( record2.getComponentType() == record1.getComponentType()); - REQUIRE( record2.getDetails() == record1.getDetails()); - REQUIRE( record2.getDetails() == smileyface); - REQUIRE( record2.getEventDuration() == sample); + record1.Serialize(testRepository); + provenance::ProvenanceEventRecord record2; + REQUIRE(record2.DeSerialize(testRepository,eventId) == true); + REQUIRE(record2.getEventId() == record1.getEventId()); + REQUIRE(record2.getComponentId() == record1.getComponentId()); + REQUIRE(record2.getComponentType() == record1.getComponentType()); + REQUIRE(record2.getDetails() == record1.getDetails()); + REQUIRE(record2.getDetails() == smileyface); + REQUIRE(record2.getEventDuration() == sample); } - -TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]"){ - - ProvenanceEventRecord record1(ProvenanceEventRecord::ProvenanceEventType::CLONE,"componentid","componenttype"); - std::string eventId = record1.getEventId(); - std::map<std::string, std::string> attributes; - attributes.insert(std::pair<std::string,std::string>("potato","potatoe")); - attributes.insert(std::pair<std::string,std::string>("tomato","tomatoe")); - FlowFileRecord ffr1(attributes); - - record1.addChildFlowFile(&ffr1); - - ProvenanceTestRepository repo; - uint64_t sample = 65555; - ProvenanceRepository *testRepository = dynamic_cast<ProvenanceRepository*>(&repo); - record1.setEventDuration(sample); - - record1.Serialize(testRepository); - ProvenanceEventRecord record2; - REQUIRE( record2.DeSerialize(testRepository,eventId) == true); - REQUIRE( record1.getChildrenUuids().size() == 1); - REQUIRE( record2.getChildrenUuids().size() == 1); - std::string childId = record2.getChildrenUuids().at(0); - REQUIRE( childId == ffr1.getUUIDStr()); - record2.removeChildUuid(childId); - REQUIRE( record2.getChildrenUuids().size() == 0); - +TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") { + + provenance::ProvenanceEventRecord record1( + provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", + "componenttype"); + std::string eventId = record1.getEventId(); + std::map<std::string, std::string> attributes; + attributes.insert(std::pair<std::string, std::string>("potato", "potatoe")); + attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe")); + std::shared_ptr<core::repository::FlowFileRepository> frepo = std::make_shared<core::repository::FlowFileRepository>("./content_repository",0,0,0); + std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared< + minifi::FlowFileRecord>(frepo,attributes); + + record1.addChildFlowFile(ffr1); + + uint64_t sample = 65555; + std::shared_ptr<core::Repository> testRepository =std::make_shared<TestRepository>(); + record1.setEventDuration(sample); + + record1.Serialize(testRepository); + provenance::ProvenanceEventRecord record2; + REQUIRE(record2.DeSerialize(testRepository,eventId) == true); + REQUIRE(record1.getChildrenUuids().size() == 1); + REQUIRE(record2.getChildrenUuids().size() == 1); + std::string childId = record2.getChildrenUuids().at(0); + REQUIRE(childId == ffr1->getUUIDStr()); + record2.removeChildUuid(childId); + REQUIRE(record2.getChildrenUuids().size() == 0); } - - #endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/test/unit/SerializationTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/SerializationTests.cpp b/libminifi/test/unit/SerializationTests.cpp index 96540bd..c6ada33 100644 --- a/libminifi/test/unit/SerializationTests.cpp +++ b/libminifi/test/unit/SerializationTests.cpp @@ -28,13 +28,13 @@ #define FMT_DEFAULT fmt_lower - +using namespace org::apache::nifi::minifi::io; TEST_CASE("TestSetPortId", "[S2S1]"){ - Site2SitePeer peer(std::unique_ptr<DataStream>(new DataStream()),"fake_host",65433); + std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr<minifi::Site2SitePeer>( new minifi::Site2SitePeer(std::unique_ptr<DataStream>(new DataStream()),"fake_host",65433)); - Site2SiteClientProtocol protocol(&peer); + minifi::Site2SiteClientProtocol protocol(std::move(peer)); std::string uuid_str = "c56a4180-65aa-42ec-a945-5fd21dec0538"; @@ -54,9 +54,9 @@ TEST_CASE("TestSetPortId", "[S2S1]"){ TEST_CASE("TestSetPortIdUppercase", "[S2S2]"){ - Site2SitePeer peer(std::unique_ptr<DataStream>(new DataStream()),"fake_host",65433); + std::unique_ptr<minifi::Site2SitePeer> peer = std::unique_ptr<minifi::Site2SitePeer>( new minifi::Site2SitePeer(std::unique_ptr<DataStream>(new DataStream()),"fake_host",65433)); - Site2SiteClientProtocol protocol(&peer); + minifi::Site2SiteClientProtocol protocol(std::move(peer)); std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538"; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/test/unit/SocketTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/SocketTests.cpp b/libminifi/test/unit/SocketTests.cpp index 3fb7c9e..e735f15 100644 --- a/libminifi/test/unit/SocketTests.cpp +++ b/libminifi/test/unit/SocketTests.cpp @@ -17,170 +17,159 @@ */ #include "../TestBase.h" -#include "../../include/io/ClientSocket.h" +#include "io/ClientSocket.h" + +using namespace org::apache::nifi::minifi::io; TEST_CASE("TestSocket", "[TestSocket1]") { - Socket socket("localhost",8183); - REQUIRE(-1 == socket.initialize() ); - REQUIRE("localhost" == socket.getHostname()); - socket.closeStream(); + Socket socket("localhost", 8183); + REQUIRE(-1 == socket.initialize()); + REQUIRE("localhost" == socket.getHostname()); + socket.closeStream(); } TEST_CASE("TestSocketWriteTest1", "[TestSocket2]") { - Socket socket("localhost",8183); - REQUIRE(-1 == socket.initialize() ); + Socket socket("localhost", 8183); + REQUIRE(-1 == socket.initialize()); - socket.writeData(0,0); + socket.writeData(0, 0); - std::vector<uint8_t> buffer; - buffer.push_back('a'); + std::vector<uint8_t> buffer; + buffer.push_back('a'); - REQUIRE(-1 == socket.writeData(buffer,1)); + REQUIRE(-1 == socket.writeData(buffer, 1)); - socket.closeStream(); + socket.closeStream(); } TEST_CASE("TestSocketWriteTest2", "[TestSocket3]") { - std::vector<uint8_t> buffer; - buffer.push_back('a'); + std::vector<uint8_t> buffer; + buffer.push_back('a'); + + Socket server("localhost", 9183, 1); - Socket server("localhost",9183,1); + REQUIRE(-1 != server.initialize()); - REQUIRE(-1 != server.initialize() ); + Socket client("localhost", 9183); - Socket client("localhost",9183); + REQUIRE(-1 != client.initialize()); - REQUIRE(-1 != client.initialize() ); + REQUIRE(1 == client.writeData(buffer, 1)); - REQUIRE( 1 == client.writeData(buffer,1) ); + std::vector<uint8_t> readBuffer; + readBuffer.resize(1); - std::vector<uint8_t> readBuffer; - readBuffer.resize(1); + REQUIRE(1 == server.readData(readBuffer, 1)); - REQUIRE( 1== server.readData(readBuffer,1) ); + REQUIRE(readBuffer == buffer); - REQUIRE( readBuffer == buffer ); + server.closeStream(); - server.closeStream(); - - - client.closeStream(); + client.closeStream(); } TEST_CASE("TestGetHostName", "[TestSocket4]") { - REQUIRE( Socket::getMyHostName().length() > 0 ); - - + REQUIRE(Socket::getMyHostName().length() > 0); } TEST_CASE("TestWriteEndian64", "[TestSocket4]") { - std::vector<uint8_t> buffer; - buffer.push_back('a'); + std::vector<uint8_t> buffer; + buffer.push_back('a'); - Socket server("localhost",9183,1); + Socket server("localhost", 9183, 1); - REQUIRE(-1 != server.initialize() ); + REQUIRE(-1 != server.initialize()); - Socket client("localhost",9183); + Socket client("localhost", 9183); - REQUIRE(-1 != client.initialize() ); + REQUIRE(-1 != client.initialize()); - uint64_t negative_one = -1; - REQUIRE( 8 == client.write(negative_one) ); + uint64_t negative_one = -1; + REQUIRE(8 == client.write(negative_one)); + uint64_t negative_two = 0; + REQUIRE(8 == server.read(negative_two)); - uint64_t negative_two = 0; - REQUIRE( 8 == server.read(negative_two) ); + REQUIRE(negative_two == negative_one); - REQUIRE( negative_two == negative_one ); + server.closeStream(); - - server.closeStream(); - - - client.closeStream(); + client.closeStream(); } TEST_CASE("TestWriteEndian32", "[TestSocket5]") { - std::vector<uint8_t> buffer; - buffer.push_back('a'); + std::vector<uint8_t> buffer; + buffer.push_back('a'); - Socket server("localhost",9183,1); + Socket server("localhost", 9183, 1); - REQUIRE(-1 != server.initialize() ); + REQUIRE(-1 != server.initialize()); - Socket client("localhost",9183); + Socket client("localhost", 9183); - REQUIRE(-1 != client.initialize() ); + REQUIRE(-1 != client.initialize()); - { - uint32_t negative_one = -1; - REQUIRE( 4 == client.write(negative_one) ); + { + uint32_t negative_one = -1; + REQUIRE(4 == client.write(negative_one)); + uint32_t negative_two = 0; + REQUIRE(4 == server.read(negative_two)); - uint32_t negative_two = 0; - REQUIRE( 4 == server.read(negative_two) ); + REQUIRE(negative_two == negative_one); + } - REQUIRE( negative_two == negative_one ); - } - - { - uint16_t negative_one = -1; - REQUIRE( 2 == client.write(negative_one) ); + { + uint16_t negative_one = -1; + REQUIRE(2 == client.write(negative_one)); + uint16_t negative_two = 0; + REQUIRE(2 == server.read(negative_two)); - uint16_t negative_two = 0; - REQUIRE( 2 == server.read(negative_two) ); + REQUIRE(negative_two == negative_one); + } + server.closeStream(); - REQUIRE( negative_two == negative_one ); - } - server.closeStream(); - - - client.closeStream(); + client.closeStream(); } - TEST_CASE("TestSocketWriteTestAfterClose", "[TestSocket6]") { - std::vector<uint8_t> buffer; - buffer.push_back('a'); + std::vector<uint8_t> buffer; + buffer.push_back('a'); - Socket server("localhost",9183,1); + Socket server("localhost", 9183, 1); - REQUIRE(-1 != server.initialize() ); + REQUIRE(-1 != server.initialize()); - Socket client("localhost",9183); + Socket client("localhost", 9183); - REQUIRE(-1 != client.initialize() ); + REQUIRE(-1 != client.initialize()); - REQUIRE( 1 == client.writeData(buffer,1) ); + REQUIRE(1 == client.writeData(buffer, 1)); - std::vector<uint8_t> readBuffer; - readBuffer.resize(1); + std::vector<uint8_t> readBuffer; + readBuffer.resize(1); - REQUIRE( 1== server.readData(readBuffer,1) ); + REQUIRE(1 == server.readData(readBuffer, 1)); - REQUIRE( readBuffer == buffer ); - - client.closeStream(); - - REQUIRE( -1 == client.writeData(buffer,1) ); - - server.closeStream(); + REQUIRE(readBuffer == buffer); + client.closeStream(); + REQUIRE(-1 == client.writeData(buffer, 1)); -} + server.closeStream(); +}
