http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/Site2SiteClientProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp new file mode 100644 index 0000000..88ea78a --- /dev/null +++ b/libminifi/src/Site2SiteClientProtocol.cpp @@ -0,0 +1,1313 @@ +/** + * @file Site2SiteClientProtocol.cpp + * Site2SiteClientProtocol class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include <sys/time.h> +#include <stdio.h> +#include <time.h> +#include <chrono> +#include <thread> +#include <random> +#include <netinet/tcp.h> +#include <iostream> +#include "Site2SitePeer.h" +#include "Site2SiteClientProtocol.h" + +bool Site2SiteClientProtocol::establish() +{ + if (_peerState != IDLE) + { + _logger->log_error("Site2Site peer state is not idle while try to establish"); + return false; + } + + bool ret = _peer->Open(); + + if (!ret) + { + _logger->log_error("Site2Site peer socket open failed"); + return false; + } + + // Negotiate the version + ret = initiateResourceNegotiation(); + + if (!ret) + { + _logger->log_error("Site2Site Protocol Version Negotiation failed"); + /* + _peer->yield(); + tearDown(); */ + return false; + } + + _logger->log_info("Site2Site socket established"); + _peerState = ESTABLISHED; + + return true; +} + +bool Site2SiteClientProtocol::initiateResourceNegotiation() +{ + // Negotiate the version + if (_peerState != IDLE) + { + _logger->log_error("Site2Site peer state is not idle while initiateResourceNegotiation"); + return false; + } + + _logger->log_info("Negotiate protocol version with destination port %s current version %d", _portIdStr.c_str(), _currentVersion); + + int ret = _peer->writeUTF(this->getResourceName()); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + ret = _peer->write(_currentVersion); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + uint8_t statusCode; + ret = _peer->read(statusCode); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + switch (statusCode) + { + case RESOURCE_OK: + _logger->log_info("Site2Site Protocol Negotiate protocol version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = _peer->read(serverVersion); + if (ret <= 0) + { + // tearDown(); + return false; + } + _logger->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion); + for (unsigned int i = 
(_currentVersionIndex + 1); i < sizeof(_supportedVersion)/sizeof(uint32_t); i++) + { + if (serverVersion >= _supportedVersion[i]) + { + _currentVersion = _supportedVersion[i]; + _currentVersionIndex = i; + return initiateResourceNegotiation(); + } + } + ret = -1; + // tearDown(); + return false; + case NEGOTIATED_ABORT: + _logger->log_info("Site2Site Negotiate protocol response ABORT"); + ret = -1; + // tearDown(); + return false; + default: + _logger->log_info("Negotiate protocol response unknown code %d", statusCode); + return true; + } + + return true; +} + +bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() +{ + // Negotiate the version + if (_peerState != HANDSHAKED) + { + _logger->log_error("Site2Site peer state is not handshaked while initiateCodecResourceNegotiation"); + return false; + } + + _logger->log_info("Negotiate Codec version with destination port %s current version %d", _portIdStr.c_str(), _currentCodecVersion); + + int ret = _peer->writeUTF(this->getCodecResourceName()); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + ret = _peer->write(_currentCodecVersion); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + uint8_t statusCode; + ret = _peer->read(statusCode); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + switch (statusCode) + { + case RESOURCE_OK: + _logger->log_info("Site2Site Codec Negotiate version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = _peer->read(serverVersion); + if (ret <= 0) + { + // tearDown(); + return false; + } + _logger->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion); + for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion)/sizeof(uint32_t); i++) + { + if (serverVersion >= _supportedCodecVersion[i]) + { + _currentCodecVersion = _supportedCodecVersion[i]; + _currentCodecVersionIndex = i; + return initiateCodecResourceNegotiation(); + } + } + ret = 
-1; + // tearDown(); + return false; + case NEGOTIATED_ABORT: + _logger->log_info("Site2Site Codec Negotiate response ABORT"); + ret = -1; + // tearDown(); + return false; + default: + _logger->log_info("Negotiate Codec response unknown code %d", statusCode); + return true; + } + + return true; +} + +bool Site2SiteClientProtocol::handShake() +{ + if (_peerState != ESTABLISHED) + { + _logger->log_error("Site2Site peer state is not established while handshake"); + return false; + } + _logger->log_info("Site2Site Protocol Perform hand shake with destination port %s", _portIdStr.c_str()); + uuid_t uuid; + // Generate the global UUID for the com identify + uuid_generate(uuid); + char uuidStr[37]; + uuid_unparse(uuid, uuidStr); + _commsIdentifier = uuidStr; + + int ret = _peer->writeUTF(_commsIdentifier); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + std::map<std::string, std::string> properties; + properties[HandShakePropertyStr[GZIP]] = "false"; + properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr; + properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(this->_timeOut); + if (this->_currentVersion >= 5) + { + if (this->_batchCount > 0) + properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(this->_batchCount); + if (this->_batchSize > 0) + properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(this->_batchSize); + if (this->_batchDuration > 0) + properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(this->_batchDuration); + } + + if (_currentVersion >= 3) + { + ret = _peer->writeUTF(_peer->getURL()); + if (ret <= 0) + { + // tearDown(); + return false; + } + } + + uint32_t size = properties.size(); + ret = _peer->write(size); + if (ret <= 0) + { + // tearDown(); + return false; + } + + std::map<std::string, std::string>::iterator it; + for (it = properties.begin(); it!= properties.end(); it++) + { + ret = _peer->writeUTF(it->first); + if (ret <= 0) + { + // tearDown(); + return false; + } + 
ret = _peer->writeUTF(it->second); + if (ret <= 0) + { + // tearDown(); + return false; + } + _logger->log_info("Site2Site Protocol Send handshake properties %s %s", it->first.c_str(), it->second.c_str()); + } + + RespondCode code; + std::string message; + + ret = this->readRespond(code, message); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + switch (code) + { + case PROPERTIES_OK: + _logger->log_info("Site2Site HandShake Completed"); + _peerState = HANDSHAKED; + return true; + case PORT_NOT_IN_VALID_STATE: + case UNKNOWN_PORT: + case PORTS_DESTINATION_FULL: + _logger->log_error("Site2Site HandShake Failed because destination port is either invalid or full"); + ret = -1; + /* + _peer->yield(); + tearDown(); */ + return false; + default: + _logger->log_info("HandShake Failed because of unknown respond code %d", code); + ret = -1; + /* + _peer->yield(); + tearDown(); */ + return false; + } + + return false; +} + +void Site2SiteClientProtocol::tearDown() +{ + if (_peerState >= ESTABLISHED) + { + _logger->log_info("Site2Site Protocol tearDown"); + // need to write shutdown request + writeRequestType(SHUTDOWN); + } + + std::map<std::string, Transaction *>::iterator it; + for (it = _transactionMap.begin(); it!= _transactionMap.end(); it++) + { + delete it->second; + } + _transactionMap.clear(); + _peer->Close(); + _peerState = IDLE; +} + +int Site2SiteClientProtocol::writeRequestType(RequestType type) +{ + if (type >= MAX_REQUEST_TYPE) + return -1; + + return _peer->writeUTF(RequestTypeStr[type]); +} + +int Site2SiteClientProtocol::readRequestType(RequestType &type) +{ + std::string requestTypeStr; + + int ret = _peer->readUTF(requestTypeStr); + + if (ret <= 0) + return ret; + + for (int i = (int) NEGOTIATE_FLOWFILE_CODEC; i <= (int) SHUTDOWN; i++) + { + if (RequestTypeStr[i] == requestTypeStr) + { + type = (RequestType) i; + return ret; + } + } + + return -1; +} + +int Site2SiteClientProtocol::readRespond(RespondCode &code, std::string &message) +{ + 
uint8_t firstByte; + + int ret = _peer->read(firstByte); + + if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1) + return -1; + + uint8_t secondByte; + + ret = _peer->read(secondByte); + + if (ret <= 0 || secondByte != CODE_SEQUENCE_VALUE_2) + return -1; + + uint8_t thirdByte; + + ret = _peer->read(thirdByte); + + if (ret <= 0) + return ret; + + code = (RespondCode) thirdByte; + + RespondCodeContext *resCode = this->getRespondCodeContext(code); + + if ( resCode == NULL) + { + // Not a valid respond code + return -1; + } + if (resCode->hasDescription) + { + ret = _peer->readUTF(message); + if (ret <= 0) + return -1; + } + return 3 + message.size(); +} + +int Site2SiteClientProtocol::writeRespond(RespondCode code, std::string message) +{ + RespondCodeContext *resCode = this->getRespondCodeContext(code); + + if (resCode == NULL) + { + // Not a valid respond code + return -1; + } + + uint8_t codeSeq[3]; + codeSeq[0] = CODE_SEQUENCE_VALUE_1; + codeSeq[1] = CODE_SEQUENCE_VALUE_2; + codeSeq[2] = (uint8_t) code; + + int ret = _peer->write(codeSeq, 3); + + if (ret != 3) + return -1; + + if (resCode->hasDescription) + { + ret = _peer->writeUTF(message); + if (ret > 0) + return (3 + ret); + else + return ret; + } + else + return 3; +} + +bool Site2SiteClientProtocol::negotiateCodec() +{ + if (_peerState != HANDSHAKED) + { + _logger->log_error("Site2Site peer state is not handshaked while negotiate codec"); + return false; + } + + _logger->log_info("Site2Site Protocol Negotiate Codec with destination port %s", _portIdStr.c_str()); + + int status = this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC); + + if (status <= 0) + { + // tearDown(); + return false; + } + + // Negotiate the codec version + bool ret = initiateCodecResourceNegotiation(); + + if (!ret) + { + _logger->log_error("Site2Site Codec Version Negotiation failed"); + /* + _peer->yield(); + tearDown(); */ + return false; + } + + _logger->log_info("Site2Site Codec Completed and move to READY state for data transfer"); + 
_peerState = READY; + + return true; +} + +bool Site2SiteClientProtocol::bootstrap() +{ + if (_peerState == READY) + return true; + + tearDown(); + + if (establish() && handShake() && negotiateCodec()) + { + _logger->log_info("Site2Site Ready For data transaction"); + return true; + } + else + { + _peer->yield(); + tearDown(); + return false; + } +} + +Transaction* Site2SiteClientProtocol::createTransaction(std::string &transactionID, TransferDirection direction) +{ + int ret; + bool dataAvailable; + Transaction *transaction = NULL; + + if (_peerState != READY) + { + bootstrap(); + } + + if (_peerState != READY) + { + return NULL; + } + + if (direction == RECEIVE) + { + ret = writeRequestType(RECEIVE_FLOWFILES); + + if (ret <= 0) + { + // tearDown(); + return NULL; + } + + RespondCode code; + std::string message; + + ret = readRespond(code, message); + + if (ret <= 0) + { + // tearDown(); + return NULL; + } + + switch (code) + { + case MORE_DATA: + dataAvailable = true; + _logger->log_info("Site2Site peer indicates that data is available"); + transaction = new Transaction(direction); + _transactionMap[transaction->getUUIDStr()] = transaction; + transactionID = transaction->getUUIDStr(); + transaction->setDataAvailable(dataAvailable); + _logger->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); + return transaction; + case NO_MORE_DATA: + dataAvailable = false; + _logger->log_info("Site2Site peer indicates that no data is available"); + transaction = new Transaction(direction); + _transactionMap[transaction->getUUIDStr()] = transaction; + transactionID = transaction->getUUIDStr(); + transaction->setDataAvailable(dataAvailable); + _logger->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); + return transaction; + default: + _logger->log_info("Site2Site got unexpected response %d when asking for data", code); + // tearDown(); + return NULL; + } + } + else + { + ret = writeRequestType(SEND_FLOWFILES); + + if 
(ret <= 0) + { + // tearDown(); + return NULL; + } + else + { + transaction = new Transaction(direction); + _transactionMap[transaction->getUUIDStr()] = transaction; + transactionID = transaction->getUUIDStr(); + _logger->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); + return transaction; + } + } +} + +bool Site2SiteClientProtocol::receive(std::string transactionID, DataPacket *packet, bool &eof) +{ + int ret; + Transaction *transaction = NULL; + + if (_peerState != READY) + { + bootstrap(); + } + + if (_peerState != READY) + { + return false; + } + + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) + { + return false; + } + else + { + transaction = it->second; + } + + if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) + { + _logger->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); + return false; + } + + if (transaction->getDirection() != RECEIVE) + { + _logger->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str()); + return false; + } + + if (!transaction->isDataAvailable()) + { + eof = true; + return true; + } + + if (transaction->_transfers > 0) + { + // if we already has transfer before, check to see whether another one is available + RespondCode code; + std::string message; + + ret = readRespond(code, message); + + if (ret <= 0) + { + return false; + } + if (code == CONTINUE_TRANSACTION) + { + _logger->log_info("Site2Site transaction %s peer indicate continue transaction", transactionID.c_str()); + transaction->_dataAvailable = true; + } + else if (code == FINISH_TRANSACTION) + { + _logger->log_info("Site2Site transaction %s peer indicate finish transaction", transactionID.c_str()); + transaction->_dataAvailable = false; + } + else + { + _logger->log_info("Site2Site transaction %s peer indicate wrong respond code %d", 
transactionID.c_str(), code); + return false; + } + } + + if (!transaction->isDataAvailable()) + { + eof = true; + return true; + } + + // start to read the packet + uint32_t numAttributes; + ret = _peer->read(numAttributes, &transaction->_crc); + if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) + { + return false; + } + + // read the attributes + for (unsigned int i = 0; i < numAttributes; i++) + { + std::string key; + std::string value; + ret = _peer->readUTF(key, true, &transaction->_crc); + if (ret <= 0) + { + return false; + } + ret = _peer->readUTF(value, true, &transaction->_crc); + if (ret <= 0) + { + return false; + } + packet->_attributes[key] = value; + _logger->log_info("Site2Site transaction %s receives attribute key %s value %s", transactionID.c_str(), key.c_str(), value.c_str()); + } + + uint64_t len; + ret = _peer->read(len, &transaction->_crc); + if (ret <= 0) + { + return false; + } + + packet->_size = len; + transaction->_transfers++; + transaction->_state = DATA_EXCHANGED; + transaction->_bytes += len; + _logger->log_info("Site2Site transaction %s receives flow record %d, total length %d", transactionID.c_str(), + transaction->_transfers, transaction->_bytes); + + return true; +} + +bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet, FlowFileRecord *flowFile, ProcessSession *session) +{ + int ret; + Transaction *transaction = NULL; + + if (_peerState != READY) + { + bootstrap(); + } + + if (_peerState != READY) + { + return false; + } + + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) + { + return false; + } + else + { + transaction = it->second; + } + + if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) + { + _logger->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); + return false; + } + + if (transaction->getDirection() != SEND) + { + 
_logger->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str()); + return false; + } + + if (transaction->_transfers > 0) + { + ret = writeRespond(CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION"); + if (ret <= 0) + { + return false; + } + } + + // start to read the packet + uint32_t numAttributes = packet->_attributes.size(); + ret = _peer->write(numAttributes, &transaction->_crc); + if (ret != 4) + { + return false; + } + + std::map<std::string, std::string>::iterator itAttribute; + for (itAttribute = packet->_attributes.begin(); itAttribute!= packet->_attributes.end(); itAttribute++) + { + ret = _peer->writeUTF(itAttribute->first, true, &transaction->_crc); + if (ret <= 0) + { + return false; + } + ret = _peer->writeUTF(itAttribute->second, true, &transaction->_crc); + if (ret <= 0) + { + return false; + } + _logger->log_info("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(), + itAttribute->first.c_str(), itAttribute->second.c_str()); + } + + uint64_t len = flowFile->getSize() ; + ret = _peer->write(len, &transaction->_crc); + if (ret != 8) + { + return false; + } + + if (flowFile->getSize()) + { + Site2SiteClientProtocol::ReadCallback callback(packet); + session->read(flowFile, &callback); + if (flowFile->getSize() != packet->_size) + { + return false; + } + } + + transaction->_transfers++; + transaction->_state = DATA_EXCHANGED; + transaction->_bytes += len; + _logger->log_info("Site2Site transaction %s send flow record %d, total length %d", transactionID.c_str(), + transaction->_transfers, transaction->_bytes); + + return true; +} + +void Site2SiteClientProtocol::receiveFlowFiles(ProcessContext *context, ProcessSession *session) +{ + uint64_t bytes = 0; + int transfers = 0; + Transaction *transaction = NULL; + + if (_peerState != READY) + { + bootstrap(); + } + + if (_peerState != READY) + { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); + 
return; + } + + // Create the transaction + std::string transactionID; + transaction = createTransaction(transactionID, RECEIVE); + + if (transaction == NULL) + { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); + return; + } + + try + { + while (true) + { + std::map<std::string, std::string> empty; + DataPacket packet(this, transaction, empty); + bool eof = false; + + if (!receive(transactionID, &packet, eof)) + { + throw Exception(SITE2SITE_EXCEPTION, "Receive Failed"); + return; + } + if (eof) + { + // transaction done + break; + } + FlowFileRecord *flowFile = session->create(); + if (!flowFile) + { + throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed"); + return; + } + std::map<std::string, std::string>::iterator it; + for (it = packet._attributes.begin(); it!= packet._attributes.end(); it++) + { + flowFile->addAttribute(it->first, it->second); + } + + if (packet._size > 0) + { + Site2SiteClientProtocol::WriteCallback callback(&packet); + session->write(flowFile, &callback); + if (flowFile->getSize() != packet._size) + { + throw Exception(SITE2SITE_EXCEPTION, "Receive Size Not Right"); + return; + } + } + Relationship relation; // undefined relationship + session->transfer(flowFile, relation); + // receive the transfer for the flow record + bytes += packet._size; + transfers++; + } // while true + + if (!confirm(transactionID)) + { + throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed"); + return; + } + if (!complete(transactionID)) + { + throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction Failed"); + return; + } + _logger->log_info("Site2Site transaction %s successfully receive flow record %d, content bytes %d", + transactionID.c_str(), transfers, bytes); + // we yield the receive if we did not get anything + if (transfers == 0) + context->yield(); + } + catch (std::exception &exception) + { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + 
tearDown(); + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) + { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + _logger->log_debug("Caught Exception during Site2SiteClientProtocol::receiveFlowFiles"); + throw; + } + + deleteTransaction(transactionID); + + return; +} + +bool Site2SiteClientProtocol::confirm(std::string transactionID) +{ + int ret; + Transaction *transaction = NULL; + + if (_peerState != READY) + { + bootstrap(); + } + + if (_peerState != READY) + { + return false; + } + + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) + { + return false; + } + else + { + transaction = it->second; + } + + if (transaction->getState() == TRANSACTION_STARTED && !transaction->isDataAvailable() && + transaction->getDirection() == RECEIVE) + { + transaction->_state = TRANSACTION_CONFIRMED; + return true; + } + + if (transaction->getState() != DATA_EXCHANGED) + return false; + + if (transaction->getDirection() == RECEIVE) + { + if (transaction->isDataAvailable()) + return false; + // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message + // to peer so that we can verify that the connection is still open. This is a two-phase commit, + // which helps to prevent the chances of data duplication. Without doing this, we may commit the + // session and then when we send the response back to the peer, the peer may have timed out and may not + // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the + // Critical Section involved in this transaction so that rather than the Critical Section being the + // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. 
+ long crcValue = transaction->getCRC(); + std::string crc = std::to_string(crcValue); + _logger->log_info("Site2Site Send confirm with CRC %d to transaction %s", transaction->getCRC(), + transactionID.c_str()); + ret = writeRespond(CONFIRM_TRANSACTION, crc); + if (ret <= 0) + return false; + RespondCode code; + std::string message; + readRespond(code, message); + if (ret <= 0) + return false; + + if (code == CONFIRM_TRANSACTION) + { + _logger->log_info("Site2Site transaction %s peer confirm transaction", transactionID.c_str()); + transaction->_state = TRANSACTION_CONFIRMED; + return true; + } + else if (code == BAD_CHECKSUM) + { + _logger->log_info("Site2Site transaction %s peer indicate bad checksum", transactionID.c_str()); + /* + transaction->_state = TRANSACTION_CONFIRMED; + return true; */ + return false; + } + else + { + _logger->log_info("Site2Site transaction %s peer unknown respond code %d", + transactionID.c_str(), code); + return false; + } + } + else + { + _logger->log_info("Site2Site Send FINISH TRANSACTION for transaction %s", + transactionID.c_str()); + ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION"); + if (ret <= 0) + return false; + RespondCode code; + std::string message; + readRespond(code, message); + if (ret <= 0) + return false; + + // we've sent a FINISH_TRANSACTION. 
Now we'll wait for the peer to send a 'Confirm Transaction' response + if (code == CONFIRM_TRANSACTION) + { + _logger->log_info("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.c_str(), message.c_str()); + if (this->_currentVersion > 3) + { + long crcValue = transaction->getCRC(); + std::string crc = std::to_string(crcValue); + if (message == crc) + { + _logger->log_info("Site2Site transaction %s CRC matched", transactionID.c_str()); + ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); + if (ret <= 0) + return false; + transaction->_state = TRANSACTION_CONFIRMED; + return true; + } + else + { + _logger->log_info("Site2Site transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str()); + ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM"); + /* + ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); + if (ret <= 0) + return false; + transaction->_state = TRANSACTION_CONFIRMED; + return true; */ + return false; + } + } + ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); + if (ret <= 0) + return false; + transaction->_state = TRANSACTION_CONFIRMED; + return true; + } + else + { + _logger->log_info("Site2Site transaction %s peer unknown respond code %d", + transactionID.c_str(), code); + return false; + } + return false; + } +} + +void Site2SiteClientProtocol::cancel(std::string transactionID) +{ + Transaction *transaction = NULL; + + if (_peerState != READY) + { + return; + } + + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) + { + return; + } + else + { + transaction = it->second; + } + + if (transaction->getState() == TRANSACTION_CANCELED || transaction->getState() == TRANSACTION_COMPLETED + || transaction->getState() == TRANSACTION_ERROR) + { + return; + } + + this->writeRespond(CANCEL_TRANSACTION, "Cancel"); + transaction->_state = TRANSACTION_CANCELED; + + tearDown(); + return; +} + +void 
Site2SiteClientProtocol::deleteTransaction(std::string transactionID) +{ + Transaction *transaction = NULL; + + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) + { + return; + } + else + { + transaction = it->second; + } + + _logger->log_info("Site2Site delete transaction %s", transaction->getUUIDStr().c_str()); + delete transaction; + _transactionMap.erase(transactionID); +} + +void Site2SiteClientProtocol::error(std::string transactionID) +{ + Transaction *transaction = NULL; + + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) + { + return; + } + else + { + transaction = it->second; + } + + transaction->_state = TRANSACTION_ERROR; + tearDown(); + return; +} + +//! Complete the transaction +bool Site2SiteClientProtocol::complete(std::string transactionID) +{ + int ret; + Transaction *transaction = NULL; + + if (_peerState != READY) + { + bootstrap(); + } + + if (_peerState != READY) + { + return false; + } + + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) + { + return false; + } + else + { + transaction = it->second; + } + + if (transaction->getState() != TRANSACTION_CONFIRMED) + { + return false; + } + + if (transaction->getDirection() == RECEIVE) + { + if (transaction->_transfers == 0) + { + transaction->_state = TRANSACTION_COMPLETED; + return true; + } + else + { + _logger->log_info("Site2Site transaction %s send finished", transactionID.c_str()); + ret = this->writeRespond(TRANSACTION_FINISHED, "Finished"); + if (ret <= 0) + return false; + else + { + transaction->_state = TRANSACTION_COMPLETED; + return true; + } + } + } + else + { + RespondCode code; + std::string message; + int ret; + + ret = readRespond(code, message); + + if (ret <= 0) + return false; + + if (code == TRANSACTION_FINISHED) + { + 
_logger->log_info("Site2Site transaction %s peer finished transaction", transactionID.c_str()); + transaction->_state = TRANSACTION_COMPLETED; + return true; + } + else + { + _logger->log_info("Site2Site transaction %s peer unknown respond code %d", + transactionID.c_str(), code); + return false; + } + } +} + +void Site2SiteClientProtocol::transferFlowFiles(ProcessContext *context, ProcessSession *session) +{ + FlowFileRecord *flow = session->get(); + Transaction *transaction = NULL; + + if (!flow) + return; + + if (_peerState != READY) + { + bootstrap(); + } + + if (_peerState != READY) + { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); + return; + } + + // Create the transaction + std::string transactionID; + transaction = createTransaction(transactionID, SEND); + + if (transaction == NULL) + { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); + return; + } + + bool continueTransaction = true; + uint64_t startSendingNanos = getTimeNano(); + + try + { + while (continueTransaction) + { + DataPacket packet(this, transaction, flow->getAttributes()); + + if (!send(transactionID, &packet, flow, session)) + { + throw Exception(SITE2SITE_EXCEPTION, "Send Failed"); + return; + } + _logger->log_info("Site2Site transaction %s send flow record %s", + transactionID.c_str(), flow->getUUIDStr().c_str()); + session->remove(flow); + + uint64_t transferNanos = getTimeNano() - startSendingNanos; + if (transferNanos > _batchSendNanos) + break; + + flow = session->get(); + if (!flow) + { + continueTransaction = false; + } + } // while true + + if (!confirm(transactionID)) + { + throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed"); + return; + } + if (!complete(transactionID)) + { + throw Exception(SITE2SITE_EXCEPTION, "Complete Failed"); + return; + } + _logger->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", + 
transactionID.c_str(), transaction->_transfers, transaction->_bytes); + } + catch (std::exception &exception) + { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) + { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + _logger->log_debug("Caught Exception during Site2SiteClientProtocol::transferFlowFiles"); + throw; + } + + deleteTransaction(transactionID); + + return; +}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/Site2SitePeer.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Site2SitePeer.cpp b/libminifi/src/Site2SitePeer.cpp new file mode 100644 index 0000000..48e19d0 --- /dev/null +++ b/libminifi/src/Site2SitePeer.cpp @@ -0,0 +1,435 @@ +/** + * @file Site2SitePeer.cpp + * Site2SitePeer class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <sys/time.h> +#include <stdio.h> +#include <time.h> +#include <chrono> +#include <thread> +#include <random> +#include <netinet/tcp.h> +#include <iostream> +#include "Site2SitePeer.h" + +//! 
CRC tables +std::atomic<bool> CRC32::tableInit(false); +unsigned int CRC32::table[256]; + +bool Site2SitePeer::Open() +{ + in_addr_t addr; + int sock = 0; + struct hostent *h; + const char *host; + uint16_t port; + + host = this->_host.c_str(); + port = this->_port; + + if (strlen(host) == 0) + return false; + +#ifdef __MACH__ + h = gethostbyname(host); +#else + char buf[1024]; + struct hostent he; + int hh_errno; + gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); +#endif + memcpy((char *) &addr, h->h_addr_list[0], h->h_length); + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) + { + _logger->log_error("Could not create socket to hostName %s", host); + this->yield(); + return false; + } + +#ifndef __MACH__ + int opt = 1; + bool nagle_off = true; + + if (nagle_off) + { + if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0) + { + _logger->log_error("setsockopt() TCP_NODELAY failed"); + close(sock); + this->yield(); + return false; + } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&opt, sizeof(opt)) < 0) + { + _logger->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + this->yield(); + return false; + } + } + + int sndsize = 256*1024; + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0) + { + _logger->log_error("setsockopt() SO_SNDBUF failed"); + close(sock); + this->yield(); + return false; + } + int rcvsize = 256*1024; + if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&rcvsize, (int)sizeof(rcvsize)) < 0) + { + _logger->log_error("setsockopt() SO_RCVBUF failed"); + close(sock); + this->yield(); + return false; + } +#endif + + struct sockaddr_in sa; + socklen_t socklen; + int status; + + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = htonl(INADDR_ANY); + sa.sin_port = htons(0); + socklen = sizeof(sa); + if (bind(sock, (struct sockaddr *)&sa, socklen) < 0) + { + _logger->log_error("socket bind failed"); + close(sock); + 
this->yield(); + return false; + } + + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = addr; + sa.sin_port = htons(port); + socklen = sizeof(sa); + + status = connect(sock, (struct sockaddr *)&sa, socklen); + + if (status < 0) + { + _logger->log_error("socket connect failed to %s %d", host, port); + close(sock); + this->yield(); + return false; + } + + _logger->log_info("Site2Site Peer socket %d connect to server %s port %d success", sock, host, port); + + _socket = sock; + + status = sendData((uint8_t *) MAGIC_BYTES, sizeof(MAGIC_BYTES)); + + if (status <= 0) + { + Close(); + return false; + } + + return true; +} + +void Site2SitePeer::Close() +{ + if (_socket) + { + _logger->log_info("Site2Site Peer socket %d close", _socket); + close(_socket); + _socket = 0; + } +} + +int Site2SitePeer::sendData(uint8_t *buf, int buflen, CRC32 *crc) +{ + int ret = 0, bytes = 0; + + if (_socket <= 0) + { + // this->yield(); + return -1; + } + + while (bytes < buflen) + { + ret = send(_socket, buf+bytes, buflen-bytes, 0); + //check for errors + if (ret == -1) + { + _logger->log_error("Site2Site Peer socket %d send failed %s", _socket, strerror(errno)); + Close(); + // this->yield(); + return ret; + } + bytes+=ret; + } + + if (crc) + crc->update(buf, buflen); + + return bytes; +} + +int Site2SitePeer::Select(int msec) +{ + fd_set fds; + struct timeval tv; + int retval; + int fd = _socket; + + FD_ZERO(&fds); + FD_SET(fd, &fds); + + tv.tv_sec = msec/1000; + tv.tv_usec = (msec % 1000) * 1000; + + if (msec > 0) + retval = select(fd+1, &fds, NULL, NULL, &tv); + else + retval = select(fd+1, &fds, NULL, NULL, NULL); + + if (retval <= 0) + return retval; + if (FD_ISSET(fd, &fds)) + return retval; + else + return 0; +} + +int Site2SitePeer::readData(uint8_t *buf, int buflen, CRC32 *crc) +{ + int sendSize = buflen; + uint8_t *start = buf; + + if (_socket <= 0) + { + // this->yield(); + return -1; + } + + while (buflen) + { + int status; + status = Select((int) 
_timeOut); + if (status <= 0) + { + Close(); + return status; + } + status = recv(_socket, buf, buflen, 0); + if (status <= 0) + { + Close(); + // this->yield(); + return status; + } + buflen -= status; + buf += status; + } + + if (crc) + crc->update(start, sendSize); + + return sendSize; +} + +int Site2SitePeer::writeUTF(std::string str, bool widen, CRC32 *crc) +{ + int strlen = str.length(); + int utflen = 0; + int c, count = 0; + + /* use charAt instead of copying String to char array */ + for (int i = 0; i < strlen; i++) { + c = str.at(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + utflen++; + } else if (c > 0x07FF) { + utflen += 3; + } else { + utflen += 2; + } + } + + if (utflen > 65535) + return -1; + + uint8_t *bytearr = NULL; + if (!widen) + { + bytearr = new uint8_t[utflen+2]; + bytearr[count++] = (uint8_t) ((utflen >> 8) & 0xFF); + bytearr[count++] = (uint8_t) ((utflen >> 0) & 0xFF); + } + else + { + bytearr = new uint8_t[utflen+4]; + bytearr[count++] = (uint8_t) ((utflen >> 24) & 0xFF); + bytearr[count++] = (uint8_t) ((utflen >> 16) & 0xFF); + bytearr[count++] = (uint8_t) ((utflen >> 8) & 0xFF); + bytearr[count++] = (uint8_t) ((utflen >> 0) & 0xFF); + } + + int i=0; + for (i=0; i<strlen; i++) { + c = str.at(i); + if (!((c >= 0x0001) && (c <= 0x007F))) break; + bytearr[count++] = (uint8_t) c; + } + + for (;i < strlen; i++){ + c = str.at(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + bytearr[count++] = (uint8_t) c; + } else if (c > 0x07FF) { + bytearr[count++] = (uint8_t) (0xE0 | ((c >> 12) & 0x0F)); + bytearr[count++] = (uint8_t) (0x80 | ((c >> 6) & 0x3F)); + bytearr[count++] = (uint8_t) (0x80 | ((c >> 0) & 0x3F)); + } else { + bytearr[count++] = (uint8_t) (0xC0 | ((c >> 6) & 0x1F)); + bytearr[count++] = (uint8_t) (0x80 | ((c >> 0) & 0x3F)); + } + } + int ret; + if (!widen) + { + ret = sendData(bytearr, utflen+2, crc); + } + else + { + ret = sendData(bytearr, utflen+4, crc); + } + delete[] bytearr; + return ret; +} + +int Site2SitePeer::readUTF(std::string 
&str, bool widen, CRC32 *crc) +{ + uint16_t utflen; + int ret; + + if (!widen) + { + ret = read(utflen, crc); + if (ret <= 0) + return ret; + } + else + { + uint32_t len; + ret = read(len, crc); + if (ret <= 0) + return ret; + utflen = len; + } + + uint8_t *bytearr = NULL; + char *chararr = NULL; + bytearr = new uint8_t[utflen]; + chararr = new char[utflen]; + memset(chararr, 0, utflen); + + int c, char2, char3; + int count = 0; + int chararr_count=0; + + ret = read(bytearr, utflen, crc); + if (ret <= 0) + { + delete[] bytearr; + delete[] chararr; + return ret; + } + + while (count < utflen) { + c = (int) bytearr[count] & 0xff; + if (c > 127) break; + count++; + chararr[chararr_count++]=(char)c; + } + + while (count < utflen) { + c = (int) bytearr[count] & 0xff; + switch (c >> 4) { + case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7: + /* 0xxxxxxx*/ + count++; + chararr[chararr_count++]=(char)c; + break; + case 12: case 13: + /* 110x xxxx 10xx xxxx*/ + count += 2; + if (count > utflen) + { + delete[] bytearr; + delete[] chararr; + return -1; + } + char2 = (int) bytearr[count-1]; + if ((char2 & 0xC0) != 0x80) + { + delete[] bytearr; + delete[] chararr; + return -1; + } + chararr[chararr_count++]=(char)(((c & 0x1F) << 6) | + (char2 & 0x3F)); + break; + case 14: + /* 1110 xxxx 10xx xxxx 10xx xxxx */ + count += 3; + if (count > utflen) + { + delete[] bytearr; + delete[] chararr; + return -1; + } + char2 = (int) bytearr[count-2]; + char3 = (int) bytearr[count-1]; + if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) + { + delete[] bytearr; + delete[] chararr; + return -1; + } + chararr[chararr_count++]=(char)(((c & 0x0F) << 12) | + ((char2 & 0x3F) << 6) | + ((char3 & 0x3F) << 0)); + break; + default: + delete[] bytearr; + delete[] chararr; + return -1; + } + } + // The number of chars produced may be less than utflen + std::string value(chararr, chararr_count); + str = value; + delete[] bytearr; + delete[] chararr; + if (!widen) + return (2 + utflen); 
+ else + return (4 + utflen); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/TailFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/TailFile.cpp b/libminifi/src/TailFile.cpp new file mode 100644 index 0000000..445255b --- /dev/null +++ b/libminifi/src/TailFile.cpp @@ -0,0 +1,272 @@ +/** + * @file TailFile.cpp + * TailFile class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <time.h> +#include <sstream> +#include <stdio.h> +#include <string> +#include <iostream> +#include <dirent.h> +#include <limits.h> +#include <unistd.h> + +#include "TimeUtil.h" +#include "TailFile.h" +#include "ProcessContext.h" +#include "ProcessSession.h" + +const std::string TailFile::ProcessorName("TailFile"); +Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed", ""); +Property TailFile::StateFile("State File", + "Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off", ""); +Relationship TailFile::Success("success", "All files are routed to success"); + +void TailFile::initialize() +{ + //! Set the supported properties + std::set<Property> properties; + properties.insert(FileName); + properties.insert(StateFile); + setSupportedProperties(properties); + //! Set the supported relationships + std::set<Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +std::string TailFile::trimLeft(const std::string& s) +{ + const char *WHITESPACE = " \n\r\t"; + size_t startpos = s.find_first_not_of(WHITESPACE); + return (startpos == std::string::npos) ? "" : s.substr(startpos); +} + +std::string TailFile::trimRight(const std::string& s) +{ + const char *WHITESPACE = " \n\r\t"; + size_t endpos = s.find_last_not_of(WHITESPACE); + return (endpos == std::string::npos) ? 
"" : s.substr(0, endpos+1); +} + +void TailFile::parseStateFileLine(char *buf) +{ + char *line = buf; + + while ((line[0] == ' ') || (line[0] =='\t')) + ++line; + + char first = line[0]; + if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '=')) + { + return; + } + + char *equal = strchr(line, '='); + if (equal == NULL) + { + return; + } + + equal[0] = '\0'; + std::string key = line; + + equal++; + while ((equal[0] == ' ') || (equal[0] == '\t')) + ++equal; + + first = equal[0]; + if ((first == '\0') || (first == '\r') || (first== '\n')) + { + return; + } + + std::string value = equal; + key = trimRight(key); + value = trimRight(value); + + if (key == "FILENAME") + this->_currentTailFileName = value; + if (key == "POSITION") + this->_currentTailFilePosition = std::stoi(value); + + return; +} + +void TailFile::recoverState() +{ + std::ifstream file(_stateFile.c_str(), std::ifstream::in); + if (!file.good()) + { + _logger->log_error("load state file failed %s", _stateFile.c_str()); + return; + } + const unsigned int bufSize = 512; + char buf[bufSize]; + for (file.getline(buf,bufSize); file.good(); file.getline(buf,bufSize)) + { + parseStateFileLine(buf); + } +} + +void TailFile::storeState() +{ + std::ofstream file(_stateFile.c_str()); + if (!file.is_open()) + { + _logger->log_error("store state file failed %s", _stateFile.c_str()); + return; + } + file << "FILENAME=" << this->_currentTailFileName << "\n"; + file << "POSITION=" << this->_currentTailFilePosition << "\n"; + file.close(); +} + +static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) +{ + return (i.modifiedTime < j.modifiedTime); +} +void TailFile::checkRollOver() +{ + struct stat statbuf; + std::vector<TailMatchedFileItem> matchedFiles; + std::string fullPath = this->_fileLocation + "/" + _currentTailFileName; + + if (stat(fullPath.c_str(), &statbuf) == 0) + { + if (statbuf.st_size > this->_currentTailFilePosition) + // there are new input 
for the current tail file + return; + + uint64_t modifiedTimeCurrentTailFile = ((uint64_t) (statbuf.st_mtime) * 1000); + std::string pattern = _fileName; + std::size_t found = _fileName.find_last_of("."); + if (found != std::string::npos) + pattern = _fileName.substr(0,found); + DIR *d; + d = opendir(this->_fileLocation.c_str()); + if (!d) + return; + while (1) + { + struct dirent *entry; + entry = readdir(d); + if (!entry) + break; + std::string d_name = entry->d_name; + if (!(entry->d_type & DT_DIR)) + { + std::string fileName = d_name; + std::string fileFullName = this->_fileLocation + "/" + d_name; + if (fileFullName.find(pattern) != std::string::npos && stat(fileFullName.c_str(), &statbuf) == 0) + { + if (((uint64_t) (statbuf.st_mtime) * 1000) >= modifiedTimeCurrentTailFile) + { + TailMatchedFileItem item; + item.fileName = fileName; + item.modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000); + matchedFiles.push_back(item); + } + } + } + } + closedir(d); + + // Sort the list based on modified time + std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem); + for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin(); it!=matchedFiles.end(); ++it) + { + TailMatchedFileItem item = *it; + if (item.fileName == _currentTailFileName) + { + ++it; + if (it!=matchedFiles.end()) + { + TailMatchedFileItem nextItem = *it; + _logger->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName.c_str(), nextItem.fileName.c_str()); + _currentTailFileName = nextItem.fileName; + _currentTailFilePosition = 0; + storeState(); + } + break; + } + } + } + else + return; +} + + +void TailFile::onTrigger(ProcessContext *context, ProcessSession *session) +{ + std::string value; + if (context->getProperty(FileName.getName(), value)) + { + std::size_t found = value.find_last_of("/\\"); + this->_fileLocation = value.substr(0,found); + this->_fileName = value.substr(found+1); + } + if (context->getProperty(StateFile.getName(), value)) + { + 
_stateFile = value; + } + if (!this->_stateRecovered) + { + _stateRecovered = true; + this->_currentTailFileName = _fileName; + this->_currentTailFilePosition = 0; + // recover the state if we have not done so + this->recoverState(); + } + checkRollOver(); + std::string fullPath = this->_fileLocation + "/" + _currentTailFileName; + struct stat statbuf; + if (stat(fullPath.c_str(), &statbuf) == 0) + { + if (statbuf.st_size <= this->_currentTailFilePosition) + // there are no new input for the current tail file + { + context->yield(); + return; + } + FlowFileRecord *flowFile = session->create(); + if (!flowFile) + return; + std::size_t found = _currentTailFileName.find_last_of("."); + std::string baseName = _currentTailFileName.substr(0,found); + std::string extension = _currentTailFileName.substr(found+1); + flowFile->updateAttribute(PATH, _fileLocation); + flowFile->addAttribute(ABSOLUTE_PATH, fullPath); + session->import(fullPath, flowFile, true, this->_currentTailFilePosition); + session->transfer(flowFile, Success); + _logger->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(), flowFile->getSize()); + std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + + std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." 
+ extension; + flowFile->updateAttribute(FILENAME, logName); + this->_currentTailFilePosition += flowFile->getSize(); + storeState(); + } +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/src/TimerDrivenSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp new file mode 100644 index 0000000..3ce57ae --- /dev/null +++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp @@ -0,0 +1,134 @@ +/** + * @file TimerDrivenSchedulingAgent.cpp + * TimerDrivenSchedulingAgent class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include <chrono> +#include <thread> +#include <iostream> +#include "Property.h" +#include "TimerDrivenSchedulingAgent.h" + +void TimerDrivenSchedulingAgent::schedule(Processor *processor) +{ + std::lock_guard<std::mutex> lock(_mtx); + + _administrativeYieldDuration = 0; + std::string yieldValue; + + if (_configure->get(Configure::nifi_administrative_yield_duration, yieldValue)) + { + TimeUnit unit; + if (Property::StringToTime(yieldValue, _administrativeYieldDuration, unit) && + Property::ConvertTimeUnitToMS(_administrativeYieldDuration, unit, _administrativeYieldDuration)) + { + _logger->log_debug("nifi_administrative_yield_duration: [%d] ms", _administrativeYieldDuration); + } + } + + _boredYieldDuration = 0; + if (_configure->get(Configure::nifi_bored_yield_duration, yieldValue)) + { + TimeUnit unit; + if (Property::StringToTime(yieldValue, _boredYieldDuration, unit) && + Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, _boredYieldDuration)) + { + _logger->log_debug("nifi_bored_yield_duration: [%d] ms", _boredYieldDuration); + } + } + + if (processor->getScheduledState() != RUNNING) + { + _logger->log_info("Can not schedule threads for processor %s because it is not running", processor->getName().c_str()); + return; + } + + std::map<std::string, std::vector<std::thread *>>::iterator it = + _threads.find(processor->getUUIDStr()); + if (it != _threads.end()) + { + _logger->log_info("Can not schedule threads for processor %s because there are existed thread running"); + return; + } + + std::vector<std::thread *> threads; + for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) + { + std::thread *thread = new std::thread(run, this, processor); + thread->detach(); + threads.push_back(thread); + _logger->log_info("Scheduled Time Driven thread %d running for process %s", thread->get_id(), + processor->getName().c_str()); + } + _threads[processor->getUUIDStr().c_str()] = threads; + + return; +} + +void 
TimerDrivenSchedulingAgent::unschedule(Processor *processor) +{ + std::lock_guard<std::mutex> lock(_mtx); + + if (processor->getScheduledState() != RUNNING) + { + _logger->log_info("Can not unschedule threads for processor %s because it is not running", processor->getName().c_str()); + return; + } + + std::map<std::string, std::vector<std::thread *>>::iterator it = + _threads.find(processor->getUUIDStr()); + + if (it == _threads.end()) + { + _logger->log_info("Can not unschedule threads for processor %s because there are no existed thread running"); + return; + } + for (std::vector<std::thread *>::iterator itThread = it->second.begin(); itThread != it->second.end(); ++itThread) + { + std::thread *thread = *itThread; + _logger->log_info("Scheduled Time Driven thread %d deleted for process %s", thread->get_id(), + processor->getName().c_str()); + delete thread; + } + _threads.erase(processor->getUUIDStr()); + processor->clearActiveTask(); + + return; +} + +void TimerDrivenSchedulingAgent::run(TimerDrivenSchedulingAgent *agent, Processor *processor) +{ + while (agent->_running) + { + bool shouldYield = agent->onTrigger(processor); + + if (processor->isYield()) + { + // Honor the yield + std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime())); + } + else if (shouldYield && agent->_boredYieldDuration > 0) + { + // No work to do or need to apply back pressure + std::this_thread::sleep_for(std::chrono::milliseconds(agent->_boredYieldDuration)); + } + std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano())); + } + return; +} + + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/test/FlowFileRecordTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/FlowFileRecordTest.cpp b/libminifi/test/FlowFileRecordTest.cpp new file mode 100644 index 0000000..09a3d33 --- /dev/null +++ b/libminifi/test/FlowFileRecordTest.cpp @@ -0,0 +1,28 @@ 
+/** + * @file MiNiFiMain.cpp + * MiNiFiMain implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> + +#include "FlowFileRecord.h" + +int main(int argc, char **argv) +{ +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/libminifi/test/Server.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/Server.cpp b/libminifi/test/Server.cpp new file mode 100644 index 0000000..e7b3452 --- /dev/null +++ b/libminifi/test/Server.cpp @@ -0,0 +1,607 @@ +/* A simple server in the internet domain using TCP + The port number is passed as an argument */ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <errno.h> +#include <arpa/inet.h> +#include <fcntl.h> +#include <netdb.h> +#include <string> +#include <errno.h> +#include <chrono> +#include <thread> +#include <iostream> // std::cout +#include <fstream> // std::ifstream +#include <signal.h> + +#define DEFAULT_NIFI_SERVER_PORT 9000 +#define DEFAULT_REPORT_INTERVAL 1000 // 1 sec +#define MAX_READ_TIMEOUT 30000 // 30 seconds + +//! 
//! FlowControl Protocol Msg Type
typedef enum {
    // Device -> server: register request carrying the device serial number
    // and the version of the flow xml currently running
    REGISTER_REQ,
    // Server -> device: register respond; may carry a new flow.xml for the
    // device to apply and the device report interval
    REGISTER_RESP,
    // Device -> server: periodic report carrying the device serial number,
    // the running flow xml name/version and other report info
    REPORT_REQ,
    // Server -> device: report respond; may ask the device to update the
    // flow xml or a processor property
    REPORT_RESP,
    MAX_FLOW_CONTROL_MSG_TYPE
} FlowControlMsgType;

