Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 3676468fc -> be3f2ffea
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/ListenSyslog.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ListenSyslog.cpp b/libminifi/src/processors/ListenSyslog.cpp index 2dd223c..e7d2e7b 100644 --- a/libminifi/src/processors/ListenSyslog.cpp +++ b/libminifi/src/processors/ListenSyslog.cpp @@ -17,12 +17,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include <queue> +#include "processors/ListenSyslog.h" #include <stdio.h> +#include <memory> #include <string> +#include <vector> +#include <set> +#include <queue> #include "utils/TimeUtil.h" #include "utils/StringUtils.h" -#include "processors/ListenSyslog.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" @@ -32,7 +35,6 @@ namespace nifi { namespace minifi { namespace processors { -const std::string ListenSyslog::ProcessorName("ListenSyslog"); core::Property ListenSyslog::RecvBufSize( "Receive Buffer Size", "The size of each buffer used to receive Syslog messages.", "65507 B"); @@ -48,20 +50,22 @@ core::Property ListenSyslog::MaxBatchSize( "The maximum number of Syslog events to add to a single FlowFile.", "1"); core::Property ListenSyslog::MessageDelimiter( "Message Delimiter", - "Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> core::Property).", + "Specifies the delimiter to place between Syslog messages when multiple " + "messages are bundled together (see <Max Batch Size> core::Property).", "\n"); core::Property ListenSyslog::ParseMessages( "Parse Messages", "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.", "false"); -core::Property ListenSyslog::Protocol( - "Protocol", "The protocol for Syslog communication.", "UDP"); -core::Property ListenSyslog::Port( - "Port", "The port for Syslog communication.", "514"); -core::Relationship ListenSyslog::Success( - "success", "All files are routed to success"); -core::Relationship ListenSyslog::Invalid( - "invalid", "SysLog message format invalid"); +core::Property ListenSyslog::Protocol("Protocol", + "The protocol for Syslog communication.", + "UDP"); +core::Property ListenSyslog::Port("Port", "The port for Syslog communication.", + "514"); +core::Relationship ListenSyslog::Success("success", + "All files are routed to success"); +core::Relationship ListenSyslog::Invalid("invalid", + "SysLog message format invalid"); void ListenSyslog::initialize() { // Set the supported properties @@ -125,7 +129,7 @@ void ListenSyslog::runThread() { logger_->log_info("ListenSysLog Server socket creation failed"); break; } - bzero((char *) &serv_addr, sizeof(serv_addr)); + bzero(reinterpret_cast<char *>(&serv_addr), sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = INADDR_ANY; serv_addr.sin_port = htons(portno); @@ -167,7 +171,8 @@ void ListenSyslog::runThread() { socklen_t clilen; struct sockaddr_in cli_addr; clilen = sizeof(cli_addr); - int newsockfd = accept(_serverSocket, (struct sockaddr *) &cli_addr, + int newsockfd = accept(_serverSocket, + reinterpret_cast<struct sockaddr *>(&cli_addr), &clilen); if (newsockfd > 0) { if (_clientSockets.size() < _maxConnections) { @@ -197,7 +202,7 @@ void ListenSyslog::runThread() { while (it != _clientSockets.end()) { int clientSocket = *it; if (FD_ISSET(clientSocket, &fds)) { - int recvlen = readline(clientSocket, (char *) _buffer, sizeof(_buffer)); + int recvlen = readline(clientSocket, _buffer, sizeof(_buffer)); if (recvlen <= 0) { close(clientSocket); logger_->log_info("ListenSysLog client socket %d close", @@ -228,7 +233,7 @@ int ListenSyslog::readline(int fd, char *bufptr, size_t len) { if (--cnt <= 0) { cnt = recv(fd, b, sizeof(b), 0); if (cnt < 0) { - if ( errno == EINTR) { + if (errno == EINTR) { len++; /* the while will decrement */ continue; } @@ -248,9 +253,8 @@ int ListenSyslog::readline(int fd, char *bufptr, size_t len) { return -1; } -void ListenSyslog::onTrigger( - core::ProcessContext *context, - core::ProcessSession *session) { +void ListenSyslog::onTrigger(core::ProcessContext *context, + core::ProcessSession *session) { std::string value; bool needResetServerSocket = false; if (context->getProperty(Protocol.getName(), value)) { @@ -262,12 +266,10 @@ void ListenSyslog::onTrigger( core::Property::StringToInt(value, _recvBufSize); } if (context->getProperty(MaxSocketBufSize.getName(), value)) { - core::Property::StringToInt(value, - _maxSocketBufSize); + core::Property::StringToInt(value, _maxSocketBufSize); } if (context->getProperty(MaxConnections.getName(), value)) { - core::Property::StringToInt(value, - _maxConnections); + core::Property::StringToInt(value, _maxConnections); } if (context->getProperty(MessageDelimiter.getName(), value)) { _messageDelimiter = value; @@ -283,8 +285,7 @@ void ListenSyslog::onTrigger( needResetServerSocket = true; } if (context->getProperty(MaxBatchSize.getName(), value)) { - core::Property::StringToInt(value, - _maxBatchSize); + core::Property::StringToInt(value, _maxBatchSize); } if (needResetServerSocket) @@ -309,12 +310,12 @@ void ListenSyslog::onTrigger( flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); if (!flowFile) return; - ListenSyslog::WriteCallback callback((char *) event.payload, event.len); + ListenSyslog::WriteCallback callback(event.payload, event.len); session->write(flowFile, &callback); delete[] event.payload; firstEvent = false; } else { - ListenSyslog::WriteCallback callback((char *) event.payload, event.len); + ListenSyslog::WriteCallback callback(event.payload, event.len); session->append(flowFile, &callback); delete[] event.payload; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/LogAttribute.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/LogAttribute.cpp b/libminifi/src/processors/LogAttribute.cpp index e2cf16c..d2dcd10 100644 --- a/libminifi/src/processors/LogAttribute.cpp +++ b/libminifi/src/processors/LogAttribute.cpp @@ -17,19 +17,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "processors/LogAttribute.h" +#include <sys/time.h> +#include <time.h> +#include <string.h> +#include <memory> +#include <string> #include <vector> #include <queue> #include <map> #include <set> -#include <sys/time.h> -#include <time.h> #include <sstream> -#include <string.h> #include <iostream> - #include "utils/TimeUtil.h" #include "utils/StringUtils.h" -#include "processors/LogAttribute.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" @@ -38,7 +39,6 @@ namespace apache { namespace nifi { namespace minifi { namespace processors { -const std::string LogAttribute::ProcessorName("LogAttribute"); core::Property LogAttribute::LogLevel( "Log Level", "The Log Level to use when logging the Attributes", "info"); core::Property LogAttribute::AttributesToLog( @@ -51,7 +51,8 @@ core::Property LogAttribute::AttributesToIgnore( ""); core::Property LogAttribute::LogPayload( "Log Payload", - "If true, the FlowFile's payload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.", + "If true, the FlowFile's payload will be logged, in addition to its attributes;" + "otherwise, just the Attributes will be logged.", "false"); core::Property LogAttribute::LogPrefix( "Log prefix", @@ -75,16 +76,14 @@ void LogAttribute::initialize() { setSupportedRelationships(relationships); } -void LogAttribute::onTrigger( - core::ProcessContext *context, - core::ProcessSession *session) { +void LogAttribute::onTrigger(core::ProcessContext *context, + core::ProcessSession *session) { std::string dashLine = "--------------------------------------------------"; LogAttrLevel level = LogAttrLevelInfo; bool logPayload = false; std::ostringstream message; - std::shared_ptr<core::FlowFile> flow = - session->get(); + std::shared_ptr<core::FlowFile> flow = session->get(); if (!flow) return; @@ -97,7 +96,8 @@ void LogAttribute::onTrigger( dashLine = "-----" + value + "-----"; } if (context->getProperty(LogPayload.getName(), value)) { - org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, logPayload); + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, + logPayload); } message << "Logging for flow file " << "\n"; @@ -125,9 +125,7 @@ void LogAttribute::onTrigger( ReadCallback callback(flow->getSize()); session->read(flow, &callback); for (unsigned int i = 0, j = 0; i < callback._readSize; i++) { - char temp[8]; - sprintf(temp, "%02x ", (unsigned char) (callback._buffer[i])); - message << temp; + message << std::hex << callback._buffer[i]; j++; if (j == 16) { message << '\n'; @@ -168,7 +166,6 @@ void LogAttribute::onTrigger( session->transfer(flow, Success); } - } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/PutFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/PutFile.cpp b/libminifi/src/processors/PutFile.cpp index 51fbb6f..bd08877 100644 --- a/libminifi/src/processors/PutFile.cpp +++ b/libminifi/src/processors/PutFile.cpp @@ -17,17 +17,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include <sstream> +#include "processors/PutFile.h" #include <stdio.h> +#include <uuid/uuid.h> +#include <sstream> #include <string> #include <iostream> +#include <memory> +#include <set> #include <fstream> -#include <uuid/uuid.h> - #include "io/validation.h" #include "utils/StringUtils.h" #include "utils/TimeUtil.h" -#include "processors/PutFile.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" @@ -37,12 +38,6 @@ namespace nifi { namespace minifi { namespace processors { -const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE("replace"); -const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_IGNORE("ignore"); -const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL("fail"); - -const std::string PutFile::ProcessorName("PutFile"); - core::Property PutFile::Directory("Output Directory", "The output directory to which to put files", "."); @@ -72,7 +67,6 @@ void PutFile::initialize() { void PutFile::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { - if (!context->getProperty(Directory.getName(), directory_)) { logger_->log_error("Directory attribute is missing or invalid"); } @@ -82,11 +76,10 @@ void PutFile::onSchedule(core::ProcessContext *context, logger_->log_error( "Conflict Resolution Strategy attribute is missing or invalid"); } - } + void PutFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - if (IsNullOrEmpty(directory_) || IsNullOrEmpty(conflict_resolution_)) { context->yield(); return; @@ -144,7 +137,6 @@ void PutFile::onTrigger(core::ProcessContext *context, bool PutFile::putFile(core::ProcessSession *session, std::shared_ptr<FlowFileRecord> flowFile, const std::string &tmpFile, const std::string &destFile) { - ReadCallback cb(tmpFile, destFile); session->read(flowFile, &cb); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/RealTimeDataCollector.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/RealTimeDataCollector.cpp b/libminifi/src/processors/RealTimeDataCollector.cpp deleted file mode 100644 index 922835d..0000000 --- a/libminifi/src/processors/RealTimeDataCollector.cpp +++ /dev/null @@ -1,480 +0,0 @@ -/** - * @file RealTimeDataCollector.cpp - * RealTimeDataCollector class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <time.h> -#include <chrono> -#include <thread> -#include <memory> -#include <random> -#include <netinet/tcp.h> - -#include "utils/StringUtils.h" -#include "processors/RealTimeDataCollector.h" -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace processors { -const std::string RealTimeDataCollector::ProcessorName("RealTimeDataCollector"); -core::Property RealTimeDataCollector::FILENAME( - "File Name", "File Name for the real time processor to process", - "data.osp"); -core::Property RealTimeDataCollector::REALTIMESERVERNAME( - "Real Time Server Name", "Real Time Server Name", "localhost"); -core::Property RealTimeDataCollector::REALTIMESERVERPORT( - "Real Time Server Port", "Real Time Server Port", "10000"); -core::Property RealTimeDataCollector::BATCHSERVERNAME( - "Batch Server Name", "Batch Server Name", "localhost"); -core::Property RealTimeDataCollector::BATCHSERVERPORT( - "Batch Server Port", "Batch Server Port", "10001"); -core::Property RealTimeDataCollector::ITERATION( - "Iteration", "If true, sample osp file will be iterated", "true"); -core::Property RealTimeDataCollector::REALTIMEMSGID( - "Real Time Message ID", "Real Time Message ID", "41"); -core::Property RealTimeDataCollector::BATCHMSGID( - "Batch Message ID", "Batch Message ID", "172, 30, 48"); -core::Property RealTimeDataCollector::REALTIMEINTERVAL( - "Real Time Interval", "Real Time Data Collection Interval in msec", - "10 ms"); -core::Property RealTimeDataCollector::BATCHINTERVAL( - "Batch Time Interval", "Batch Processing Interval in msec", "100 ms"); -core::Property RealTimeDataCollector::BATCHMAXBUFFERSIZE( - "Batch Max Buffer Size", "Batch Buffer Maximum size in bytes", "262144"); -core::Relationship RealTimeDataCollector::Success( - "success", "success operational on the flow record"); - -void RealTimeDataCollector::initialize() { - // Set the supported properties - std::set<core::Property> properties; - properties.insert(FILENAME); - properties.insert(REALTIMESERVERNAME); - properties.insert(REALTIMESERVERPORT); - properties.insert(BATCHSERVERNAME); - properties.insert(BATCHSERVERPORT); - properties.insert(ITERATION); - properties.insert(REALTIMEMSGID); - properties.insert(BATCHMSGID); - properties.insert(REALTIMEINTERVAL); - properties.insert(BATCHINTERVAL); - properties.insert(BATCHMAXBUFFERSIZE); - - setSupportedProperties(properties); - // Set the supported relationships - std::set<core::Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); - -} - -int RealTimeDataCollector::connectServer(const char *host, uint16_t port) { - in_addr_t addr; - int sock = 0; - struct hostent *h; -#ifdef __MACH__ - h = gethostbyname(host); -#else - char buf[1024]; - struct hostent he; - int hh_errno; - gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); -#endif - memcpy((char *) &addr, h->h_addr_list[0], h->h_length); - sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock < 0) { - logger_->log_error("Could not create socket to hostName %s", host); - return 0; - } - -#ifndef __MACH__ - int opt = 1; - bool nagle_off = true; - - if (nagle_off) - { - if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0) - { - logger_->log_error("setsockopt() TCP_NODELAY failed"); - close(sock); - return 0; - } - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, - (char *)&opt, sizeof(opt)) < 0) - { - logger_->log_error("setsockopt() SO_REUSEADDR failed"); - close(sock); - return 0; - } - } - - int sndsize = 256*1024; - if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0) - { - logger_->log_error("setsockopt() SO_SNDBUF failed"); - close(sock); - return 0; - } -#endif - - struct sockaddr_in sa; - socklen_t socklen; - int status; - - //TODO bind socket to the interface - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = htonl(INADDR_ANY); - sa.sin_port = htons(0); - socklen = sizeof(sa); - if (bind(sock, (struct sockaddr *) &sa, socklen) < 0) { - logger_->log_error("socket bind failed"); - close(sock); - return 0; - } - - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = addr; - sa.sin_port = htons(port); - socklen = sizeof(sa); - - status = connect(sock, (struct sockaddr *) &sa, socklen); - - if (status < 0) { - logger_->log_error("socket connect failed to %s %d", host, port); - close(sock); - return 0; - } - - logger_->log_info("socket %d connect to server %s port %d success", sock, - host, port); - - return sock; -} - -int RealTimeDataCollector::sendData(int socket, const char *buf, int buflen) { - int ret = 0, bytes = 0; - - while (bytes < buflen) { - ret = send(socket, buf + bytes, buflen - bytes, 0); - //check for errors - if (ret == -1) { - return ret; - } - bytes += ret; - } - - if (ret) - logger_->log_debug("Send data size %d over socket %d", buflen, socket); - - return ret; -} - -void RealTimeDataCollector::onTriggerRealTime( - core::ProcessContext *context, - core::ProcessSession *session) { - if (_realTimeAccumulated >= this->_realTimeInterval) { - std::string value; - if (this->getProperty(REALTIMEMSGID.getName(), value)) { - this->_realTimeMsgID.clear(); - this->logger_->log_info("Real Time Msg IDs %s", value.c_str()); - std::stringstream lineStream(value); - std::string cell; - - while (std::getline(lineStream, cell, ',')) { - this->_realTimeMsgID.push_back(cell); - // this->logger_->log_debug("Real Time Msg ID %s", cell.c_str()); - } - } - if (this->getProperty(BATCHMSGID.getName(), value)) { - this->_batchMsgID.clear(); - this->logger_->log_info("Batch Msg IDs %s", value.c_str()); - std::stringstream lineStream(value); - std::string cell; - - while (std::getline(lineStream, cell, ',')) { - cell = org::apache::nifi::minifi::utils::StringUtils::trim(cell); - this->_batchMsgID.push_back(cell); - } - } - // Open the file - if (!this->_fileStream.is_open()) { - _fileStream.open(this->_fileName.c_str(), std::ifstream::in); - if (this->_fileStream.is_open()) - logger_->log_debug("open %s", _fileName.c_str()); - } - if (!_fileStream.good()) { - logger_->log_error("load data file failed %s", _fileName.c_str()); - return; - } - if (this->_fileStream.is_open()) { - std::string line; - - while (std::getline(_fileStream, line)) { - line += "\n"; - std::stringstream lineStream(line); - std::string cell; - if (std::getline(lineStream, cell, ',')) { - cell = org::apache::nifi::minifi::utils::StringUtils::trim(cell); - // Check whether it match to the batch traffic - for (std::vector<std::string>::iterator it = _batchMsgID.begin(); - it != _batchMsgID.end(); ++it) { - if (cell == *it) { - // push the batch data to the queue - std::lock_guard<std::mutex> lock(mutex_); - while ((_queuedDataSize + line.size()) > _batchMaxBufferSize) { - std::string item = _queue.front(); - _queuedDataSize -= item.size(); - logger_->log_debug( - "Pop item size %d from batch queue, queue buffer size %d", - item.size(), _queuedDataSize); - _queue.pop(); - } - _queue.push(line); - _queuedDataSize += line.size(); - logger_->log_debug( - "Push batch msg ID %s into batch queue, queue buffer size %d", - cell.c_str(), _queuedDataSize); - } - } - bool findRealTime = false; - // Check whether it match to the real time traffic - for (std::vector<std::string>::iterator it = _realTimeMsgID.begin(); - it != _realTimeMsgID.end(); ++it) { - if (cell == *it) { - int status = 0; - if (this->_realTimeSocket <= 0) { - // Connect the LTE socket - uint16_t port = _realTimeServerPort; - this->_realTimeSocket = connectServer( - _realTimeServerName.c_str(), port); - } - if (this->_realTimeSocket) { - // try to send the data - status = sendData(_realTimeSocket, line.data(), line.size()); - if (status < 0) { - close(_realTimeSocket); - _realTimeSocket = 0; - } - } - if (this->_realTimeSocket <= 0 || status < 0) { - // push the batch data to the queue - std::lock_guard<std::mutex> lock(mutex_); - while ((_queuedDataSize + line.size()) > _batchMaxBufferSize) { - std::string item = _queue.front(); - _queuedDataSize -= item.size(); - logger_->log_debug( - "Pop item size %d from batch queue, queue buffer size %d", - item.size(), _queuedDataSize); - _queue.pop(); - } - _queue.push(line); - _queuedDataSize += line.size(); - logger_->log_debug( - "Push real time msg ID %s into batch queue, queue buffer size %d", - cell.c_str(), _queuedDataSize); - } - // find real time - findRealTime = true; - } // cell - } // for real time pattern - if (findRealTime) - // we break the while once we find the first real time - break; - } // if get line - } // while - if (_fileStream.eof()) { - _fileStream.close(); - } - } // if open - _realTimeAccumulated = 0; - } - std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>( - context->getProcessorNode().getProcessor()); - _realTimeAccumulated += processor->getSchedulingPeriodNano(); -} - -void RealTimeDataCollector::onTriggerBatch( - core::ProcessContext *context, - core::ProcessSession *session) { - if (_batchAcccumulated >= this->_batchInterval) { - // logger_->log_info("onTriggerBatch"); - // dequeue the batch and send over WIFI - int status = 0; - if (this->_batchSocket <= 0) { - // Connect the WIFI socket - uint16_t port = _batchServerPort; - this->_batchSocket = connectServer(_batchServerName.c_str(), port); - } - if (this->_batchSocket) { - std::lock_guard<std::mutex> lock(mutex_); - - while (!_queue.empty()) { - std::string line = _queue.front(); - status = sendData(_batchSocket, line.data(), line.size()); - _queue.pop(); - _queuedDataSize -= line.size(); - if (status < 0) { - close(_batchSocket); - _batchSocket = 0; - break; - } - } - } - _batchAcccumulated = 0; - } - std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>( - context->getProcessorNode().getProcessor()); - _batchAcccumulated += processor->getSchedulingPeriodNano(); -} - -void RealTimeDataCollector::onTrigger( - core::ProcessContext *context, - core::ProcessSession *session) { - std::thread::id id = std::this_thread::get_id(); - - if (id == _realTimeThreadId) - return onTriggerRealTime(context, session); - else if (id == _batchThreadId) - return onTriggerBatch(context, session); - else { - std::lock_guard<std::mutex> lock(mutex_); - if (!this->_firstInvoking) { - this->_fileName = "data.osp"; - std::string value; - if (this->getProperty(FILENAME.getName(), value)) { - this->_fileName = value; - this->logger_->log_info("Data Collector File Name %s", - _fileName.c_str()); - } - this->_realTimeServerName = "localhost"; - if (this->getProperty(REALTIMESERVERNAME.getName(), value)) { - this->_realTimeServerName = value; - this->logger_->log_info("Real Time Server Name %s", - this->_realTimeServerName.c_str()); - } - this->_realTimeServerPort = 10000; - if (this->getProperty(REALTIMESERVERPORT.getName(), value)) { - core::Property::StringToInt( - value, _realTimeServerPort); - this->logger_->log_info("Real Time Server Port %d", - _realTimeServerPort); - } - if (this->getProperty(BATCHSERVERNAME.getName(), value)) { - this->_batchServerName = value; - this->logger_->log_info("Batch Server Name %s", - this->_batchServerName.c_str()); - } - this->_batchServerPort = 10001; - if (this->getProperty(BATCHSERVERPORT.getName(), value)) { - core::Property::StringToInt( - value, _batchServerPort); - this->logger_->log_info("Batch Server Port %d", _batchServerPort); - } - if (this->getProperty(ITERATION.getName(), value)) { - org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, this->_iteration); - logger_->log_info("Iteration %d", _iteration); - } - this->_realTimeInterval = 10000000; //10 msec - if (this->getProperty(REALTIMEINTERVAL.getName(), value)) { - core::TimeUnit unit; - if (core::Property::StringToTime( - value, _realTimeInterval, unit) - && core::Property::ConvertTimeUnitToNS( - _realTimeInterval, unit, _realTimeInterval)) { - logger_->log_info("Real Time Interval: [%d] ns", _realTimeInterval); - } - } - this->_batchInterval = 100000000; //100 msec - if (this->getProperty(BATCHINTERVAL.getName(), value)) { - core::TimeUnit unit; - if (core::Property::StringToTime( - value, _batchInterval, unit) - && core::Property::ConvertTimeUnitToNS( - _batchInterval, unit, _batchInterval)) { - logger_->log_info("Batch Time Interval: [%d] ns", _batchInterval); - } - } - this->_batchMaxBufferSize = 256 * 1024; - if (this->getProperty(BATCHMAXBUFFERSIZE.getName(), value)) { - core::Property::StringToInt( - value, _batchMaxBufferSize); - this->logger_->log_info("Batch Max Buffer Size %d", - _batchMaxBufferSize); - } - if (this->getProperty(REALTIMEMSGID.getName(), value)) { - this->logger_->log_info("Real Time Msg IDs %s", value.c_str()); - std::stringstream lineStream(value); - std::string cell; - - while (std::getline(lineStream, cell, ',')) { - this->_realTimeMsgID.push_back(cell); - this->logger_->log_info("Real Time Msg ID %s", cell.c_str()); - } - } - if (this->getProperty(BATCHMSGID.getName(), value)) { - this->logger_->log_info("Batch Msg IDs %s", value.c_str()); - std::stringstream lineStream(value); - std::string cell; - - while (std::getline(lineStream, cell, ',')) { - cell = org::apache::nifi::minifi::utils::StringUtils::trim(cell); - this->_batchMsgID.push_back(cell); - this->logger_->log_info("Batch Msg ID %s", cell.c_str()); - } - } - // Connect the LTE socket - uint16_t port = _realTimeServerPort; - - this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port); - - // Connect the WIFI socket - port = _batchServerPort; - - this->_batchSocket = connectServer(_batchServerName.c_str(), port); - - // Open the file - _fileStream.open(this->_fileName.c_str(), std::ifstream::in); - if (!_fileStream.good()) { - logger_->log_error("load data file failed %s", _fileName.c_str()); - return; - } else { - logger_->log_debug("open %s", _fileName.c_str()); - } - _realTimeThreadId = id; - this->_firstInvoking = true; - } else { - if (id != _realTimeThreadId) - _batchThreadId = id; - this->_firstInvoking = false; - } - } -} -} /* namespace processors */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/TailFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/TailFile.cpp b/libminifi/src/processors/TailFile.cpp index bcdd8fd..abb02ca 100644 --- a/libminifi/src/processors/TailFile.cpp +++ b/libminifi/src/processors/TailFile.cpp @@ -17,22 +17,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include <vector> -#include <queue> -#include <map> -#include <set> #include <sys/time.h> #include <sys/types.h> #include <sys/stat.h> #include <time.h> -#include <sstream> #include <stdio.h> -#include <string> -#include <iostream> #include <dirent.h> #include <limits.h> #include <unistd.h> - +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <memory> +#include <algorithm> +#include <sstream> +#include <string> +#include <iostream> #include "utils/TimeUtil.h" #include "utils/StringUtils.h" #include "processors/TailFile.h" @@ -45,16 +46,16 @@ namespace nifi { namespace minifi { namespace processors { -const std::string TailFile::ProcessorName("TailFile"); core::Property TailFile::FileName( "File to Tail", "Fully-qualified filename of the file that should be tailed", ""); core::Property TailFile::StateFile( "State File", - "Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off", + "Specifies the file that should be used for storing state about" + " what data has been ingested so that upon restart NiFi can resume from where it left off", "TailFileState"); -core::Relationship TailFile::Success( - "success", "All files are routed to success"); +core::Relationship TailFile::Success("success", + "All files are routed to success"); void TailFile::initialize() { // Set the supported properties @@ -123,9 +124,9 @@ void TailFile::recoverState() { logger_->log_error("load state file failed %s", _stateFile.c_str()); return; } - const unsigned int bufSize = 512; - char buf[bufSize]; - for (file.getline(buf, bufSize); file.good(); file.getline(buf, bufSize)) { + char buf[BUFFER_SIZE]; + for (file.getline(buf, BUFFER_SIZE); file.good(); + file.getline(buf, BUFFER_SIZE)) { parseStateFileLine(buf); } } @@ -145,7 +146,8 @@ static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) { return (i.modifiedTime < j.modifiedTime); } -void TailFile::checkRollOver(const std::string &fileLocation, const std::string &fileName) { +void TailFile::checkRollOver(const std::string &fileLocation, + const std::string &fileName) { struct stat statbuf; std::vector<TailMatchedFileItem> matchedFiles; std::string fullPath = fileLocation + "/" + _currentTailFileName; @@ -208,18 +210,17 @@ void TailFile::checkRollOver(const std::string &fileLocation, const std::string break; } } - } else + } else { return; + } } -void TailFile::onTrigger( - core::ProcessContext *context, - core::ProcessSession *session) { - +void TailFile::onTrigger(core::ProcessContext *context, + core::ProcessSession *session) { std::lock_guard<std::mutex> tail_lock(tail_file_mutex_); std::string value; - std::string fileLocation=""; - std::string fileName=""; + std::string fileLocation = ""; + std::string fileName = ""; if (context->getProperty(FileName.getName(), value)) { std::size_t found = value.find_last_of("/\\"); fileLocation = value.substr(0, found); @@ -235,17 +236,17 @@ void TailFile::onTrigger( // recover the state if we have not done so this->recoverState(); } - checkRollOver(fileLocation,fileName); + checkRollOver(fileLocation, fileName); std::string fullPath = fileLocation + "/" + _currentTailFileName; struct stat statbuf; if (stat(fullPath.c_str(), &statbuf) == 0) { - if (statbuf.st_size <= this->_currentTailFilePosition) - // there are no new input for the current tail file - { + if (statbuf.st_size <= this->_currentTailFilePosition) { + // there are no new input for the current tail fil context->yield(); return; } - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< + FlowFileRecord>(session->create()); if (!flowFile) return; std::size_t found = _currentTailFileName.find_last_of("."); @@ -267,7 +268,6 @@ void TailFile::onTrigger( } } - } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/provenance/Provenance.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp index 289f026..b1db9a8 100644 --- a/libminifi/src/provenance/Provenance.cpp +++ b/libminifi/src/provenance/Provenance.cpp @@ -15,13 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +#include <arpa/inet.h> #include <cstdint> +#include <memory> +#include <string> #include <vector> -#include <arpa/inet.h> #include "io/DataStream.h" #include "io/Serializable.h" #include "provenance/Provenance.h" - #include "core/logging/Logger.h" #include "core/Relationship.h" #include "FlowController.h" @@ -44,9 +46,10 @@ bool ProvenanceEventRecord::DeSerialize( logger_->log_error("NiFi Provenance Store event %s can not found", key.c_str()); return false; - } else + } else { logger_->log_debug("NiFi Provenance Read event %s length %d", key.c_str(), value.length()); + } org::apache::nifi::minifi::io::DataStream stream( (const uint8_t*) value.data(), value.length()); @@ -75,20 +78,17 @@ bool ProvenanceEventRecord::Serialize( ret = writeUTF(this->_eventIdStr, &outStream); if (ret <= 0) { - return false; } uint32_t eventType = this->_eventType; ret = write(eventType, &outStream); if (ret != 4) { - return false; } ret = write(this->_eventTime, &outStream); if (ret != 8) { - return false; } @@ -99,37 +99,31 @@ bool ProvenanceEventRecord::Serialize( ret = write(this->_eventDuration, &outStream); if (ret != 8) { - return false; } ret = write(this->_lineageStartDate, &outStream); if (ret != 8) { - return false; } ret = writeUTF(this->_componentId, &outStream); if (ret <= 0) { - return false; } ret = writeUTF(this->_componentType, &outStream); if (ret <= 0) { - return false; } ret = writeUTF(this->uuid_, &outStream); if (ret <= 0) { - return false; } ret = writeUTF(this->_details, &outStream); if (ret <= 0) { - return false; } @@ -137,44 +131,37 @@ bool ProvenanceEventRecord::Serialize( uint32_t numAttributes = this->_attributes.size(); ret = write(numAttributes, &outStream); if (ret != 4) { - return false; } for (auto itAttribute : _attributes) { ret = writeUTF(itAttribute.first, &outStream, true); if (ret <= 0) { - return false; } ret = writeUTF(itAttribute.second, &outStream, true); if (ret <= 0) { - return false; } } ret = writeUTF(this->_contentFullPath, &outStream); if (ret <= 0) { - return false; } ret = write(this->_size, &outStream); if (ret != 8) { - return false; } ret = write(this->_offset, &outStream); if (ret != 8) { - return false; } ret = writeUTF(this->_sourceQueueIdentifier, &outStream); if (ret <= 0) { - return false; } @@ -185,13 +172,11 @@ bool ProvenanceEventRecord::Serialize( uint32_t number = this->_parentUuids.size(); ret = write(number, &outStream); if (ret != 4) { - return false; } for (auto parentUUID : _parentUuids) { ret = writeUTF(parentUUID, &outStream); if (ret <= 0) { - return false; } } @@ -203,7 +188,6 @@ bool ProvenanceEventRecord::Serialize( for (auto childUUID : _childrenUuids) { ret = writeUTF(childUUID, &outStream); if (ret <= 0) { - return false; } } @@ -211,24 +195,19 @@ bool ProvenanceEventRecord::Serialize( || this->_eventType == ProvenanceEventRecord::FETCH) { ret = writeUTF(this->_transitUri, &outStream); if (ret <= 0) { - return false; } } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) { ret = writeUTF(this->_transitUri, &outStream); if (ret <= 0) { - return false; } ret = writeUTF(this->_sourceSystemFlowFileIdentifier, &outStream); if (ret <= 0) { - return false; } } - // Persistent to the DB - if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) { logger_->log_debug("NiFi Provenance Store event %s size %d success", @@ -237,16 +216,11 @@ bool ProvenanceEventRecord::Serialize( logger_->log_error("NiFi Provenance Store event %s size %d fail", _eventIdStr.c_str(), outStream.getSize()); } - - - // cleanup - return true; } bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) { - int ret; org::apache::nifi::minifi::io::DataStream outStream(buffer, bufferSize); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/provenance/ProvenanceRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp b/libminifi/src/provenance/ProvenanceRepository.cpp index 6fe332b..77de5ba 100644 --- a/libminifi/src/provenance/ProvenanceRepository.cpp +++ b/libminifi/src/provenance/ProvenanceRepository.cpp @@ -16,22 +16,21 @@ * limitations under the License. */ -#include "provenance/Provenance.h" #include "provenance/ProvenanceRepository.h" - +#include <string> +#include <vector> +#include "provenance/Provenance.h" namespace org { namespace apache { namespace nifi { namespace minifi { -namespace provenance{ - - +namespace provenance { void ProvenanceRepository::run() { // threshold for purge uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4; while (running_) { - std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); + std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); uint64_t curTime = getTimeMillis(); uint64_t size = repoSize(); @@ -41,24 +40,24 @@ void ProvenanceRepository::run() { for (it->SeekToFirst(); it->Valid(); it->Next()) { ProvenanceEventRecord eventRead; std::string key = it->key().ToString(); - if (eventRead.DeSerialize((uint8_t *) it->value().data(), - (int) it->value().size())) { - if ((curTime - eventRead.getEventTime()) - > max_partition_millis_) + if (eventRead.DeSerialize( + reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())), + it->value().size())) { + if ((curTime - eventRead.getEventTime()) > max_partition_millis_) purgeList.push_back(key); } else { logger_->log_debug("NiFi Provenance retrieve event %s fail", - key.c_str()); + key.c_str()); purgeList.push_back(key); } } delete it; std::vector<std::string>::iterator itPurge; - + for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) { std::string eventId = *itPurge; logger_->log_info("ProvenanceRepository Repo Purge %s", - eventId.c_str()); + eventId.c_str()); Delete(eventId); } } @@ -66,7 +65,6 @@ void ProvenanceRepository::run() { repo_full_ = true; else repo_full_ = false; - } return; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/test/TestBase.h ---------------------------------------------------------------------- diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index 4f926d3..585c8cd 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -26,7 +26,7 @@ #include <vector> #include "core/logging/LogAppenders.h" #include "core/logging/Logger.h" -#include "core/core.h" +#include "core/Core.h" class LogTestController { public: @@ -48,7 +48,7 @@ class TestController { TestController() : log("info") { - minifi::ResourceClaim::default_directory_path = "./"; + minifi::ResourceClaim::default_directory_path = const_cast<char*>("./"); } ~TestController() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/test/nodefs/NoLevelDB.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/nodefs/NoLevelDB.cpp b/libminifi/test/nodefs/NoLevelDB.cpp index 00c9212..09b4916 100644 --- a/libminifi/test/nodefs/NoLevelDB.cpp +++ b/libminifi/test/nodefs/NoLevelDB.cpp @@ -18,7 +18,7 @@ #include "../TestBase.h" -#include "core/core.h" +#include "core/Core.h" #include "core/RepositoryFactory.h" TEST_CASE("NoLevelDBTest1", "[NoLevelDBTest]") { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/test/nodefs/NoYamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/nodefs/NoYamlConfiguration.cpp b/libminifi/test/nodefs/NoYamlConfiguration.cpp index 9a9b10e..5f3fce4 100644 --- a/libminifi/test/nodefs/NoYamlConfiguration.cpp +++ b/libminifi/test/nodefs/NoYamlConfiguration.cpp @@ -17,7 +17,7 @@ */ -#include "core/core.h" +#include "core/Core.h" #include "core/RepositoryFactory.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/test/unit/ProcessorTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp index 91a55f7..87f190c 100644 --- a/libminifi/test/unit/ProcessorTests.cpp +++ b/libminifi/test/unit/ProcessorTests.cpp @@ -23,7 +23,7 @@ #include "core/logging/LogAppenders.h" #include "core/logging/BaseLogger.h" #include "processors/GetFile.h" -#include "core/core.h" +#include "core/Core.h" #include "../../include/core/FlowFile.h" #include "core/Processor.h" #include "core/ProcessContext.h" @@ -189,6 +189,7 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { processor->setScheduledState(core::ScheduledState::RUNNING); processor->onSchedule(&context, &factory); + int prev = 0; for (int i = 0; i < 10; i++) { core::ProcessSession session(&context); @@ -216,7 +217,6 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { processor->setScheduledState(core::ScheduledState::RUNNING); processor->onTrigger(&context, &session); unlink(ss.str().c_str()); - rmdir(dir); reporter = session.getProvenanceReporter(); REQUIRE(processor->getName() == "getfileCreate2"); @@ -229,14 +229,10 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { session.commit(); std::shared_ptr<core::FlowFile> ffr = session.get(); - REQUIRE(2 == repo->getRepoMap().size()); + REQUIRE((repo->getRepoMap().size()%2) == 0); + REQUIRE(repo->getRepoMap().size() == (prev+2)); + prev+=2; - for (auto entry : repo->getRepoMap()) { - provenance::ProvenanceEventRecord newRecord; - newRecord.DeSerialize((uint8_t*) entry.second.data(), - entry.second.length()); - - } } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/test/unit/ProvenanceTestHelper.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 1e16aa6..80d8642 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -21,7 +21,7 @@ #include "provenance/Provenance.h" #include "FlowController.h" #include "core/Repository.h" -#include "core/core.h" +#include "core/Core.h" /** * Test repository */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/test/unit/ProvenanceTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTests.cpp b/libminifi/test/unit/ProvenanceTests.cpp index c73cef2..2e41cc8 100644 --- a/libminifi/test/unit/ProvenanceTests.cpp +++ b/libminifi/test/unit/ProvenanceTests.cpp @@ -21,7 +21,7 @@ #include "ProvenanceTestHelper.h" #include "provenance/Provenance.h" #include "FlowFileRecord.h" -#include "core/core.h" +#include "core/Core.h" #include "core/repository/FlowFileRepository.h" TEST_CASE("Test Provenance record create", "[Testprovenance::ProvenanceEventRecord]") { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/test/unit/RepoTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/RepoTests.cpp b/libminifi/test/unit/RepoTests.cpp index 9237e7e..21fae45 100644 --- a/libminifi/test/unit/RepoTests.cpp +++ b/libminifi/test/unit/RepoTests.cpp @@ -21,7 +21,7 @@ #include "ProvenanceTestHelper.h" #include "provenance/Provenance.h" #include "FlowFileRecord.h" -#include "core/core.h" +#include "core/Core.h" #include "core/repository/FlowFileRepository.h" TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/main/MiNiFiMain.cpp ---------------------------------------------------------------------- diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index 6bfd9c9..9e6a37f 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -28,7 +28,7 @@ #include <yaml-cpp/yaml.h> #include <iostream> -#include "core/core.h" +#include "core/Core.h" #include "core/logging/BaseLogger.h" #include "core/logging/LogAppenders.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/thirdparty/google-styleguide/cpplint.py ---------------------------------------------------------------------- diff --git a/thirdparty/google-styleguide/cpplint.py b/thirdparty/google-styleguide/cpplint.py index fafc243..eda78bd 100644 --- a/thirdparty/google-styleguide/cpplint.py +++ b/thirdparty/google-styleguide/cpplint.py @@ -5766,14 +5766,8 @@ def FlagCxx11Features(filename, clean_lines, linenum, error): # Flag unapproved C++11 headers. if include and include.group(1) in ('cfenv', - 'condition_variable', 'fenv.h', - 'future', - 'mutex', - 'thread', - 'chrono', 'ratio', - 'regex', 'system_error', ): error(filename, linenum, 'build/c++11', 5, http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/thirdparty/google-styleguide/run_linter.sh ---------------------------------------------------------------------- diff --git a/thirdparty/google-styleguide/run_linter.sh b/thirdparty/google-styleguide/run_linter.sh index e04d0aa..fbf8730 100755 --- a/thirdparty/google-styleguide/run_linter.sh +++ b/thirdparty/google-styleguide/run_linter.sh @@ -16,9 +16,14 @@ # # ./run_linter <includedir> <srcdir> #!/bin/bash +if [ "$(uname)" == "Darwin" ]; then SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +else +SCRIPT=$(readlink -f $0) +SCRIPT_DIR=`dirname $SCRIPT` +fi HEADERS=`find ${1} -name '*.h' | tr '\n' ','` SOURCES=`find ${2} -name '*.cpp' | tr '\n' ' '` echo ${HEADERS} echo ${SOURCES} -python ${SCRIPT_DIR}/cpplint.py --headers=${HEADERS} ${SOURCES} +python ${SCRIPT_DIR}/cpplint.py --linelength=128 --headers=${HEADERS} ${SOURCES}
