http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/RealTimeDataCollector.cpp ---------------------------------------------------------------------- diff --git a/src/RealTimeDataCollector.cpp b/src/RealTimeDataCollector.cpp new file mode 100644 index 0000000..c7118ff --- /dev/null +++ b/src/RealTimeDataCollector.cpp @@ -0,0 +1,482 @@ +/** + * @file RealTimeDataCollector.cpp + * RealTimeDataCollector class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <chrono> +#include <thread> +#include <random> +#include <netinet/tcp.h> + +#include "RealTimeDataCollector.h" +#include "ProcessContext.h" +#include "ProcessSession.h" + +const std::string RealTimeDataCollector::ProcessorName("RealTimeDataCollector"); +Property RealTimeDataCollector::FILENAME("File Name", "File Name for the real time processor to process", "data.osp"); +Property RealTimeDataCollector::REALTIMESERVERNAME("Real Time Server Name", "Real Time Server Name", "localhost"); +Property RealTimeDataCollector::REALTIMESERVERPORT("Real Time Server Port", "Real Time Server Port", "10000"); +Property RealTimeDataCollector::BATCHSERVERNAME("Batch Server Name", "Batch Server Name", "localhost"); +Property RealTimeDataCollector::BATCHSERVERPORT("Batch Server Port", "Batch Server Port", "10001"); +Property RealTimeDataCollector::ITERATION("Iteration", + "If true, sample osp file will be iterated", "true"); +Property RealTimeDataCollector::REALTIMEMSGID("Real Time Message ID", "Real Time Message ID", "41"); +Property RealTimeDataCollector::BATCHMSGID("Batch Message ID", "Batch Message ID", "172, 30, 48"); +Property RealTimeDataCollector::REALTIMEINTERVAL("Real Time Interval", "Real Time Data Collection Interval in msec", "10 ms"); +Property RealTimeDataCollector::BATCHINTERVAL("Batch Time Interval", "Batch Processing Interval in msec", "100 ms"); +Property RealTimeDataCollector::BATCHMAXBUFFERSIZE("Batch Max Buffer Size", "Batch Buffer Maximum size in bytes", "262144"); +Relationship RealTimeDataCollector::Success("success", "success operational on the flow record"); + +void RealTimeDataCollector::initialize() +{ + //! Set the supported properties + std::set<Property> properties; + properties.insert(FILENAME); + properties.insert(REALTIMESERVERNAME); + properties.insert(REALTIMESERVERPORT); + properties.insert(BATCHSERVERNAME); + properties.insert(BATCHSERVERPORT); + properties.insert(ITERATION); + properties.insert(REALTIMEMSGID); + properties.insert(BATCHMSGID); + properties.insert(REALTIMEINTERVAL); + properties.insert(BATCHINTERVAL); + properties.insert(BATCHMAXBUFFERSIZE); + + setSupportedProperties(properties); + //! Set the supported relationships + std::set<Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); + +} + +int RealTimeDataCollector::connectServer(const char *host, uint16_t port) +{ + in_addr_t addr; + int sock = 0; + struct hostent *h; +#ifdef __MACH__ + h = gethostbyname(host); +#else + char buf[1024]; + struct hostent he; + int hh_errno; + gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); +#endif + memcpy((char *) &addr, h->h_addr_list[0], h->h_length); + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) + { + _logger->log_error("Could not create socket to hostName %s", host); + return 0; + } + +#ifndef __MACH__ + int opt = 1; + bool nagle_off = true; + + if (nagle_off) + { + if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0) + { + _logger->log_error("setsockopt() TCP_NODELAY failed"); + close(sock); + return 0; + } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&opt, sizeof(opt)) < 0) + { + _logger->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + return 0; + } + } + + int sndsize = 256*1024; + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0) + { + _logger->log_error("setsockopt() SO_SNDBUF failed"); + close(sock); + return 0; + } +#endif + + struct sockaddr_in sa; + socklen_t socklen; + int status; + + //TODO bind socket to the interface + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = htonl(INADDR_ANY); + sa.sin_port = htons(0); + socklen = sizeof(sa); + if (bind(sock, (struct sockaddr *)&sa, socklen) < 0) + { + _logger->log_error("socket bind failed"); + close(sock); + return 0; + } + + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = addr; + sa.sin_port = htons(port); + socklen = sizeof(sa); + + status = connect(sock, (struct sockaddr *)&sa, socklen); + + if (status < 0) + { + _logger->log_error("socket connect failed to %s %d", host, port); + close(sock); + return 0; + } + + _logger->log_info("socket %d connect to server %s port %d success", sock, host, port); + + return sock; +} + +int RealTimeDataCollector::sendData(int socket, const char *buf, int buflen) +{ + int ret = 0, bytes = 0; + + while (bytes < buflen) + { + ret = send(socket, buf+bytes, buflen-bytes, 0); + //check for errors + if (ret == -1) + { + return ret; + } + bytes+=ret; + } + + if (ret) + _logger->log_debug("Send data size %d over socket %d", buflen, socket); + + return ret; +} + +void RealTimeDataCollector::onTriggerRealTime(ProcessContext *context, ProcessSession *session) +{ + if (_realTimeAccumulated >= this->_realTimeInterval) + { + std::string value; + if (this->getProperty(REALTIMEMSGID.getName(), value)) + { + this->_realTimeMsgID.clear(); + this->_logger->log_info("Real Time Msg IDs %s", value.c_str()); + std::stringstream lineStream(value); + std::string cell; + + while(std::getline(lineStream, cell, ',')) + { + this->_realTimeMsgID.push_back(cell); + // this->_logger->log_debug("Real Time Msg ID %s", cell.c_str()); + } + } + if (this->getProperty(BATCHMSGID.getName(), value)) + { + this->_batchMsgID.clear(); + this->_logger->log_info("Batch Msg IDs %s", value.c_str()); + std::stringstream lineStream(value); + std::string cell; + + while(std::getline(lineStream, cell, ',')) + { + cell = Property::trim(cell); + this->_batchMsgID.push_back(cell); + // this->_logger->log_debug("Batch Msg ID %s", cell.c_str()); + } + } + // _logger->log_info("onTriggerRealTime"); + // Open the file + if (!this->_fileStream.is_open()) + { + _fileStream.open(this->_fileName.c_str(), std::ifstream::in); + if (this->_fileStream.is_open()) + _logger->log_debug("open %s", _fileName.c_str()); + } + if (!_fileStream.good()) + { + _logger->log_error("load data file failed %s", _fileName.c_str()); + return; + } + if (this->_fileStream.is_open()) + { + std::string line; + + while (std::getline(_fileStream, line)) + { + line += "\n"; + std::stringstream lineStream(line); + std::string cell; + if (std::getline(lineStream, cell, ',')) + { + cell = Property::trim(cell); + // Check whether it match to the batch traffic + for (std::vector<std::string>::iterator it = _batchMsgID.begin(); it != _batchMsgID.end(); ++it) + { + if (cell == *it) + { + // push the batch data to the queue + std::lock_guard<std::mutex> lock(_mtx); + while ((_queuedDataSize + line.size()) > _batchMaxBufferSize) + { + std::string item = _queue.front(); + _queuedDataSize -= item.size(); + _logger->log_debug("Pop item size %d from batch queue, queue buffer size %d", item.size(), _queuedDataSize); + _queue.pop(); + } + _queue.push(line); + _queuedDataSize += line.size(); + _logger->log_debug("Push batch msg ID %s into batch queue, queue buffer size %d", cell.c_str(), _queuedDataSize); + } + } + bool findRealTime = false; + // Check whether it match to the real time traffic + for (std::vector<std::string>::iterator it = _realTimeMsgID.begin(); it != _realTimeMsgID.end(); ++it) + { + if (cell == *it) + { + int status = 0; + if (this->_realTimeSocket <= 0) + { + // Connect the LTE socket + uint16_t port = _realTimeServerPort; + this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port); + } + if (this->_realTimeSocket) + { + // try to send the data + status = sendData(_realTimeSocket, line.data(), line.size()); + if (status < 0) + { + close(_realTimeSocket); + _realTimeSocket = 0; + } + } + if (this->_realTimeSocket <= 0 || status < 0) + { + // push the batch data to the queue + std::lock_guard<std::mutex> lock(_mtx); + while ((_queuedDataSize + line.size()) > _batchMaxBufferSize) + { + std::string item = _queue.front(); + _queuedDataSize -= item.size(); + _logger->log_debug("Pop item size %d from batch queue, queue buffer size %d", item.size(), _queuedDataSize); + _queue.pop(); + } + _queue.push(line); + _queuedDataSize += line.size(); + _logger->log_debug("Push real time msg ID %s into batch queue, queue buffer size %d", cell.c_str(), _queuedDataSize); + } + // find real time + findRealTime = true; + } // cell + } // for real time pattern + if (findRealTime) + // we break the while once we find the first real time + break; + } // if get line + } // while + if (_fileStream.eof()) + { + _fileStream.close(); + } + } // if open + _realTimeAccumulated = 0; + } + _realTimeAccumulated += context->getProcessor()->getSchedulingPeriodNano(); +} + +void RealTimeDataCollector::onTriggerBatch(ProcessContext *context, ProcessSession *session) +{ + if (_batchAcccumulated >= this->_batchInterval) + { + // _logger->log_info("onTriggerBatch"); + // dequeue the batch and send over WIFI + int status = 0; + if (this->_batchSocket <= 0) + { + // Connect the WIFI socket + uint16_t port = _batchServerPort; + this->_batchSocket = connectServer(_batchServerName.c_str(), port); + } + if (this->_batchSocket) + { + std::lock_guard<std::mutex> lock(_mtx); + + while (!_queue.empty()) + { + std::string line = _queue.front(); + status = sendData(_batchSocket, line.data(), line.size()); + _queue.pop(); + _queuedDataSize -= line.size(); + if (status < 0) + { + close(_batchSocket); + _batchSocket = 0; + break; + } + } + } + _batchAcccumulated = 0; + } + _batchAcccumulated += context->getProcessor()->getSchedulingPeriodNano(); +} + +void RealTimeDataCollector::onTrigger(ProcessContext *context, ProcessSession *session) +{ + std::thread::id id = std::this_thread::get_id(); + + if (id == _realTimeThreadId) + return onTriggerRealTime(context, session); + else if (id == _batchThreadId) + return onTriggerBatch(context, session); + else + { + std::lock_guard<std::mutex> lock(_mtx); + if (!this->_firstInvoking) + { + this->_fileName = "data.osp"; + std::string value; + if (this->getProperty(FILENAME.getName(), value)) + { + this->_fileName = value; + this->_logger->log_info("Data Collector File Name %s", _fileName.c_str()); + } + this->_realTimeServerName = "localhost"; + if (this->getProperty(REALTIMESERVERNAME.getName(), value)) + { + this->_realTimeServerName = value; + this->_logger->log_info("Real Time Server Name %s", this->_realTimeServerName.c_str()); + } + this->_realTimeServerPort = 10000; + if (this->getProperty(REALTIMESERVERPORT.getName(), value)) + { + Property::StringToInt(value, _realTimeServerPort); + this->_logger->log_info("Real Time Server Port %d", _realTimeServerPort); + } + if (this->getProperty(BATCHSERVERNAME.getName(), value)) + { + this->_batchServerName = value; + this->_logger->log_info("Batch Server Name %s", this->_batchServerName.c_str()); + } + this->_batchServerPort = 10001; + if (this->getProperty(BATCHSERVERPORT.getName(), value)) + { + Property::StringToInt(value, _batchServerPort); + this->_logger->log_info("Batch Server Port %d", _batchServerPort); + } + if (this->getProperty(ITERATION.getName(), value)) + { + Property::StringToBool(value, this->_iteration); + _logger->log_info("Iteration %d", _iteration); + } + this->_realTimeInterval = 10000000; //10 msec + if (this->getProperty(REALTIMEINTERVAL.getName(), value)) + { + TimeUnit unit; + if (Property::StringToTime(value, _realTimeInterval, unit) && + Property::ConvertTimeUnitToNS(_realTimeInterval, unit, _realTimeInterval)) + { + _logger->log_info("Real Time Interval: [%d] ns", _realTimeInterval); + } + } + this->_batchInterval = 100000000; //100 msec + if (this->getProperty(BATCHINTERVAL.getName(), value)) + { + TimeUnit unit; + if (Property::StringToTime(value, _batchInterval, unit) && + Property::ConvertTimeUnitToNS(_batchInterval, unit, _batchInterval)) + { + _logger->log_info("Batch Time Interval: [%d] ns", _batchInterval); + } + } + this->_batchMaxBufferSize = 256*1024; + if (this->getProperty(BATCHMAXBUFFERSIZE.getName(), value)) + { + Property::StringToInt(value, _batchMaxBufferSize); + this->_logger->log_info("Batch Max Buffer Size %d", _batchMaxBufferSize); + } + if (this->getProperty(REALTIMEMSGID.getName(), value)) + { + this->_logger->log_info("Real Time Msg IDs %s", value.c_str()); + std::stringstream lineStream(value); + std::string cell; + + while(std::getline(lineStream, cell, ',')) + { + this->_realTimeMsgID.push_back(cell); + this->_logger->log_info("Real Time Msg ID %s", cell.c_str()); + } + } + if (this->getProperty(BATCHMSGID.getName(), value)) + { + this->_logger->log_info("Batch Msg IDs %s", value.c_str()); + std::stringstream lineStream(value); + std::string cell; + + while(std::getline(lineStream, cell, ',')) + { + cell = Property::trim(cell); + this->_batchMsgID.push_back(cell); + this->_logger->log_info("Batch Msg ID %s", cell.c_str()); + } + } + // Connect the LTE socket + uint16_t port = _realTimeServerPort; + + this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port); + + // Connect the WIFI socket + port = _batchServerPort; + + this->_batchSocket = connectServer(_batchServerName.c_str(), port); + + // Open the file + _fileStream.open(this->_fileName.c_str(), std::ifstream::in); + if (!_fileStream.good()) + { + _logger->log_error("load data file failed %s", _fileName.c_str()); + return; + } + else + { + _logger->log_debug("open %s", _fileName.c_str()); + } + _realTimeThreadId = id; + this->_firstInvoking = true; + } + else + { + if (id != _realTimeThreadId) + _batchThreadId = id; + this->_firstInvoking = false; + } + } +}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/src/RemoteProcessorGroupPort.cpp b/src/RemoteProcessorGroupPort.cpp new file mode 100644 index 0000000..711e846 --- /dev/null +++ b/src/RemoteProcessorGroupPort.cpp @@ -0,0 +1,99 @@ +/** + * @file RemoteProcessorGroupPort.cpp + * RemoteProcessorGroupPort class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <sstream> +#include <string.h> +#include <iostream> + +#include "TimeUtil.h" +#include "RemoteProcessorGroupPort.h" +#include "ProcessContext.h" +#include "ProcessSession.h" + +const std::string RemoteProcessorGroupPort::ProcessorName("RemoteProcessorGroupPort"); +Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name.", "localhost"); +Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999"); +Relationship RemoteProcessorGroupPort::relation; + +void RemoteProcessorGroupPort::initialize() +{ + //! Set the supported properties + std::set<Property> properties; + properties.insert(hostName); + properties.insert(port); + setSupportedProperties(properties); + //! Set the supported relationships + std::set<Relationship> relationships; + relationships.insert(relation); + setSupportedRelationships(relationships); +} + +void RemoteProcessorGroupPort::onTrigger(ProcessContext *context, ProcessSession *session) +{ + std::string value; + + if (!_transmitting) + return; + + std::string host = _peer->getHostName(); + uint16_t sport = _peer->getPort(); + int64_t lvalue; + bool needReset = false; + + if (context->getProperty(hostName.getName(), value)) + { + host = value; + } + if (context->getProperty(port.getName(), value) && Property::StringToInt(value, lvalue)) + { + sport = (uint16_t) lvalue; + } + if (host != _peer->getHostName()) + { + _peer->setHostName(host); + needReset= true; + } + if (sport != _peer->getPort()) + { + _peer->setPort(sport); + needReset = true; + } + if (needReset) + _protocol->tearDown(); + + if (!_protocol->bootstrap()) + { + // bootstrap the client protocol if needeed + context->yield(); + return; + } + + if (_direction == RECEIVE) + _protocol->receiveFlowFiles(context, session); + else + _protocol->transferFlowFiles(context, session); + + return; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/Site2SiteClientProtocol.cpp ---------------------------------------------------------------------- diff --git a/src/Site2SiteClientProtocol.cpp b/src/Site2SiteClientProtocol.cpp new file mode 100644 index 0000000..88ea78a --- /dev/null +++ b/src/Site2SiteClientProtocol.cpp @@ -0,0 +1,1313 @@ +/** + * @file Site2SiteProtocol.cpp + * Site2SiteProtocol class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <sys/time.h> +#include <stdio.h> +#include <time.h> +#include <chrono> +#include <thread> +#include <random> +#include <netinet/tcp.h> +#include <iostream> +#include "Site2SitePeer.h" +#include "Site2SiteClientProtocol.h" + +bool Site2SiteClientProtocol::establish() +{ + if (_peerState != IDLE) + { + _logger->log_error("Site2Site peer state is not idle while try to establish"); + return false; + } + + bool ret = _peer->Open(); + + if (!ret) + { + _logger->log_error("Site2Site peer socket open failed"); + return false; + } + + // Negotiate the version + ret = initiateResourceNegotiation(); + + if (!ret) + { + _logger->log_error("Site2Site Protocol Version Negotiation failed"); + /* + _peer->yield(); + tearDown(); */ + return false; + } + + _logger->log_info("Site2Site socket established"); + _peerState = ESTABLISHED; + + return true; +} + +bool Site2SiteClientProtocol::initiateResourceNegotiation() +{ + // Negotiate the version + if (_peerState != IDLE) + { + _logger->log_error("Site2Site peer state is not idle while initiateResourceNegotiation"); + return false; + } + + _logger->log_info("Negotiate protocol version with destination port %s current version %d", _portIdStr.c_str(), _currentVersion); + + int ret = _peer->writeUTF(this->getResourceName()); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + ret = _peer->write(_currentVersion); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + uint8_t statusCode; + ret = _peer->read(statusCode); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + switch (statusCode) + { + case RESOURCE_OK: + _logger->log_info("Site2Site Protocol Negotiate protocol version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = _peer->read(serverVersion); + if (ret <= 0) + { + // tearDown(); + return false; + } + _logger->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion); + for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion)/sizeof(uint32_t); i++) + { + if (serverVersion >= _supportedVersion[i]) + { + _currentVersion = _supportedVersion[i]; + _currentVersionIndex = i; + return initiateResourceNegotiation(); + } + } + ret = -1; + // tearDown(); + return false; + case NEGOTIATED_ABORT: + _logger->log_info("Site2Site Negotiate protocol response ABORT"); + ret = -1; + // tearDown(); + return false; + default: + _logger->log_info("Negotiate protocol response unknown code %d", statusCode); + return true; + } + + return true; +} + +bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() +{ + // Negotiate the version + if (_peerState != HANDSHAKED) + { + _logger->log_error("Site2Site peer state is not handshaked while initiateCodecResourceNegotiation"); + return false; + } + + _logger->log_info("Negotiate Codec version with destination port %s current version %d", _portIdStr.c_str(), _currentCodecVersion); + + int ret = _peer->writeUTF(this->getCodecResourceName()); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + ret = _peer->write(_currentCodecVersion); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + uint8_t statusCode; + ret = _peer->read(statusCode); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + switch (statusCode) + { + case RESOURCE_OK: + _logger->log_info("Site2Site Codec Negotiate version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = _peer->read(serverVersion); + if (ret <= 0) + { + // tearDown(); + return false; + } + _logger->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion); + for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion)/sizeof(uint32_t); i++) + { + if (serverVersion >= _supportedCodecVersion[i]) + { + _currentCodecVersion = _supportedCodecVersion[i]; + _currentCodecVersionIndex = i; + return initiateCodecResourceNegotiation(); + } + } + ret = -1; + // tearDown(); + return false; + case NEGOTIATED_ABORT: + _logger->log_info("Site2Site Codec Negotiate response ABORT"); + ret = -1; + // tearDown(); + return false; + default: + _logger->log_info("Negotiate Codec response unknown code %d", statusCode); + return true; + } + + return true; +} + +bool Site2SiteClientProtocol::handShake() +{ + if (_peerState != ESTABLISHED) + { + _logger->log_error("Site2Site peer state is not established while handshake"); + return false; + } + _logger->log_info("Site2Site Protocol Perform hand shake with destination port %s", _portIdStr.c_str()); + uuid_t uuid; + // Generate the global UUID for the com identify + uuid_generate(uuid); + char uuidStr[37]; + uuid_unparse(uuid, uuidStr); + _commsIdentifier = uuidStr; + + int ret = _peer->writeUTF(_commsIdentifier); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + std::map<std::string, std::string> properties; + properties[HandShakePropertyStr[GZIP]] = "false"; + properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr; + properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(this->_timeOut); + if (this->_currentVersion >= 5) + { + if (this->_batchCount > 0) + properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(this->_batchCount); + if (this->_batchSize > 0) + properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(this->_batchSize); + if (this->_batchDuration > 0) + properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(this->_batchDuration); + } + + if (_currentVersion >= 3) + { + ret = _peer->writeUTF(_peer->getURL()); + if (ret <= 0) + { + // tearDown(); + return false; + } + } + + uint32_t size = properties.size(); + ret = _peer->write(size); + if (ret <= 0) + { + // tearDown(); + return false; + } + + std::map<std::string, std::string>::iterator it; + for (it = properties.begin(); it!= properties.end(); it++) + { + ret = _peer->writeUTF(it->first); + if (ret <= 0) + { + // tearDown(); + return false; + } + ret = _peer->writeUTF(it->second); + if (ret <= 0) + { + // tearDown(); + return false; + } + _logger->log_info("Site2Site Protocol Send handshake properties %s %s", it->first.c_str(), it->second.c_str()); + } + + RespondCode code; + std::string message; + + ret = this->readRespond(code, message); + + if (ret <= 0) + { + // tearDown(); + return false; + } + + switch (code) + { + case PROPERTIES_OK: + _logger->log_info("Site2Site HandShake Completed"); + _peerState = HANDSHAKED; + return true; + case PORT_NOT_IN_VALID_STATE: + case UNKNOWN_PORT: + case PORTS_DESTINATION_FULL: + _logger->log_error("Site2Site HandShake Failed because destination port is either invalid or full"); + ret = -1; + /* + _peer->yield(); + tearDown(); */ + return false; + default: + _logger->log_info("HandShake Failed because of unknown respond code %d", code); + ret = -1; + /* + _peer->yield(); + tearDown(); */ + return false; + } + + return false; +} + +void Site2SiteClientProtocol::tearDown() +{ + if (_peerState >= ESTABLISHED) + { + _logger->log_info("Site2Site Protocol tearDown"); + // need to write shutdown request + writeRequestType(SHUTDOWN); + } + + std::map<std::string, Transaction *>::iterator it; + for (it = _transactionMap.begin(); it!= _transactionMap.end(); it++) + { + delete it->second; + } + _transactionMap.clear(); + _peer->Close(); + _peerState = IDLE; +} + +int Site2SiteClientProtocol::writeRequestType(RequestType type) +{ + if (type >= MAX_REQUEST_TYPE) + return -1; + + return _peer->writeUTF(RequestTypeStr[type]); +} + +int Site2SiteClientProtocol::readRequestType(RequestType &type) +{ + std::string requestTypeStr; + + int ret = _peer->readUTF(requestTypeStr); + + if (ret <= 0) + return ret; + + for (int i = (int) NEGOTIATE_FLOWFILE_CODEC; i <= (int) SHUTDOWN; i++) + { + if (RequestTypeStr[i] == requestTypeStr) + { + type = (RequestType) i; + return ret; + } + } + + return -1; +} + +int Site2SiteClientProtocol::readRespond(RespondCode &code, std::string &message) +{ + uint8_t firstByte; + + int ret = _peer->read(firstByte); + + if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1) + return -1; + + uint8_t secondByte; + + ret = _peer->read(secondByte); + + if (ret <= 0 || secondByte != CODE_SEQUENCE_VALUE_2) + return -1; + + uint8_t thirdByte; + + ret = _peer->read(thirdByte); + + if (ret <= 0) + return ret; + + code = (RespondCode) thirdByte; + + RespondCodeContext *resCode = this->getRespondCodeContext(code); + + if ( resCode == NULL) + { + // Not a valid respond code + return -1; + } + if (resCode->hasDescription) + { + ret = _peer->readUTF(message); + if (ret <= 0) + return -1; + } + return 3 + message.size(); +} + +int Site2SiteClientProtocol::writeRespond(RespondCode code, std::string message) +{ + RespondCodeContext *resCode = this->getRespondCodeContext(code); + + if (resCode == NULL) + { + // Not a valid respond code + return -1; + } + + uint8_t codeSeq[3]; + codeSeq[0] = CODE_SEQUENCE_VALUE_1; + codeSeq[1] = CODE_SEQUENCE_VALUE_2; + codeSeq[2] = (uint8_t) code; + + int ret = _peer->write(codeSeq, 3); + + if (ret != 3) + return -1; + + if (resCode->hasDescription) + { + ret = _peer->writeUTF(message); + if (ret > 0) + return (3 + ret); + else + return ret; + } + else + return 3; +} + +bool Site2SiteClientProtocol::negotiateCodec() +{ + if (_peerState != HANDSHAKED) + { + _logger->log_error("Site2Site peer state is not handshaked while negotiate codec"); + return false; + } + + _logger->log_info("Site2Site Protocol Negotiate Codec with destination port %s", _portIdStr.c_str()); + + int status = this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC); + + if (status <= 0) + { + // tearDown(); + return false; + } + + // Negotiate the codec version + bool ret = initiateCodecResourceNegotiation(); + + if (!ret) + { + _logger->log_error("Site2Site Codec Version Negotiation failed"); + /* + _peer->yield(); + tearDown(); */ + return false; + } + + _logger->log_info("Site2Site Codec Completed and move to READY state for data transfer"); + _peerState = READY; + + return true; +} + +bool Site2SiteClientProtocol::bootstrap() +{ + if (_peerState == READY) + return true; + + tearDown(); + + if (establish() && handShake() && negotiateCodec()) + { + _logger->log_info("Site2Site Ready For data transaction"); + return true; + } + else + { + _peer->yield(); + tearDown(); + return false; + } +} + +Transaction* Site2SiteClientProtocol::createTransaction(std::string &transactionID, TransferDirection direction) +{ + int ret; + bool dataAvailable; + Transaction *transaction = NULL; + + if (_peerState != READY) + { + bootstrap(); + } + + if (_peerState != READY) + { + return NULL; + } + + if (direction == RECEIVE) + { + ret = writeRequestType(RECEIVE_FLOWFILES); + + if (ret <= 0) + { + // tearDown(); + return NULL; + } + + RespondCode code; + std::string message; + + ret = readRespond(code, message); + + if (ret <= 0) + { + // tearDown(); + return NULL; + } + + switch (code) + { + case MORE_DATA: + dataAvailable = true; + _logger->log_info("Site2Site peer indicates that data is available"); + transaction = new Transaction(direction); + _transactionMap[transaction->getUUIDStr()] = transaction; + transactionID = transaction->getUUIDStr(); + transaction->setDataAvailable(dataAvailable); + _logger->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); + return transaction; + case NO_MORE_DATA: + dataAvailable = false; + _logger->log_info("Site2Site peer indicates that no data is available"); + transaction = new Transaction(direction); + _transactionMap[transaction->getUUIDStr()] = transaction; + transactionID = transaction->getUUIDStr(); + transaction->setDataAvailable(dataAvailable); + _logger->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); + return transaction; + default: + _logger->log_info("Site2Site got unexpected response %d when asking for data", code); + // tearDown(); + return NULL; + } + } + else + { + ret = writeRequestType(SEND_FLOWFILES); + + if (ret <= 0) + { + // tearDown(); + return NULL; + } + else + { + transaction = new Transaction(direction); + _transactionMap[transaction->getUUIDStr()] = transaction; + transactionID = transaction->getUUIDStr(); + _logger->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); + return transaction; + } + } +} + +bool Site2SiteClientProtocol::receive(std::string transactionID, DataPacket *packet, bool &eof) +{ + int ret; + Transaction *transaction = NULL; + + if (_peerState != READY) + { + bootstrap(); + } + + if (_peerState != READY) + { + return false; + } + + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) + { + return false; + } + else + { + transaction = it->second; + } + + if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) + { + _logger->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); + return false; + } + + if (transaction->getDirection() != RECEIVE) + { + _logger->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str()); + return false; + } + + if (!transaction->isDataAvailable()) + { + eof = true; + return true; + } + + if (transaction->_transfers > 0) + { + // if we already has transfer before, check to see whether another one is available + RespondCode code; + std::string message; + + ret = readRespond(code, message); + + if (ret <= 0) + { + return false; + } + if (code == CONTINUE_TRANSACTION) + { + _logger->log_info("Site2Site transaction %s peer indicate continue transaction", transactionID.c_str()); + transaction->_dataAvailable = true; + } + else if (code == FINISH_TRANSACTION) + { + _logger->log_info("Site2Site transaction %s peer indicate finish transaction", transactionID.c_str()); + transaction->_dataAvailable = false; + } + else + { + _logger->log_info("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.c_str(), code); + return false; + } + } + + if (!transaction->isDataAvailable()) + { + eof = true; + return true; + } + + // start to read the packet + uint32_t numAttributes; + ret = _peer->read(numAttributes, &transaction->_crc); + if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) + { + return false; + } + + // read the attributes + for (unsigned int i = 0; i < numAttributes; i++) + { + std::string key; + std::string value; + ret = _peer->readUTF(key, true, &transaction->_crc); + if (ret <= 0) + { + return false; + } + ret = _peer->readUTF(value, true, &transaction->_crc); + if (ret <= 0) + { + return false; + } + packet->_attributes[key] = value; + _logger->log_info("Site2Site transaction %s receives attribute key %s value %s", transactionID.c_str(), key.c_str(), value.c_str()); + } + + uint64_t len; + ret = _peer->read(len, &transaction->_crc); + if (ret <= 0) + { + return false; + } + + packet->_size = len; + transaction->_transfers++; + transaction->_state = DATA_EXCHANGED; + transaction->_bytes += len; + _logger->log_info("Site2Site transaction %s receives flow record %d, total length %d", transactionID.c_str(), + transaction->_transfers, transaction->_bytes); + + return true; +} + +bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet, FlowFileRecord *flowFile, ProcessSession *session) +{ + int ret; + Transaction *transaction = NULL; + + if (_peerState != READY) + { + bootstrap(); + } + + if (_peerState != READY) + { + return false; + } + + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) + { + return false; + } + else + { + transaction = it->second; + } + + if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) + { + _logger->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); + return false; + } + + if (transaction->getDirection() != SEND) + { + _logger->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str()); + return false; + } + + if (transaction->_transfers > 0) + { + ret = writeRespond(CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION"); + if (ret <= 0) + { + return false; + } + } + + // start to read the packet + uint32_t numAttributes = packet->_attributes.size(); + ret = _peer->write(numAttributes, &transaction->_crc); + if (ret != 4) + { + return false; + } + + std::map<std::string, std::string>::iterator itAttribute; + for (itAttribute = packet->_attributes.begin(); itAttribute!= packet->_attributes.end(); itAttribute++) + { + ret = _peer->writeUTF(itAttribute->first, true, &transaction->_crc); + if (ret <= 0) + { + return false; + } + ret = _peer->writeUTF(itAttribute->second, true, &transaction->_crc); + if (ret <= 0) + { + return false; + } + _logger->log_info("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(), + itAttribute->first.c_str(), itAttribute->second.c_str()); + } + + uint64_t len = flowFile->getSize() ; + ret = _peer->write(len, &transaction->_crc); + if (ret != 8) + { + return false; + } + + if (flowFile->getSize()) + { + Site2SiteClientProtocol::ReadCallback callback(packet); + session->read(flowFile, &callback); + if (flowFile->getSize() != packet->_size) + { + return false; + } + } + + transaction->_transfers++; + transaction->_state = DATA_EXCHANGED; + transaction->_bytes += len; + _logger->log_info("Site2Site transaction %s send flow record %d, total length %d", transactionID.c_str(), + transaction->_transfers, transaction->_bytes); + + return true; +} + +void Site2SiteClientProtocol::receiveFlowFiles(ProcessContext *context, ProcessSession *session) +{ + uint64_t bytes = 0; + int transfers = 0; + Transaction *transaction = NULL; + + if (_peerState != READY) + { + bootstrap(); + } + + if (_peerState != READY) + { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); + return; + } + + // Create the transaction + std::string transactionID; + transaction = createTransaction(transactionID, RECEIVE); + + if (transaction == NULL) + { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); + return; + } + + try + { + while (true) + { + std::map<std::string, std::string> empty; + DataPacket packet(this, transaction, empty); + bool eof = false; + + if (!receive(transactionID, &packet, eof)) + { + throw Exception(SITE2SITE_EXCEPTION, "Receive Failed"); + return; + } + if (eof) + { + // transaction done + break; + } + FlowFileRecord *flowFile = session->create(); + if (!flowFile) + { + throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed"); + return; + } + std::map<std::string, std::string>::iterator it; + for (it = packet._attributes.begin(); it!= packet._attributes.end(); it++) + { + flowFile->addAttribute(it->first, it->second); + } + + if (packet._size > 0) + { + Site2SiteClientProtocol::WriteCallback callback(&packet); + session->write(flowFile, &callback); + if (flowFile->getSize() != packet._size) + { + throw Exception(SITE2SITE_EXCEPTION, "Receive Size Not Right"); + return; + } + } + Relationship relation; // undefined relationship + session->transfer(flowFile, relation); + // receive the transfer for the flow record + bytes += packet._size; + transfers++; + } // while true + + if (!confirm(transactionID)) + { + throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed"); + return; + } + if (!complete(transactionID)) + { + throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction Failed"); + return; + } + _logger->log_info("Site2Site transaction %s successfully receive flow record %d, content bytes %d", + transactionID.c_str(), transfers, bytes); + // we yield the receive if we did not get anything + if (transfers == 0) + context->yield(); + } + catch (std::exception &exception) + { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) + { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + _logger->log_debug("Caught Exception during Site2SiteClientProtocol::receiveFlowFiles"); + throw; + } + + deleteTransaction(transactionID); + + return; +} + +bool Site2SiteClientProtocol::confirm(std::string transactionID) +{ + int ret; + Transaction *transaction = NULL; + + if (_peerState != READY) + { + bootstrap(); + } + + if (_peerState != READY) + { + return false; + } + + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) + { + return false; + } + else + { + transaction = it->second; + } + + if (transaction->getState() == TRANSACTION_STARTED && !transaction->isDataAvailable() && + transaction->getDirection() == RECEIVE) + { + transaction->_state = TRANSACTION_CONFIRMED; + return true; + } + + if (transaction->getState() != DATA_EXCHANGED) + return false; + + if (transaction->getDirection() == RECEIVE) + { + if (transaction->isDataAvailable()) + return false; + // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message + // to peer so that we can verify that the connection is still open. This is a two-phase commit, + // which helps to prevent the chances of data duplication. Without doing this, we may commit the + // session and then when we send the response back to the peer, the peer may have timed out and may not + // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the + // Critical Section involved in this transaction so that rather than the Critical Section being the + // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. + long crcValue = transaction->getCRC(); + std::string crc = std::to_string(crcValue); + _logger->log_info("Site2Site Send confirm with CRC %d to transaction %s", transaction->getCRC(), + transactionID.c_str()); + ret = writeRespond(CONFIRM_TRANSACTION, crc); + if (ret <= 0) + return false; + RespondCode code; + std::string message; + readRespond(code, message); + if (ret <= 0) + return false; + + if (code == CONFIRM_TRANSACTION) + { + _logger->log_info("Site2Site transaction %s peer confirm transaction", transactionID.c_str()); + transaction->_state = TRANSACTION_CONFIRMED; + return true; + } + else if (code == BAD_CHECKSUM) + { + _logger->log_info("Site2Site transaction %s peer indicate bad checksum", transactionID.c_str()); + /* + transaction->_state = TRANSACTION_CONFIRMED; + return true; */ + return false; + } + else + { + _logger->log_info("Site2Site transaction %s peer unknown respond code %d", + transactionID.c_str(), code); + return false; + } + } + else + { + _logger->log_info("Site2Site Send FINISH TRANSACTION for transaction %s", + transactionID.c_str()); + ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION"); + if (ret <= 0) + return false; + RespondCode code; + std::string message; + readRespond(code, message); + if (ret <= 0) + return false; + + // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response + if (code == CONFIRM_TRANSACTION) + { + _logger->log_info("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.c_str(), message.c_str()); + if (this->_currentVersion > 3) + { + long crcValue = transaction->getCRC(); + std::string crc = std::to_string(crcValue); + if (message == crc) + { + _logger->log_info("Site2Site transaction %s CRC matched", transactionID.c_str()); + ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); + if (ret <= 0) + return false; + transaction->_state = TRANSACTION_CONFIRMED; + return true; + } + else + { + _logger->log_info("Site2Site transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str()); + ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM"); + /* + ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); + if (ret <= 0) + return false; + transaction->_state = TRANSACTION_CONFIRMED; + return true; */ + return false; + } + } + ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); + if (ret <= 0) + return false; + transaction->_state = TRANSACTION_CONFIRMED; + return true; + } + else + { + _logger->log_info("Site2Site transaction %s peer unknown respond code %d", + transactionID.c_str(), code); + return false; + } + return false; + } +} + +void Site2SiteClientProtocol::cancel(std::string transactionID) +{ + Transaction *transaction = NULL; + + if (_peerState != READY) + { + return; + } + + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) + { + return; + } + else + { + transaction = it->second; + } + + if (transaction->getState() == TRANSACTION_CANCELED || transaction->getState() == TRANSACTION_COMPLETED + || transaction->getState() == TRANSACTION_ERROR) + { + return; + } + + this->writeRespond(CANCEL_TRANSACTION, "Cancel"); + transaction->_state = TRANSACTION_CANCELED; + + tearDown(); + return; +} + +void Site2SiteClientProtocol::deleteTransaction(std::string transactionID) +{ + Transaction *transaction = NULL; + + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) + { + return; + } + else + { + transaction = it->second; + } + + _logger->log_info("Site2Site delete transaction %s", transaction->getUUIDStr().c_str()); + delete transaction; + _transactionMap.erase(transactionID); +} + +void Site2SiteClientProtocol::error(std::string transactionID) +{ + Transaction *transaction = NULL; + + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) + { + return; + } + else + { + transaction = it->second; + } + + transaction->_state = TRANSACTION_ERROR; + tearDown(); + return; +} + +//! Complete the transaction +bool Site2SiteClientProtocol::complete(std::string transactionID) +{ + int ret; + Transaction *transaction = NULL; + + if (_peerState != READY) + { + bootstrap(); + } + + if (_peerState != READY) + { + return false; + } + + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) + { + return false; + } + else + { + transaction = it->second; + } + + if (transaction->getState() != TRANSACTION_CONFIRMED) + { + return false; + } + + if (transaction->getDirection() == RECEIVE) + { + if (transaction->_transfers == 0) + { + transaction->_state = TRANSACTION_COMPLETED; + return true; + } + else + { + _logger->log_info("Site2Site transaction %s send finished", transactionID.c_str()); + ret = this->writeRespond(TRANSACTION_FINISHED, "Finished"); + if (ret <= 0) + return false; + else + { + transaction->_state = TRANSACTION_COMPLETED; + return true; + } + } + } + else + { + RespondCode code; + std::string message; + int ret; + + ret = readRespond(code, message); + + if (ret <= 0) + return false; + + if (code == TRANSACTION_FINISHED) + { + _logger->log_info("Site2Site transaction %s peer finished transaction", transactionID.c_str()); + transaction->_state = TRANSACTION_COMPLETED; + return true; + } + else + { + _logger->log_info("Site2Site transaction %s peer unknown respond code %d", + transactionID.c_str(), code); + return false; + } + } +} + +void Site2SiteClientProtocol::transferFlowFiles(ProcessContext *context, ProcessSession *session) +{ + FlowFileRecord *flow = session->get(); + Transaction *transaction = NULL; + + if (!flow) + return; + + if (_peerState != READY) + { + bootstrap(); + } + + if (_peerState != READY) + { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); + return; + } + + // Create the transaction + std::string transactionID; + transaction = createTransaction(transactionID, SEND); + + if (transaction == NULL) + { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); + return; + } + + bool continueTransaction = true; + uint64_t startSendingNanos = getTimeNano(); + + try + { + while (continueTransaction) + { + DataPacket packet(this, transaction, flow->getAttributes()); + + if (!send(transactionID, &packet, flow, session)) + { + throw Exception(SITE2SITE_EXCEPTION, "Send Failed"); + return; + } + _logger->log_info("Site2Site transaction %s send flow record %s", + transactionID.c_str(), flow->getUUIDStr().c_str()); + session->remove(flow); + + uint64_t transferNanos = getTimeNano() - startSendingNanos; + if (transferNanos > _batchSendNanos) + break; + + flow = session->get(); + if (!flow) + { + continueTransaction = false; + } + } // while true + + if (!confirm(transactionID)) + { + throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed"); + return; + } + if (!complete(transactionID)) + { + throw Exception(SITE2SITE_EXCEPTION, "Complete Failed"); + return; + } + _logger->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", + transactionID.c_str(), transaction->_transfers, transaction->_bytes); + } + catch (std::exception &exception) + { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) + { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + _logger->log_debug("Caught Exception during Site2SiteClientProtocol::transferFlowFiles"); + throw; + } + + deleteTransaction(transactionID); + + return; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/src/Site2SitePeer.cpp ---------------------------------------------------------------------- diff --git a/src/Site2SitePeer.cpp b/src/Site2SitePeer.cpp new file mode 100644 index 0000000..c844aa5 --- /dev/null +++ b/src/Site2SitePeer.cpp @@ -0,0 +1,434 @@ +/** + * @file Site2SitePeer.cpp + * Site2SitePeer class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <sys/time.h> +#include <stdio.h> +#include <time.h> +#include <chrono> +#include <thread> +#include <random> +#include <netinet/tcp.h> +#include <iostream> +#include "Site2SitePeer.h" + +//! CRC tables +std::atomic<bool> CRC32::tableInit(false); +unsigned int CRC32::table[256]; + +bool Site2SitePeer::Open() +{ + in_addr_t addr; + int sock = 0; + struct hostent *h; + const char *host; + uint16_t port; + + host = this->_host.c_str(); + port = this->_port; + + if (strlen(host) == 0) + return false; + +#ifdef __MACH__ + h = gethostbyname(host); +#else + char buf[1024]; + struct hostent he; + int hh_errno; + gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); +#endif + memcpy((char *) &addr, h->h_addr_list[0], h->h_length); + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) + { + _logger->log_error("Could not create socket to hostName %s", host); + this->yield(); + return false; + } + +#ifndef __MACH__ + int opt = 1; + bool nagle_off = true; + + if (nagle_off) + { + if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0) + { + _logger->log_error("setsockopt() TCP_NODELAY failed"); + close(sock); + this->yield(); + return false; + } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&opt, sizeof(opt)) < 0) + { + _logger->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + this->yield(); + return false; + } + } + + int sndsize = 256*1024; + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0) + { + _logger->log_error("setsockopt() SO_SNDBUF failed"); + close(sock); + this->yield(); + return false; + } + int rcvsize = 256*1024; + if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&rcvsize, (int)sizeof(rcvsize)) < 0) + { + _logger->log_error("setsockopt() SO_RCVBUF failed"); + close(sock); + this->yield(); + return false; + } +#endif + + struct sockaddr_in sa; + socklen_t socklen; + int status; + + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = htonl(INADDR_ANY); + sa.sin_port = htons(0); + socklen = sizeof(sa); + if (bind(sock, (struct sockaddr *)&sa, socklen) < 0) + { + _logger->log_error("socket bind failed"); + close(sock); + this->yield(); + return false; + } + + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = addr; + sa.sin_port = htons(port); + socklen = sizeof(sa); + + status = connect(sock, (struct sockaddr *)&sa, socklen); + + if (status < 0) + { + _logger->log_error("socket connect failed to %s %d", host, port); + close(sock); + this->yield(); + return false; + } + + _logger->log_info("Site2Site Peer socket %d connect to server %s port %d success", sock, host, port); + + _socket = sock; + + status = sendData((uint8_t *) MAGIC_BYTES, sizeof(MAGIC_BYTES)); + + if (status <= 0) + { + Close(); + return false; + } + + return true; +} + +void Site2SitePeer::Close() +{ + if (_socket) + { + _logger->log_info("Site2Site Peer socket %d close", _socket); + close(_socket); + _socket = 0; + } +} + +int Site2SitePeer::sendData(uint8_t *buf, int buflen, CRC32 *crc) +{ + int ret = 0, bytes = 0; + + if (_socket <= 0) + { + // this->yield(); + return -1; + } + + while (bytes < buflen) + { + ret = send(_socket, buf+bytes, buflen-bytes, 0); + //check for errors + if (ret == -1) + { + Close(); + // this->yield(); + return ret; + } + bytes+=ret; + } + + if (crc) + crc->update(buf, buflen); + + return bytes; +} + +int Site2SitePeer::Select(int msec) +{ + fd_set fds; + struct timeval tv; + int retval; + int fd = _socket; + + FD_ZERO(&fds); + FD_SET(fd, &fds); + + tv.tv_sec = msec/1000; + tv.tv_usec = (msec % 1000) * 1000; + + if (msec > 0) + retval = select(fd+1, &fds, NULL, NULL, &tv); + else + retval = select(fd+1, &fds, NULL, NULL, NULL); + + if (retval <= 0) + return retval; + if (FD_ISSET(fd, &fds)) + return retval; + else + return 0; +} + +int Site2SitePeer::readData(uint8_t *buf, int buflen, CRC32 *crc) +{ + int sendSize = buflen; + uint8_t *start = buf; + + if (_socket <= 0) + { + // this->yield(); + return -1; + } + + while (buflen) + { + int status; + status = Select((int) _timeOut); + if (status <= 0) + { + Close(); + return status; + } + status = recv(_socket, buf, buflen, 0); + if (status <= 0) + { + Close(); + // this->yield(); + return status; + } + buflen -= status; + buf += status; + } + + if (crc) + crc->update(start, sendSize); + + return sendSize; +} + +int Site2SitePeer::writeUTF(std::string str, bool widen, CRC32 *crc) +{ + int strlen = str.length(); + int utflen = 0; + int c, count = 0; + + /* use charAt instead of copying String to char array */ + for (int i = 0; i < strlen; i++) { + c = str.at(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + utflen++; + } else if (c > 0x07FF) { + utflen += 3; + } else { + utflen += 2; + } + } + + if (utflen > 65535) + return -1; + + uint8_t *bytearr = NULL; + if (!widen) + { + bytearr = new uint8_t[utflen+2]; + bytearr[count++] = (uint8_t) ((utflen >> 8) & 0xFF); + bytearr[count++] = (uint8_t) ((utflen >> 0) & 0xFF); + } + else + { + bytearr = new uint8_t[utflen+4]; + bytearr[count++] = (uint8_t) ((utflen >> 24) & 0xFF); + bytearr[count++] = (uint8_t) ((utflen >> 16) & 0xFF); + bytearr[count++] = (uint8_t) ((utflen >> 8) & 0xFF); + bytearr[count++] = (uint8_t) ((utflen >> 0) & 0xFF); + } + + int i=0; + for (i=0; i<strlen; i++) { + c = str.at(i); + if (!((c >= 0x0001) && (c <= 0x007F))) break; + bytearr[count++] = (uint8_t) c; + } + + for (;i < strlen; i++){ + c = str.at(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + bytearr[count++] = (uint8_t) c; + } else if (c > 0x07FF) { + bytearr[count++] = (uint8_t) (0xE0 | ((c >> 12) & 0x0F)); + bytearr[count++] = (uint8_t) (0x80 | ((c >> 6) & 0x3F)); + bytearr[count++] = (uint8_t) (0x80 | ((c >> 0) & 0x3F)); + } else { + bytearr[count++] = (uint8_t) (0xC0 | ((c >> 6) & 0x1F)); + bytearr[count++] = (uint8_t) (0x80 | ((c >> 0) & 0x3F)); + } + } + int ret; + if (!widen) + { + ret = sendData(bytearr, utflen+2, crc); + } + else + { + ret = sendData(bytearr, utflen+4, crc); + } + delete[] bytearr; + return ret; +} + +int Site2SitePeer::readUTF(std::string &str, bool widen, CRC32 *crc) +{ + uint16_t utflen; + int ret; + + if (!widen) + { + ret = read(utflen, crc); + if (ret <= 0) + return ret; + } + else + { + uint32_t len; + ret = read(len, crc); + if (ret <= 0) + return ret; + utflen = len; + } + + uint8_t *bytearr = NULL; + char *chararr = NULL; + bytearr = new uint8_t[utflen]; + chararr = new char[utflen]; + memset(chararr, 0, utflen); + + int c, char2, char3; + int count = 0; + int chararr_count=0; + + ret = read(bytearr, utflen, crc); + if (ret <= 0) + { + delete[] bytearr; + delete[] chararr; + return ret; + } + + while (count < utflen) { + c = (int) bytearr[count] & 0xff; + if (c > 127) break; + count++; + chararr[chararr_count++]=(char)c; + } + + while (count < utflen) { + c = (int) bytearr[count] & 0xff; + switch (c >> 4) { + case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7: + /* 0xxxxxxx*/ + count++; + chararr[chararr_count++]=(char)c; + break; + case 12: case 13: + /* 110x xxxx 10xx xxxx*/ + count += 2; + if (count > utflen) + { + delete[] bytearr; + delete[] chararr; + return -1; + } + char2 = (int) bytearr[count-1]; + if ((char2 & 0xC0) != 0x80) + { + delete[] bytearr; + delete[] chararr; + return -1; + } + chararr[chararr_count++]=(char)(((c & 0x1F) << 6) | + (char2 & 0x3F)); + break; + case 14: + /* 1110 xxxx 10xx xxxx 10xx xxxx */ + count += 3; + if (count > utflen) + { + delete[] bytearr; + delete[] chararr; + return -1; + } + char2 = (int) bytearr[count-2]; + char3 = (int) bytearr[count-1]; + if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) + { + delete[] bytearr; + delete[] chararr; + return -1; + } + chararr[chararr_count++]=(char)(((c & 0x0F) << 12) | + ((char2 & 0x3F) << 6) | + ((char3 & 0x3F) << 0)); + break; + default: + delete[] bytearr; + delete[] chararr; + return -1; + } + } + // The number of chars produced may be less than utflen + std::string value(chararr, chararr_count); + str = value; + delete[] bytearr; + delete[] chararr; + if (!widen) + return (2 + utflen); + else + return (4 + utflen); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/target/conf/flow.xml ---------------------------------------------------------------------- diff --git a/target/conf/flow.xml b/target/conf/flow.xml index 56125f8..51b74e8 100644 --- a/target/conf/flow.xml +++ b/target/conf/flow.xml @@ -8,6 +8,41 @@ <position x="0.0" y="0.0"/> <comment/> <processor> + <id>e01275ae-ac38-48f9-ac53-1a44df1be88e</id> + <name>LogAttribute</name> + <position x="3950.0958625440016" y="1355.8949219185629"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.LogAttribute</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>0 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> + <property> + <name>Log Level</name> + <value>info</value> + </property> + <property> + <name>Log Payload</name> + <value>false</value> + </property> + <property> + <name>Attributes to Log</name> + </property> + <property> + <name>Attributes to Ignore</name> + </property> + <property> + <name>Log prefix</name> + </property> + <autoTerminatedRelationship>success</autoTerminatedRelationship> + </processor> + <processor> <id>572aa3f3-6288-4ca1-ae43-5e492cb0ea23</id> <name>LogAttribute</name> <position x="3259.732177734375" y="1739.991943359375"/> @@ -60,7 +95,7 @@ <runDurationNanos>0</runDurationNanos> <property> <name>File Size</name> - <value>1 kB</value> + <value>1024 kB</value> </property> <property> <name>Batch Size</name> @@ -75,6 +110,69 @@ <value>false</value> </property> </processor> + <label> + <id>809d63d9-6feb-496a-9dc3-d23c217e52fd</id> + <position x="3635.581271381991" y="1309.9918825902428"/> + <size height="193.5023651123047" width="641.0671997070312"/> + <styles> + <style name="background-color">#9a91ff</style> + <style name="font-size">16px</style> + </styles> + <value>Pull From Node B</value> + </label> + <label> + <id>d95ce8d3-c005-4d0b-8fcc-b2f6fae7172f</id> + <position x="2601.7320892530847" y="1413.1875613011803"/> + <size height="193.5023651123047" width="641.0671997070312"/> + <styles> + <style name="font-size">16px</style> + </styles> + <value>Push to Node B</value> + </label> + <remoteProcessGroup> + <id>8f3b248f-d493-4269-b317-36f85719f480</id> + <name>NiFi Flow</name> + <position x="3254.3356850982673" y="1432.3274284388426"/> + <comment/> + <url>http://localhost:8081/nifi</url> + <timeout>30 sec</timeout> + <yieldPeriod>1 sec</yieldPeriod> + <transmitting>true</transmitting> + <inputPort> + <id>471deef6-2a6e-4a7d-912a-81cc17e3a204</id> + <name> From Node A</name> + <position x="0.0" y="0.0"/> + <comments/> + <scheduledState>RUNNING</scheduledState> + <maxConcurrentTasks>1</maxConcurrentTasks> + <useCompression>false</useCompression> + <property> + <name>Host Name</name> + <value>localhost</value> + </property> + <property> + <name>Port</name> + <value>10001</value> + </property> + </inputPort> + <outputPort> + <id>75f88005-0a87-4fef-8320-6219cdbcf18b</id> + <name>To A</name> + <position x="0.0" y="0.0"/> + <comments/> + <scheduledState>RUNNING</scheduledState> + <maxConcurrentTasks>1</maxConcurrentTasks> + <useCompression>false</useCompression> + <property> + <name>Host Name</name> + <value>localhost</value> + </property> + <property> + <name>Port</name> + <value>10001</value> + </property> + </outputPort> + </remoteProcessGroup> <connection> <id>c4cf70d8-be05-4c3d-b926-465f330d6503</id> <name/> @@ -93,20 +191,35 @@ <flowFileExpiration>0 sec</flowFileExpiration> </connection> <connection> - <id>673cf83c-d261-4b6b-8e5a-19052fe40025</id> - <name/> - <bendPoints> - <bendPoint x="3106.4228882570283" y="1573.7169700192542"/> - </bendPoints> + <id>c9573abe-937c-464b-b18d-48b29c42dce2</id> + <name>site2siteSEND</name> + <bendPoints/> <labelIndex>1</labelIndex> <zIndex>0</zIndex> <sourceId>a0e57bb2-5b89-438e-8869-0326bbdbbe43</sourceId> <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId> <sourceType>PROCESSOR</sourceType> - <destinationId>572aa3f3-6288-4ca1-ae43-5e492cb0ea23</destinationId> + <destinationId>471deef6-2a6e-4a7d-912a-81cc17e3a204</destinationId> + <destinationGroupId>8f3b248f-d493-4269-b317-36f85719f480</destinationGroupId> + <destinationType>REMOTE_INPUT_PORT</destinationType> + <relationship>success</relationship> + <maxWorkQueueSize>0</maxWorkQueueSize> + <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize> + <flowFileExpiration>0 sec</flowFileExpiration> + </connection> + <connection> + <id>2cb90b4c-d6cb-4fef-8f0f-b16459561af5</id> + <name>site2siteReceive</name> + <bendPoints/> + <labelIndex>1</labelIndex> + <zIndex>0</zIndex> + <sourceId>75f88005-0a87-4fef-8320-6219cdbcf18b</sourceId> + <sourceGroupId>8f3b248f-d493-4269-b317-36f85719f480</sourceGroupId> + <sourceType>REMOTE_OUTPUT_PORT</sourceType> + <destinationId>e01275ae-ac38-48f9-ac53-1a44df1be88e</destinationId> <destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId> <destinationType>PROCESSOR</destinationType> - <relationship>success</relationship> + <relationship/> <maxWorkQueueSize>0</maxWorkQueueSize> <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize> <flowFileExpiration>0 sec</flowFileExpiration> @@ -156,7 +269,7 @@ </property> <property> <name>Truststore Password</name> - <value>enc{9E2EE146023A0F31914706460EB177B357796CF0C768DECE09D10C4B40F344C8}</value> + <value>enc{3A31531B76B6395A72FB8BEB4C93E2040877D07C04FDAB5A84499B918BECEB77}</value> </property> <property> <name>Truststore Type</name> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/target/conf/flowServer.xml ---------------------------------------------------------------------- diff --git a/target/conf/flowServer.xml b/target/conf/flowServer.xml new file mode 100644 index 0000000..caca3eb --- /dev/null +++ b/target/conf/flowServer.xml @@ -0,0 +1,130 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<flowController> + <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount> + <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount> + <rootGroup> + <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id> + <name>NiFi Flow</name> + <position x="0.0" y="0.0"/> + <comment/> + <processor> + <id>572aa3f3-6288-4ca1-ae43-5e492cb0ea23</id> + <name>RealTimeDataCollector</name> + <position x="3259.732177734375" y="1739.991943359375"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.RealTimeDataCollector</class> + <maxConcurrentTasks>2</maxConcurrentTasks> + <schedulingPeriod>10 ms</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> + <property> + <name>File Name</name> + <value>data.osp</value> + </property> + <property> + <name>Real Time Server Name</name> + <value>localhost</value> + </property> + <property> + <name>Real Time Server Port</name> + <value>10000</value> + </property> + <property> + <name>Batch Server Name</name> + <value>localhost</value> + </property> + <property> + <name>Batch Server Port</name> + <value>10001</value> + </property> + <property> + <name>Iteration</name> + <value>true</value> + </property> + <property> + <name>Real Time Message ID</name> + <value>41</value> + </property> + <property> + <name>Batch Message ID</name> + <value>172,48</value> + </property> + <property> + <name>Real Time Interval</name> + <value>200 ms</value> + </property> + <property> + <name>Batch Time Interval</name> + <value>1 sec</value> + </property> + <property> + <name>Batch Max Buffer Size</name> + <value>262144</value> + </property> + <autoTerminatedRelationship>success</autoTerminatedRelationship> + </processor> + </rootGroup> + <controllerServices> + <controllerService> + <id>b2785fb0-e797-4c4d-8592-d2b2563504c4</id> + <name>DistributedMapCacheClientService</name> + <comment/> + <class>org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService</class> + <enabled>true</enabled> + <property> + <name>Server Hostname</name> + <value>localhost</value> + </property> + <property> + <name>Server Port</name> + <value>4557</value> + </property> + <property> + <name>SSL Context Service</name> + </property> + <property> + <name>Communications Timeout</name> + <value>30 secs</value> + </property> + </controllerService> + <controllerService> + <id>2855f1e0-dc35-4955-9ae2-b2d7d1765d4e</id> + <name>StandardSSLContextService</name> + <comment/> + <class>org.apache.nifi.ssl.StandardSSLContextService</class> + <enabled>true</enabled> + <property> + <name>Keystore Filename</name> + </property> + <property> + <name>Keystore Password</name> + </property> + <property> + <name>Keystore Type</name> + </property> + <property> + <name>Truststore Filename</name> + <value>/Library/Java/JavaVirtualMachines/jdk1.8.0_73.jdk/Contents/Home/jre/lib/security/cacerts</value> + </property> + <property> + <name>Truststore Password</name> + <value>enc{9E2EE146023A0F31914706460EB177B357796CF0C768DECE09D10C4B40F344C8}</value> + </property> + <property> + <name>Truststore Type</name> + <value>JKS</value> + </property> + <property> + <name>SSL Protocol</name> + <value>TLS</value> + </property> + </controllerService> + </controllerServices> + <reportingTasks/> +</flowController> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/target/conf/flow_Site2SiteServer.xml ---------------------------------------------------------------------- diff --git a/target/conf/flow_Site2SiteServer.xml b/target/conf/flow_Site2SiteServer.xml new file mode 100644 index 0000000..acd2c1e --- /dev/null +++ b/target/conf/flow_Site2SiteServer.xml @@ -0,0 +1,140 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<flowController> + <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount> + <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount> + <rootGroup> + <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id> + <name>NiFi Flow</name> + <position x="0.0" y="0.0"/> + <comment/> + <processor> + <id>cd274fef-168a-486b-b21a-04ed17f981b7</id> + <name>LogAttribute</name> + <position x="2823.8107761867964" y="623.2524160253959"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.LogAttribute</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>0 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> + <property> + <name>Log Level</name> + <value>info</value> + </property> + <property> + <name>Log Payload</name> + <value>true</value> + </property> + <property> + <name>Attributes to Log</name> + </property> + <property> + <name>Attributes to Ignore</name> + </property> + <property> + <name>Log prefix</name> + </property> + <autoTerminatedRelationship>success</autoTerminatedRelationship> + </processor> + <processor> + <id>4fa35a7d-d1f0-44e4-87d7-7d69f0b78b7b</id> + <name>GenerateFlowFile</name> + <position x="2248.4411151522036" y="917.8589272756209"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.GenerateFlowFile</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>1 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> + <property> + <name>File Size</name> + <value>1024 kB</value> + </property> + <property> + <name>Batch Size</name> + <value>1</value> + </property> + <property> + <name>Data Format</name> + <value>Text</value> + </property> + <property> + <name>Unique FlowFiles</name> + <value>false</value> + </property> + </processor> + <inputPort> + <id>471deef6-2a6e-4a7d-912a-81cc17e3a204</id> + <name> From Node A</name> + <position x="2305.369919163486" y="646.0466623031645"/> + <comments/> + <scheduledState>RUNNING</scheduledState> + <maxConcurrentTasks>1</maxConcurrentTasks> + </inputPort> + <outputPort> + <id>75f88005-0a87-4fef-8320-6219cdbcf18b</id> + <name>To A</name> + <position x="2915.739181824911" y="1057.8803860295386"/> + <comments/> + <scheduledState>RUNNING</scheduledState> + <maxConcurrentTasks>1</maxConcurrentTasks> + </outputPort> + <label> + <id>2f0db43e-1ce0-49ab-96a5-459c285aff09</id> + <position x="2197.3693058093504" y="849.4395700448451"/> + <size height="286.5726013183594" width="1012.2957763671875"/> + <styles> + <style name="font-size">18px</style> + </styles> + <value>Generate Data that is pushed to Node A and made available to be pulled</value> + </label> + <connection> + <id>7f869898-3a93-4e28-a60c-064789870574</id> + <name/> + <bendPoints/> + <labelIndex>1</labelIndex> + <zIndex>0</zIndex> + <sourceId>471deef6-2a6e-4a7d-912a-81cc17e3a204</sourceId> + <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId> + <sourceType>INPUT_PORT</sourceType> + <destinationId>cd274fef-168a-486b-b21a-04ed17f981b7</destinationId> + <destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId> + <destinationType>PROCESSOR</destinationType> + <relationship/> + <maxWorkQueueSize>0</maxWorkQueueSize> + <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize> + <flowFileExpiration>0 sec</flowFileExpiration> + </connection> + <connection> + <id>9dbc73f6-c827-4258-8bc7-06eb6a9b79d5</id> + <name/> + <bendPoints/> + <labelIndex>1</labelIndex> + <zIndex>0</zIndex> + <sourceId>4fa35a7d-d1f0-44e4-87d7-7d69f0b78b7b</sourceId> + <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId> + <sourceType>PROCESSOR</sourceType> + <destinationId>75f88005-0a87-4fef-8320-6219cdbcf18b</destinationId> + <destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId> + <destinationType>OUTPUT_PORT</destinationType> + <relationship>success</relationship> + <maxWorkQueueSize>0</maxWorkQueueSize> + <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize> + <flowFileExpiration>0 sec</flowFileExpiration> + </connection> + </rootGroup> + <controllerServices/> + <reportingTasks/> +</flowController> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/target/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/target/conf/nifi.properties b/target/conf/nifi.properties index b1902b1..627876f 100644 --- a/target/conf/nifi.properties +++ b/target/conf/nifi.properties @@ -184,7 +184,7 @@ nifi.cluster.manager.safemode.duration=0 sec # kerberos # nifi.kerberos.krb5.file= -# Server +# MiNiFi Server for Command Control nifi.server.name=localhost nifi.server.port=9000 nifi.server.report.interval=1000 ms http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e170f7aa/thirdparty/uuid/tst_uuid ---------------------------------------------------------------------- diff --git a/thirdparty/uuid/tst_uuid b/thirdparty/uuid/tst_uuid index cce0cbd..e067cb2 100755 Binary files a/thirdparty/uuid/tst_uuid and b/thirdparty/uuid/tst_uuid differ