//! Printable names, indexed by FlowControlMsgType
static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] =
{
    "REGISTER_REQ",
    "REGISTER_RESP",
    "REPORT_REQ",
    "REPORT_RESP"
};

//! Map a message type to its printable name; NULL when out of range
inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type)
{
    return (type < MAX_FLOW_CONTROL_MSG_TYPE) ? FlowControlMsgTypeStr[type] : NULL;
}

//!
//! FlowControl Protocol Msg ID (some messages are fixed length, some are variable length (TLV))
typedef enum {
    // Fixed length, 8 bytes: client to server in register request, required field
    FLOW_SERIAL_NUMBER,
    // Flow XML name, TLV: client to server in register request and report request, required field
    FLOW_XML_NAME,
    // Flow XML content, TLV: server to client in register respond, optional field in case the server wants the client to load the xml from the server
    FLOW_XML_CONTENT,
    // Fixed length, 4 bytes, report interval in msec: server to client in register respond, optional field
    REPORT_INTERVAL,
    // Processor name, TLV: server to client in report respond, optional field in case the server wants the client to update a processor property
    PROCESSOR_NAME,
    // Processor property name, TLV: server to client in report respond, optional field in case the server wants the client to update a processor property
    PROPERTY_NAME,
    // Processor property value, TLV: server to client in report respond, optional field in case the server wants the client to update a processor property
    PROPERTY_VALUE,
    // Report blob, TLV: client to server in report request, optional field in case the client wants to piggyback the report blob on the report request
    REPORT_BLOB,
    MAX_FLOW_MSG_ID
} FlowControlMsgID;

//! Printable names, indexed by FlowControlMsgID
static const char *FlowControlMsgIDStr[] =
{
    "FLOW_SERIAL_NUMBER",
    "FLOW_XML_NAME",
    "FLOW_XML_CONTENT",
    "REPORT_INTERVAL",
    "PROCESSOR_NAME",   // the comma matters: without it this literal fuses with the next one
    "PROPERTY_NAME",
    "PROPERTY_VALUE",
    "REPORT_BLOB"
};

// The table is sized by its initializers, so a dropped or fused entry fails to compile
static_assert(sizeof(FlowControlMsgIDStr) / sizeof(FlowControlMsgIDStr[0]) == MAX_FLOW_MSG_ID,
        "FlowControlMsgIDStr must have one name per FlowControlMsgID");

#define TYPE_HDR_LEN 4 // Fix Hdr Type
#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes

/**
 * Encoded size of one message field.
 *
 * @param id          field id
 * @param payLoadLen  payload length in bytes; only used for TLV fields
 * @return type header + 8 for FLOW_SERIAL_NUMBER, type header + 4 for
 *         REPORT_INTERVAL, TLV header + payLoadLen for every other valid id,
 *         -1 for an id outside the enumeration
 */
inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen)
{
    if (id == FLOW_SERIAL_NUMBER)
        return (TYPE_HDR_LEN + 8);
    else if (id == REPORT_INTERVAL)
        return (TYPE_HDR_LEN + 4);
    else if (id < MAX_FLOW_MSG_ID)
        return (TLV_HDR_LEN + payLoadLen);
    else
        return -1;
}

//!
Flow Control Msg Id to String +inline const char *FlowControlMsgIdToStr(FlowControlMsgID id) +{ + if (id < MAX_FLOW_MSG_ID) + return FlowControlMsgIDStr[id]; + else + return NULL; +} + +//! Flow Control Respond status code +typedef enum { + RESP_SUCCESS, + RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register + RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller + RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller + RESP_FAILURE, + MAX_RESP_CODE +} FlowControlRespCode; + +//! FlowControl Resp Code str +static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = +{ + "RESP_SUCCESS", + "RESP_TRIGGER_REGISTER", + "RESP_START_FLOW_CONTROLLER", + "RESP_STOP_FLOW_CONTROLLER", + "RESP_FAILURE" +}; + +//! Flow Control Resp Code to String +inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) +{ + if (code < MAX_RESP_CODE) + return FlowControlRespCodeStr[code]; + else + return NULL; +} + +//! Common FlowControlProtocol Header +typedef struct { + uint32_t msgType; //! Msg Type + uint32_t seqNumber; //! Seq Number to match Req with Resp + uint32_t status; //! Resp Code, see FlowControlRespCode + uint32_t payloadLen; //! Msg Payload length +} FlowControlProtocolHeader; + + +//! encode uint32_t +uint8_t *encode(uint8_t *buf, uint32_t value) +{ + *buf++ = (value & 0xFF000000) >> 24; + *buf++ = (value & 0x00FF0000) >> 16; + *buf++ = (value & 0x0000FF00) >> 8; + *buf++ = (value & 0x000000FF); + return buf; +} + +//! encode uint32_t +uint8_t *decode(uint8_t *buf, uint32_t &value) +{ + value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3])); + return (buf + 4); +} + +//! encode byte array +uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size) +{ + memcpy(buf, bufArray, size); + buf += size; + return buf; +} + +//! 
encode std::string +uint8_t *encode(uint8_t *buf, std::string value) +{ + // add the \0 for size + buf = encode(buf, value.size()+1); + buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1); + return buf; +} + +int sendData(int socket, uint8_t *buf, int buflen) +{ + int ret = 0, bytes = 0; + + while (bytes < buflen) + { + ret = send(socket, buf+bytes, buflen-bytes, 0); + //check for errors + if (ret == -1) + { + return ret; + } + bytes+=ret; + } + + return bytes; +} + +void error(const char *msg) +{ + perror(msg); + exit(1); +} + +/* readline - read a '\n' terminated line from socket fd + into buffer bufptr of size len. The line in the + buffer is terminated with '\0'. + It returns -1 in case of error or if + the capacity of the buffer is exceeded. + It returns 0 if EOF is encountered before reading '\n'. + */ +int readline( int fd, char *bufptr, size_t len ) +{ + /* Note that this function is very tricky. It uses the + static variables bp, cnt, and b to establish a local buffer. + The recv call requests large chunks of data (the size of the buffer). + Then if the recv call reads more than one line, the overflow + remains in the buffer and it is made available to the next call + to readline. + Notice also that this routine reads up to '\n' and overwrites + it with '\0'. Thus if the line is really terminated with + "\r\n", the '\r' will remain unchanged. 
+ */ + char *bufx = bufptr; + static char *bp; + static int cnt = 0; + static char b[ 4096 ]; + char c; + + while ( --len > 0 ) + { + if ( --cnt <= 0 ) + { + cnt = recv( fd, b, sizeof( b ), 0 ); + if ( cnt < 0 ) + { + if ( errno == EINTR ) + { + len++; /* the while will decrement */ + continue; + } + return -1; + } + if ( cnt == 0 ) + return 0; + bp = b; + } + c = *bp++; + *bufptr++ = c; + if ( c == '\n' ) + { + *bufptr = '\0'; + return bufptr - bufx; + } + } + return -1; +} + +int readData(int socket, uint8_t *buf, int buflen) +{ + int sendSize = buflen; + int status; + + while (buflen) + { +#ifndef __MACH__ + status = read(socket, buf, buflen); +#else + status = recv(socket, buf, buflen, 0); +#endif + if (status <= 0) + { + return status; + } + buflen -= status; + buf += status; + } + + return sendSize; +} + +int readHdr(int socket, FlowControlProtocolHeader *hdr) +{ + uint8_t buffer[sizeof(FlowControlProtocolHeader)]; + + uint8_t *data = buffer; + + int status = readData(socket, buffer, sizeof(FlowControlProtocolHeader)); + if (status <= 0) + return status; + + uint32_t value; + data = decode(data, value); + hdr->msgType = value; + + data = decode(data, value); + hdr->seqNumber = value; + + data = decode(data, value); + hdr->status = value; + + data = decode(data, value); + hdr->payloadLen = value; + + return sizeof(FlowControlProtocolHeader); +} + +int readXML(char **xmlContent) +{ + std::ifstream is ("conf/flowServer.xml", std::ifstream::binary); + if (is) { + // get length of file: + is.seekg (0, is.end); + int length = is.tellg(); + is.seekg (0, is.beg); + + char * buffer = new char [length]; + + printf("Reading %s len %d\n", "conf/flowServer.xml", length); + // read data as a block: + is.read (buffer,length); + + is.close(); + + // ...buffer contains the entire file... 
+ *xmlContent = buffer; + + return length; + } + return 0; +} + +static int sockfd = 0, newsockfd = 0; +void sigHandler(int signal) +{ + if (signal == SIGINT || signal == SIGTERM) + { + close(newsockfd); + close(sockfd); + exit(1); + } +} + +int main(int argc, char *argv[]) +{ + int portno; + socklen_t clilen; + struct sockaddr_in serv_addr, cli_addr; + char buffer[4096]; + int flag = 0; + int number = 0; + + int n; + if (argc < 2) { + fprintf(stderr,"ERROR, no port provided\n"); + exit(1); + } + + if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR) + { + + return -1; + } + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) + error("ERROR opening socket"); + bzero((char *) &serv_addr, sizeof(serv_addr)); + portno = atoi(argv[1]); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; + serv_addr.sin_port = htons(portno); + if (bind(sockfd, (struct sockaddr *) &serv_addr, + sizeof(serv_addr)) < 0) + error("ERROR on binding"); + listen(sockfd,5); + if (portno == DEFAULT_NIFI_SERVER_PORT) + { + while (true) + { + clilen = sizeof(cli_addr); + newsockfd = accept(sockfd, + (struct sockaddr *) &cli_addr, + &clilen); + if (newsockfd < 0) + { + error("ERROR on accept"); + break; + } + // process request + FlowControlProtocolHeader hdr; + int status = readHdr(newsockfd, &hdr); + if (status > 0) + { + printf("Flow Control Protocol receive MsgType %s\n", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); + printf("Flow Control Protocol receive Seq Num %d\n", hdr.seqNumber); + printf("Flow Control Protocol receive Resp Code %s\n", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); + printf("Flow Control Protocol receive Payload len %d\n", hdr.payloadLen); + if (((FlowControlMsgType) hdr.msgType) == REGISTER_REQ) + { + printf("Flow Control Protocol Register Req receive\n"); + uint8_t *payload = new uint8_t[hdr.payloadLen]; + uint8_t *payloadPtr = payload; + status = readData(newsockfd, payload, 
hdr.payloadLen); + while (status > 0 && payloadPtr < (payload + hdr.payloadLen)) + { + uint32_t msgID = 0xFFFFFFFF; + payloadPtr = decode(payloadPtr, msgID); + if (((FlowControlMsgID) msgID) == FLOW_SERIAL_NUMBER) + { + // Fixed 8 bytes + uint8_t seqNum[8]; + memcpy(seqNum, payloadPtr, 8); + printf("Flow Control Protocol Register Req receive serial num\n"); + payloadPtr += 8; + } + else if (((FlowControlMsgID) msgID) == FLOW_XML_NAME) + { + uint32_t len; + payloadPtr = decode(payloadPtr, len); + printf("Flow Control Protocol receive XML name length %d\n", len); + std::string flowName = (const char *) payloadPtr; + payloadPtr += len; + printf("Flow Control Protocol receive XML name %s\n", flowName.c_str()); + } + else + { + break; + } + } + delete[] payload; + // Send Register Respond + // Calculate the total payload msg size + char *xmlContent; + uint32_t xmlLen = readXML(&xmlContent); + uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL, 0); + if (xmlLen > 0) + payloadSize += FlowControlMsgIDEncodingLen(FLOW_XML_CONTENT, xmlLen); + + uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; + uint8_t *data = new uint8_t[size]; + uint8_t *start = data; + + // encode the HDR + hdr.msgType = REGISTER_RESP; + hdr.payloadLen = payloadSize; + hdr.status = RESP_SUCCESS; + data = encode(data, hdr.msgType); + data = encode(data, hdr.seqNumber); + data = encode(data, hdr.status); + data = encode(data, hdr.payloadLen); + + // encode the report interval + data = encode(data, REPORT_INTERVAL); + data = encode(data, DEFAULT_REPORT_INTERVAL); + + // encode the XML content + if (xmlLen > 0) + { + data = encode(data, FLOW_XML_CONTENT); + data = encode(data, xmlLen); + data = encode(data, (uint8_t *) xmlContent, xmlLen); + delete[] xmlContent; + } + + // send it + status = sendData(newsockfd, start, size); + delete[] start; + } + else if (((FlowControlMsgType) hdr.msgType) == REPORT_REQ) + { + printf("Flow Control Protocol Report Req receive\n"); + uint8_t 
*payload = new uint8_t[hdr.payloadLen]; + uint8_t *payloadPtr = payload; + status = readData(newsockfd, payload, hdr.payloadLen); + while (status > 0 && payloadPtr < (payload + hdr.payloadLen)) + { + uint32_t msgID = 0xFFFFFFFF; + payloadPtr = decode(payloadPtr, msgID); + if (((FlowControlMsgID) msgID) == FLOW_XML_NAME) + { + uint32_t len; + payloadPtr = decode(payloadPtr, len); + printf("Flow Control Protocol receive XML name length %d\n", len); + std::string flowName = (const char *) payloadPtr; + payloadPtr += len; + printf("Flow Control Protocol receive XML name %s\n", flowName.c_str()); + } + else + { + break; + } + } + delete[] payload; + // Send Register Respond + // Calculate the total payload msg size + std::string processor = "RealTimeDataCollector"; + std::string propertyName1 = "real Time Message ID"; + std::string propertyValue1 = "41"; + std::string propertyName2 = "Batch Message ID"; + std::string propertyValue2 = "172,30,48"; + if (flag == 0) + { + propertyName1 = "Real Time Message ID"; + propertyValue1 = "41"; + propertyName2 = "Batch Message ID"; + propertyValue2 = "172,48"; + flag = 1; + } + else if (flag == 1) + { + propertyName1 = "Real Time Message ID"; + propertyValue1 = "172,48"; + propertyName2 = "Batch Message ID"; + propertyValue2 = "41"; + flag = 0; + } + uint32_t payloadSize = FlowControlMsgIDEncodingLen(PROCESSOR_NAME, processor.size()+1); + payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName1.size()+1); + payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue1.size()+1); + payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName2.size()+1); + payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue2.size()+1); + + uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; + uint8_t *data = new uint8_t[size]; + uint8_t *start = data; + + // encode the HDR + hdr.msgType = REPORT_RESP; + hdr.payloadLen = payloadSize; + hdr.status = RESP_SUCCESS; + + if (number >= 
10 && number < 20) + { + // After 10 second report, stop the flow controller for 10 second + hdr.status = RESP_STOP_FLOW_CONTROLLER; + } + else if (number == 20) + { + // restart the flow controller after 10 second + hdr.status = RESP_START_FLOW_CONTROLLER; + } + else if (number == 30) + { + // retrigger register + hdr.status = RESP_TRIGGER_REGISTER; + number = 0; + } + + number++; + + data = encode(data, hdr.msgType); + data = encode(data, hdr.seqNumber); + data = encode(data, hdr.status); + data = encode(data, hdr.payloadLen); + + // encode the processorName + data = encode(data, PROCESSOR_NAME); + data = encode(data, processor); + + // encode the propertyName and value TLV + data = encode(data, PROPERTY_NAME); + data = encode(data, propertyName1); + data = encode(data, PROPERTY_VALUE); + data = encode(data, propertyValue1); + data = encode(data, PROPERTY_NAME); + data = encode(data, propertyName2); + data = encode(data, PROPERTY_VALUE); + data = encode(data, propertyValue2); + // send it + status = sendData(newsockfd, start, size); + delete[] start; + } + } + close(newsockfd); + } + close(sockfd); + } + else + { + clilen = sizeof(cli_addr); + newsockfd = accept(sockfd, + (struct sockaddr *) &cli_addr, + &clilen); + if (newsockfd < 0) + error("ERROR on accept"); + while (1) + { + bzero(buffer,4096); + n = readline(newsockfd,buffer,4095); + if (n <= 0 ) + { + close(newsockfd); + newsockfd = accept(sockfd, + (struct sockaddr *) &cli_addr, + &clilen); + continue; + } + printf("%s",buffer); + } + close(newsockfd); + close(sockfd); + } + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/main/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt new file mode 100644 index 0000000..2f470a5 --- /dev/null +++ b/main/CMakeLists.txt @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
# See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

cmake_minimum_required(VERSION 2.6)

# CMP0048 is unknown to CMake releases older than 3.0, and setting an
# unknown policy is an error; only opt in where the policy exists so the
# minimum version declared above keeps working.
if (POLICY CMP0048)
  cmake_policy(SET CMP0048 NEW)
endif ()

include_directories(../include ../libminifi/include ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include)

# Include libxml2
find_package(LibXml2)
if (LIBXML2_FOUND)
    include_directories(${LIBXML2_INCLUDE_DIR})
else ()
    # Build from our local version
endif (LIBXML2_FOUND)

# The target is called minifiexe; OUTPUT_NAME below makes the produced
# binary "minifi".
add_executable(minifiexe MiNiFiMain.cpp)

# Link against minifi and yaml-cpp
target_link_libraries(minifiexe minifi yaml-cpp)
set_target_properties(minifiexe
        PROPERTIES OUTPUT_NAME minifi)

install(TARGETS minifiexe
        RUNTIME
        DESTINATION bin
        COMPONENT bin)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/Configure.cpp
----------------------------------------------------------------------
diff --git a/src/Configure.cpp b/src/Configure.cpp
deleted file mode 100644
index d7fd95b..0000000
--- a/src/Configure.cpp
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * @file Configure.cpp
- * Configure class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "Configure.h" - -Configure *Configure::_configure(NULL); -const char *Configure::nifi_flow_configuration_file = "nifi.flow.configuration.file"; -const char *Configure::nifi_administrative_yield_duration = "nifi.administrative.yield.duration"; -const char *Configure::nifi_bored_yield_duration = "nifi.bored.yield.duration"; -const char *Configure::nifi_server_name = "nifi.server.name"; -const char *Configure::nifi_server_port = "nifi.server.port"; -const char *Configure::nifi_server_report_interval= "nifi.server.report.interval"; - - -//! Get the config value -bool Configure::get(std::string key, std::string &value) -{ - std::lock_guard<std::mutex> lock(_mtx); - std::map<std::string,std::string>::iterator it = _properties.find(key); - - if (it != _properties.end()) - { - value = it->second; - return true; - } - else - { - return false; - } -} - -// Trim String utils -std::string Configure::trim(const std::string& s) -{ - return trimRight(trimLeft(s)); -} - -std::string Configure::trimLeft(const std::string& s) -{ - const char *WHITESPACE = " \n\r\t"; - size_t startpos = s.find_first_not_of(WHITESPACE); - return (startpos == std::string::npos) ? "" : s.substr(startpos); -} - -std::string Configure::trimRight(const std::string& s) -{ - const char *WHITESPACE = " \n\r\t"; - size_t endpos = s.find_last_not_of(WHITESPACE); - return (endpos == std::string::npos) ? 
"" : s.substr(0, endpos+1); -} - -//! Parse one line in configure file like key=value -void Configure::parseConfigureFileLine(char *buf) -{ - char *line = buf; - - while ((line[0] == ' ') || (line[0] =='\t')) - ++line; - - char first = line[0]; - if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '=')) - { - return; - } - - char *equal = strchr(line, '='); - if (equal == NULL) - { - return; - } - - equal[0] = '\0'; - std::string key = line; - - equal++; - while ((equal[0] == ' ') || (equal[0] == '\t')) - ++equal; - - first = equal[0]; - if ((first == '\0') || (first == '\r') || (first== '\n')) - { - return; - } - - std::string value = equal; - key = trimRight(key); - value = trimRight(value); - set(key, value); -} - -//! Load Configure File -void Configure::loadConfigureFile(const char *fileName) -{ - - std::string adjustedFilename; - if (fileName) - { - // perform a naive determination if this is a relative path - if (fileName[0] != '/') - { - adjustedFilename = adjustedFilename + _configure->getHome() + "/" + fileName; - } - else - { - adjustedFilename += fileName; - } - } - char *path = NULL; - char full_path[PATH_MAX]; - path = realpath(adjustedFilename.c_str(), full_path); - _logger->log_info("Using configuration file located at %s", path); - - std::ifstream file(path, std::ifstream::in); - if (!file.good()) - { - _logger->log_error("load configure file failed %s", path); - return; - } - this->clear(); - const unsigned int bufSize = 512; - char buf[bufSize]; - for (file.getline(buf, bufSize); file.good(); file.getline(buf, bufSize)) - { - parseConfigureFileLine(buf); - } -} - -//! 
Parse Command Line -void Configure::parseCommandLine(int argc, char **argv) -{ - int i; - bool keyFound = false; - std::string key, value; - - for (i = 1; i < argc; i++) - { - if (argv[i][0] == '-' && argv[i][1] != '\0') - { - keyFound = true; - key = &argv[i][1]; - continue; - } - if (keyFound) - { - value = argv[i]; - set(key,value); - keyFound = false; - } - } - return; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b02af540/src/Connection.cpp ---------------------------------------------------------------------- diff --git a/src/Connection.cpp b/src/Connection.cpp deleted file mode 100644 index e036b89..0000000 --- a/src/Connection.cpp +++ /dev/null @@ -1,160 +0,0 @@ -/** - * @file Connection.cpp - * Connection class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <chrono> -#include <thread> -#include <iostream> - -#include "Connection.h" - -Connection::Connection(std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t destUUID) -: _name(name) -{ - if (!uuid) - // Generate the global UUID for the flow record - uuid_generate(_uuid); - else - uuid_copy(_uuid, uuid); - - if (srcUUID) - uuid_copy(_srcUUID, srcUUID); - if (destUUID) - uuid_copy(_destUUID, destUUID); - - _srcProcessor = NULL; - _destProcessor = NULL; - _maxQueueSize = 0; - _maxQueueDataSize = 0; - _expiredDuration = 0; - _queuedDataSize = 0; - - _logger = Logger::getLogger(); - - _logger->log_info("Connection %s created", _name.c_str()); -} - -bool Connection::isEmpty() -{ - std::lock_guard<std::mutex> lock(_mtx); - - return _queue.empty(); -} - -bool Connection::isFull() -{ - std::lock_guard<std::mutex> lock(_mtx); - - if (_maxQueueSize <= 0 && _maxQueueDataSize <= 0) - // No back pressure setting - return false; - - if (_maxQueueSize > 0 && _queue.size() >= _maxQueueSize) - return true; - - if (_maxQueueDataSize > 0 && _queuedDataSize >= _maxQueueDataSize) - return true; - - return false; -} - -void Connection::put(FlowFileRecord *flow) -{ - std::lock_guard<std::mutex> lock(_mtx); - - _queue.push(flow); - - _queuedDataSize += flow->getSize(); - - _logger->log_debug("Enqueue flow file UUID %s to connection %s", - flow->getUUIDStr().c_str(), _name.c_str()); -} - -FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> &expiredFlowRecords) -{ - std::lock_guard<std::mutex> lock(_mtx); - - while (!_queue.empty()) - { - FlowFileRecord *item = _queue.front(); - _queue.pop(); - _queuedDataSize -= item->getSize(); - - if (_expiredDuration > 0) - { - // We need to check for flow expiration - if (getTimeMillis() > (item->getEntryDate() + _expiredDuration)) - { - // Flow record expired - expiredFlowRecords.insert(item); - } - else - { - // Flow 
record not expired - if (item->isPenalized()) - { - // Flow record was penalized - _queue.push(item); - _queuedDataSize += item->getSize(); - break; - } - item->setOriginalConnection(this); - _logger->log_debug("Dequeue flow file UUID %s from connection %s", - item->getUUIDStr().c_str(), _name.c_str()); - return item; - } - } - else - { - // Flow record not expired - if (item->isPenalized()) - { - // Flow record was penalized - _queue.push(item); - _queuedDataSize += item->getSize(); - break; - } - item->setOriginalConnection(this); - _logger->log_debug("Dequeue flow file UUID %s from connection %s", - item->getUUIDStr().c_str(), _name.c_str()); - return item; - } - } - - return NULL; -} - -void Connection::drain() -{ - std::lock_guard<std::mutex> lock(_mtx); - - while (!_queue.empty()) - { - FlowFileRecord *item = _queue.front(); - _queue.pop(); - delete item; - } - - _logger->log_debug("Drain connection %s", _name.c_str()); -}