Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 3676468fc -> be3f2ffea


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/ListenSyslog.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenSyslog.cpp 
b/libminifi/src/processors/ListenSyslog.cpp
index 2dd223c..e7d2e7b 100644
--- a/libminifi/src/processors/ListenSyslog.cpp
+++ b/libminifi/src/processors/ListenSyslog.cpp
@@ -17,12 +17,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <queue>
+#include "processors/ListenSyslog.h"
 #include <stdio.h>
+#include <memory>
 #include <string>
+#include <vector>
+#include <set>
+#include <queue>
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
-#include "processors/ListenSyslog.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
@@ -32,7 +35,6 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-const std::string ListenSyslog::ProcessorName("ListenSyslog");
 core::Property ListenSyslog::RecvBufSize(
     "Receive Buffer Size",
     "The size of each buffer used to receive Syslog messages.", "65507 B");
@@ -48,20 +50,22 @@ core::Property ListenSyslog::MaxBatchSize(
     "The maximum number of Syslog events to add to a single FlowFile.", "1");
 core::Property ListenSyslog::MessageDelimiter(
     "Message Delimiter",
-    "Specifies the delimiter to place between Syslog messages when multiple 
messages are bundled together (see <Max Batch Size> core::Property).",
+    "Specifies the delimiter to place between Syslog messages when multiple "
+    "messages are bundled together (see <Max Batch Size> core::Property).",
     "\n");
 core::Property ListenSyslog::ParseMessages(
     "Parse Messages",
     "Indicates if the processor should parse the Syslog messages. If set to 
false, each outgoing FlowFile will only.",
     "false");
-core::Property ListenSyslog::Protocol(
-    "Protocol", "The protocol for Syslog communication.", "UDP");
-core::Property ListenSyslog::Port(
-    "Port", "The port for Syslog communication.", "514");
-core::Relationship ListenSyslog::Success(
-    "success", "All files are routed to success");
-core::Relationship ListenSyslog::Invalid(
-    "invalid", "SysLog message format invalid");
+core::Property ListenSyslog::Protocol("Protocol",
+                                      "The protocol for Syslog communication.",
+                                      "UDP");
+core::Property ListenSyslog::Port("Port", "The port for Syslog communication.",
+                                  "514");
+core::Relationship ListenSyslog::Success("success",
+                                         "All files are routed to success");
+core::Relationship ListenSyslog::Invalid("invalid",
+                                         "SysLog message format invalid");
 
 void ListenSyslog::initialize() {
   // Set the supported properties
@@ -125,7 +129,7 @@ void ListenSyslog::runThread() {
         logger_->log_info("ListenSysLog Server socket creation failed");
         break;
       }
-      bzero((char *) &serv_addr, sizeof(serv_addr));
+      bzero(reinterpret_cast<char *>(&serv_addr), sizeof(serv_addr));
       serv_addr.sin_family = AF_INET;
       serv_addr.sin_addr.s_addr = INADDR_ANY;
       serv_addr.sin_port = htons(portno);
@@ -167,7 +171,8 @@ void ListenSyslog::runThread() {
         socklen_t clilen;
         struct sockaddr_in cli_addr;
         clilen = sizeof(cli_addr);
-        int newsockfd = accept(_serverSocket, (struct sockaddr *) &cli_addr,
+        int newsockfd = accept(_serverSocket,
+                               reinterpret_cast<struct sockaddr *>(&cli_addr),
                                &clilen);
         if (newsockfd > 0) {
           if (_clientSockets.size() < _maxConnections) {
@@ -197,7 +202,7 @@ void ListenSyslog::runThread() {
     while (it != _clientSockets.end()) {
       int clientSocket = *it;
       if (FD_ISSET(clientSocket, &fds)) {
-        int recvlen = readline(clientSocket, (char *) _buffer, 
sizeof(_buffer));
+        int recvlen = readline(clientSocket, _buffer, sizeof(_buffer));
         if (recvlen <= 0) {
           close(clientSocket);
           logger_->log_info("ListenSysLog client socket %d close",
@@ -228,7 +233,7 @@ int ListenSyslog::readline(int fd, char *bufptr, size_t 
len) {
     if (--cnt <= 0) {
       cnt = recv(fd, b, sizeof(b), 0);
       if (cnt < 0) {
-        if ( errno == EINTR) {
+        if (errno == EINTR) {
           len++; /* the while will decrement */
           continue;
         }
@@ -248,9 +253,8 @@ int ListenSyslog::readline(int fd, char *bufptr, size_t 
len) {
   return -1;
 }
 
-void ListenSyslog::onTrigger(
-    core::ProcessContext *context,
-    core::ProcessSession *session) {
+void ListenSyslog::onTrigger(core::ProcessContext *context,
+                             core::ProcessSession *session) {
   std::string value;
   bool needResetServerSocket = false;
   if (context->getProperty(Protocol.getName(), value)) {
@@ -262,12 +266,10 @@ void ListenSyslog::onTrigger(
     core::Property::StringToInt(value, _recvBufSize);
   }
   if (context->getProperty(MaxSocketBufSize.getName(), value)) {
-    core::Property::StringToInt(value,
-                                                           _maxSocketBufSize);
+    core::Property::StringToInt(value, _maxSocketBufSize);
   }
   if (context->getProperty(MaxConnections.getName(), value)) {
-    core::Property::StringToInt(value,
-                                                           _maxConnections);
+    core::Property::StringToInt(value, _maxConnections);
   }
   if (context->getProperty(MessageDelimiter.getName(), value)) {
     _messageDelimiter = value;
@@ -283,8 +285,7 @@ void ListenSyslog::onTrigger(
       needResetServerSocket = true;
   }
   if (context->getProperty(MaxBatchSize.getName(), value)) {
-    core::Property::StringToInt(value,
-                                                           _maxBatchSize);
+    core::Property::StringToInt(value, _maxBatchSize);
   }
 
   if (needResetServerSocket)
@@ -309,12 +310,12 @@ void ListenSyslog::onTrigger(
       flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
       if (!flowFile)
         return;
-      ListenSyslog::WriteCallback callback((char *) event.payload, event.len);
+      ListenSyslog::WriteCallback callback(event.payload, event.len);
       session->write(flowFile, &callback);
       delete[] event.payload;
       firstEvent = false;
     } else {
-      ListenSyslog::WriteCallback callback((char *) event.payload, event.len);
+      ListenSyslog::WriteCallback callback(event.payload, event.len);
       session->append(flowFile, &callback);
       delete[] event.payload;
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/LogAttribute.cpp 
b/libminifi/src/processors/LogAttribute.cpp
index e2cf16c..d2dcd10 100644
--- a/libminifi/src/processors/LogAttribute.cpp
+++ b/libminifi/src/processors/LogAttribute.cpp
@@ -17,19 +17,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "processors/LogAttribute.h"
+#include <sys/time.h>
+#include <time.h>
+#include <string.h>
+#include <memory>
+#include <string>
 #include <vector>
 #include <queue>
 #include <map>
 #include <set>
-#include <sys/time.h>
-#include <time.h>
 #include <sstream>
-#include <string.h>
 #include <iostream>
-
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
-#include "processors/LogAttribute.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
@@ -38,7 +39,6 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
-const std::string LogAttribute::ProcessorName("LogAttribute");
 core::Property LogAttribute::LogLevel(
     "Log Level", "The Log Level to use when logging the Attributes", "info");
 core::Property LogAttribute::AttributesToLog(
@@ -51,7 +51,8 @@ core::Property LogAttribute::AttributesToIgnore(
     "");
 core::Property LogAttribute::LogPayload(
     "Log Payload",
-    "If true, the FlowFile's payload will be logged, in addition to its 
attributes; otherwise, just the Attributes will be logged.",
+    "If true, the FlowFile's payload will be logged, in addition to its 
attributes;"
+    "otherwise, just the Attributes will be logged.",
     "false");
 core::Property LogAttribute::LogPrefix(
     "Log prefix",
@@ -75,16 +76,14 @@ void LogAttribute::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void LogAttribute::onTrigger(
-    core::ProcessContext *context,
-    core::ProcessSession *session) {
+void LogAttribute::onTrigger(core::ProcessContext *context,
+                             core::ProcessSession *session) {
   std::string dashLine = "--------------------------------------------------";
   LogAttrLevel level = LogAttrLevelInfo;
   bool logPayload = false;
   std::ostringstream message;
 
-  std::shared_ptr<core::FlowFile> flow =
-      session->get();
+  std::shared_ptr<core::FlowFile> flow = session->get();
 
   if (!flow)
     return;
@@ -97,7 +96,8 @@ void LogAttribute::onTrigger(
     dashLine = "-----" + value + "-----";
   }
   if (context->getProperty(LogPayload.getName(), value)) {
-    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, 
logPayload);
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value,
+                                                                logPayload);
   }
 
   message << "Logging for flow file " << "\n";
@@ -125,9 +125,7 @@ void LogAttribute::onTrigger(
     ReadCallback callback(flow->getSize());
     session->read(flow, &callback);
     for (unsigned int i = 0, j = 0; i < callback._readSize; i++) {
-      char temp[8];
-      sprintf(temp, "%02x ", (unsigned char) (callback._buffer[i]));
-      message << temp;
+      message << std::hex << callback._buffer[i];
       j++;
       if (j == 16) {
         message << '\n';
@@ -168,7 +166,6 @@ void LogAttribute::onTrigger(
   session->transfer(flow, Success);
 }
 
-
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/PutFile.cpp 
b/libminifi/src/processors/PutFile.cpp
index 51fbb6f..bd08877 100644
--- a/libminifi/src/processors/PutFile.cpp
+++ b/libminifi/src/processors/PutFile.cpp
@@ -17,17 +17,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <sstream>
+#include "processors/PutFile.h"
 #include <stdio.h>
+#include <uuid/uuid.h>
+#include <sstream>
 #include <string>
 #include <iostream>
+#include <memory>
+#include <set>
 #include <fstream>
-#include <uuid/uuid.h>
-
 #include "io/validation.h"
 #include "utils/StringUtils.h"
 #include "utils/TimeUtil.h"
-#include "processors/PutFile.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
@@ -37,12 +38,6 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE("replace");
-const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_IGNORE("ignore");
-const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL("fail");
-
-const std::string PutFile::ProcessorName("PutFile");
-
 core::Property PutFile::Directory("Output Directory",
                                   "The output directory to which to put files",
                                   ".");
@@ -72,7 +67,6 @@ void PutFile::initialize() {
 
 void PutFile::onSchedule(core::ProcessContext *context,
                          core::ProcessSessionFactory *sessionFactory) {
-
   if (!context->getProperty(Directory.getName(), directory_)) {
     logger_->log_error("Directory attribute is missing or invalid");
   }
@@ -82,11 +76,10 @@ void PutFile::onSchedule(core::ProcessContext *context,
     logger_->log_error(
         "Conflict Resolution Strategy attribute is missing or invalid");
   }
-
 }
+
 void PutFile::onTrigger(core::ProcessContext *context,
                         core::ProcessSession *session) {
-
   if (IsNullOrEmpty(directory_) || IsNullOrEmpty(conflict_resolution_)) {
     context->yield();
     return;
@@ -144,7 +137,6 @@ void PutFile::onTrigger(core::ProcessContext *context,
 bool PutFile::putFile(core::ProcessSession *session,
                       std::shared_ptr<FlowFileRecord> flowFile,
                       const std::string &tmpFile, const std::string &destFile) 
{
-
   ReadCallback cb(tmpFile, destFile);
   session->read(flowFile, &cb);
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/RealTimeDataCollector.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/RealTimeDataCollector.cpp 
b/libminifi/src/processors/RealTimeDataCollector.cpp
deleted file mode 100644
index 922835d..0000000
--- a/libminifi/src/processors/RealTimeDataCollector.cpp
+++ /dev/null
@@ -1,480 +0,0 @@
-/**
- * @file RealTimeDataCollector.cpp
- * RealTimeDataCollector class implementation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
-#include <sys/time.h>
-#include <time.h>
-#include <chrono>
-#include <thread>
-#include <memory>
-#include <random>
-#include <netinet/tcp.h>
-
-#include "utils/StringUtils.h"
-#include "processors/RealTimeDataCollector.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
-const std::string 
RealTimeDataCollector::ProcessorName("RealTimeDataCollector");
-core::Property RealTimeDataCollector::FILENAME(
-    "File Name", "File Name for the real time processor to process",
-    "data.osp");
-core::Property RealTimeDataCollector::REALTIMESERVERNAME(
-    "Real Time Server Name", "Real Time Server Name", "localhost");
-core::Property RealTimeDataCollector::REALTIMESERVERPORT(
-    "Real Time Server Port", "Real Time Server Port", "10000");
-core::Property RealTimeDataCollector::BATCHSERVERNAME(
-    "Batch Server Name", "Batch Server Name", "localhost");
-core::Property RealTimeDataCollector::BATCHSERVERPORT(
-    "Batch Server Port", "Batch Server Port", "10001");
-core::Property RealTimeDataCollector::ITERATION(
-    "Iteration", "If true, sample osp file will be iterated", "true");
-core::Property RealTimeDataCollector::REALTIMEMSGID(
-    "Real Time Message ID", "Real Time Message ID", "41");
-core::Property RealTimeDataCollector::BATCHMSGID(
-    "Batch Message ID", "Batch Message ID", "172, 30, 48");
-core::Property RealTimeDataCollector::REALTIMEINTERVAL(
-    "Real Time Interval", "Real Time Data Collection Interval in msec",
-    "10 ms");
-core::Property RealTimeDataCollector::BATCHINTERVAL(
-    "Batch Time Interval", "Batch Processing Interval in msec", "100 ms");
-core::Property RealTimeDataCollector::BATCHMAXBUFFERSIZE(
-    "Batch Max Buffer Size", "Batch Buffer Maximum size in bytes", "262144");
-core::Relationship RealTimeDataCollector::Success(
-    "success", "success operational on the flow record");
-
-void RealTimeDataCollector::initialize() {
-  // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(FILENAME);
-  properties.insert(REALTIMESERVERNAME);
-  properties.insert(REALTIMESERVERPORT);
-  properties.insert(BATCHSERVERNAME);
-  properties.insert(BATCHSERVERPORT);
-  properties.insert(ITERATION);
-  properties.insert(REALTIMEMSGID);
-  properties.insert(BATCHMSGID);
-  properties.insert(REALTIMEINTERVAL);
-  properties.insert(BATCHINTERVAL);
-  properties.insert(BATCHMAXBUFFERSIZE);
-
-  setSupportedProperties(properties);
-  // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Success);
-  setSupportedRelationships(relationships);
-
-}
-
-int RealTimeDataCollector::connectServer(const char *host, uint16_t port) {
-  in_addr_t addr;
-  int sock = 0;
-  struct hostent *h;
-#ifdef __MACH__
-  h = gethostbyname(host);
-#else
-  char buf[1024];
-  struct hostent he;
-  int hh_errno;
-  gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
-#endif
-  memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
-  sock = socket(AF_INET, SOCK_STREAM, 0);
-  if (sock < 0) {
-    logger_->log_error("Could not create socket to hostName %s", host);
-    return 0;
-  }
-
-#ifndef __MACH__
-  int opt = 1;
-  bool nagle_off = true;
-
-  if (nagle_off)
-  {
-    if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
-    {
-      logger_->log_error("setsockopt() TCP_NODELAY failed");
-      close(sock);
-      return 0;
-    }
-    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
-            (char *)&opt, sizeof(opt)) < 0)
-    {
-      logger_->log_error("setsockopt() SO_REUSEADDR failed");
-      close(sock);
-      return 0;
-    }
-  }
-
-  int sndsize = 256*1024;
-  if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, 
(int)sizeof(sndsize)) < 0)
-  {
-    logger_->log_error("setsockopt() SO_SNDBUF failed");
-    close(sock);
-    return 0;
-  }
-#endif
-
-  struct sockaddr_in sa;
-  socklen_t socklen;
-  int status;
-
-  //TODO bind socket to the interface
-  memset(&sa, 0, sizeof(sa));
-  sa.sin_family = AF_INET;
-  sa.sin_addr.s_addr = htonl(INADDR_ANY);
-  sa.sin_port = htons(0);
-  socklen = sizeof(sa);
-  if (bind(sock, (struct sockaddr *) &sa, socklen) < 0) {
-    logger_->log_error("socket bind failed");
-    close(sock);
-    return 0;
-  }
-
-  memset(&sa, 0, sizeof(sa));
-  sa.sin_family = AF_INET;
-  sa.sin_addr.s_addr = addr;
-  sa.sin_port = htons(port);
-  socklen = sizeof(sa);
-
-  status = connect(sock, (struct sockaddr *) &sa, socklen);
-
-  if (status < 0) {
-    logger_->log_error("socket connect failed to %s %d", host, port);
-    close(sock);
-    return 0;
-  }
-
-  logger_->log_info("socket %d connect to server %s port %d success", sock,
-                    host, port);
-
-  return sock;
-}
-
-int RealTimeDataCollector::sendData(int socket, const char *buf, int buflen) {
-  int ret = 0, bytes = 0;
-
-  while (bytes < buflen) {
-    ret = send(socket, buf + bytes, buflen - bytes, 0);
-    //check for errors
-    if (ret == -1) {
-      return ret;
-    }
-    bytes += ret;
-  }
-
-  if (ret)
-    logger_->log_debug("Send data size %d over socket %d", buflen, socket);
-
-  return ret;
-}
-
-void RealTimeDataCollector::onTriggerRealTime(
-    core::ProcessContext *context,
-    core::ProcessSession *session) {
-  if (_realTimeAccumulated >= this->_realTimeInterval) {
-    std::string value;
-    if (this->getProperty(REALTIMEMSGID.getName(), value)) {
-      this->_realTimeMsgID.clear();
-      this->logger_->log_info("Real Time Msg IDs %s", value.c_str());
-      std::stringstream lineStream(value);
-      std::string cell;
-
-      while (std::getline(lineStream, cell, ',')) {
-        this->_realTimeMsgID.push_back(cell);
-        // this->logger_->log_debug("Real Time Msg ID %s", cell.c_str());
-      }
-    }
-    if (this->getProperty(BATCHMSGID.getName(), value)) {
-      this->_batchMsgID.clear();
-      this->logger_->log_info("Batch Msg IDs %s", value.c_str());
-      std::stringstream lineStream(value);
-      std::string cell;
-
-      while (std::getline(lineStream, cell, ',')) {
-        cell = org::apache::nifi::minifi::utils::StringUtils::trim(cell);
-        this->_batchMsgID.push_back(cell);
-      }
-    }
-    // Open the file
-    if (!this->_fileStream.is_open()) {
-      _fileStream.open(this->_fileName.c_str(), std::ifstream::in);
-      if (this->_fileStream.is_open())
-        logger_->log_debug("open %s", _fileName.c_str());
-    }
-    if (!_fileStream.good()) {
-      logger_->log_error("load data file failed %s", _fileName.c_str());
-      return;
-    }
-    if (this->_fileStream.is_open()) {
-      std::string line;
-
-      while (std::getline(_fileStream, line)) {
-        line += "\n";
-        std::stringstream lineStream(line);
-        std::string cell;
-        if (std::getline(lineStream, cell, ',')) {
-          cell = org::apache::nifi::minifi::utils::StringUtils::trim(cell);
-          // Check whether it match to the batch traffic
-          for (std::vector<std::string>::iterator it = _batchMsgID.begin();
-              it != _batchMsgID.end(); ++it) {
-            if (cell == *it) {
-              // push the batch data to the queue
-              std::lock_guard<std::mutex> lock(mutex_);
-              while ((_queuedDataSize + line.size()) > _batchMaxBufferSize) {
-                std::string item = _queue.front();
-                _queuedDataSize -= item.size();
-                logger_->log_debug(
-                    "Pop item size %d from batch queue, queue buffer size %d",
-                    item.size(), _queuedDataSize);
-                _queue.pop();
-              }
-              _queue.push(line);
-              _queuedDataSize += line.size();
-              logger_->log_debug(
-                  "Push batch msg ID %s into batch queue, queue buffer size 
%d",
-                  cell.c_str(), _queuedDataSize);
-            }
-          }
-          bool findRealTime = false;
-          // Check whether it match to the real time traffic
-          for (std::vector<std::string>::iterator it = _realTimeMsgID.begin();
-              it != _realTimeMsgID.end(); ++it) {
-            if (cell == *it) {
-              int status = 0;
-              if (this->_realTimeSocket <= 0) {
-                // Connect the LTE socket
-                uint16_t port = _realTimeServerPort;
-                this->_realTimeSocket = connectServer(
-                    _realTimeServerName.c_str(), port);
-              }
-              if (this->_realTimeSocket) {
-                // try to send the data
-                status = sendData(_realTimeSocket, line.data(), line.size());
-                if (status < 0) {
-                  close(_realTimeSocket);
-                  _realTimeSocket = 0;
-                }
-              }
-              if (this->_realTimeSocket <= 0 || status < 0) {
-                // push the batch data to the queue
-                std::lock_guard<std::mutex> lock(mutex_);
-                while ((_queuedDataSize + line.size()) > _batchMaxBufferSize) {
-                  std::string item = _queue.front();
-                  _queuedDataSize -= item.size();
-                  logger_->log_debug(
-                      "Pop item size %d from batch queue, queue buffer size 
%d",
-                      item.size(), _queuedDataSize);
-                  _queue.pop();
-                }
-                _queue.push(line);
-                _queuedDataSize += line.size();
-                logger_->log_debug(
-                    "Push real time msg ID %s into batch queue, queue buffer 
size %d",
-                    cell.c_str(), _queuedDataSize);
-              }
-              // find real time
-              findRealTime = true;
-            }  // cell
-          }  // for real time pattern
-          if (findRealTime)
-            // we break the while once we find the first real time
-            break;
-        }  // if get line
-      }  // while
-      if (_fileStream.eof()) {
-        _fileStream.close();
-      }
-    }  // if open
-    _realTimeAccumulated = 0;
-  }
-  std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(
-      context->getProcessorNode().getProcessor());
-  _realTimeAccumulated += processor->getSchedulingPeriodNano();
-}
-
-void RealTimeDataCollector::onTriggerBatch(
-    core::ProcessContext *context,
-    core::ProcessSession *session) {
-  if (_batchAcccumulated >= this->_batchInterval) {
-    // logger_->log_info("onTriggerBatch");
-    // dequeue the batch and send over WIFI
-    int status = 0;
-    if (this->_batchSocket <= 0) {
-      // Connect the WIFI socket
-      uint16_t port = _batchServerPort;
-      this->_batchSocket = connectServer(_batchServerName.c_str(), port);
-    }
-    if (this->_batchSocket) {
-      std::lock_guard<std::mutex> lock(mutex_);
-
-      while (!_queue.empty()) {
-        std::string line = _queue.front();
-        status = sendData(_batchSocket, line.data(), line.size());
-        _queue.pop();
-        _queuedDataSize -= line.size();
-        if (status < 0) {
-          close(_batchSocket);
-          _batchSocket = 0;
-          break;
-        }
-      }
-    }
-    _batchAcccumulated = 0;
-  }
-  std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(
-      context->getProcessorNode().getProcessor());
-  _batchAcccumulated += processor->getSchedulingPeriodNano();
-}
-
-void RealTimeDataCollector::onTrigger(
-    core::ProcessContext *context,
-    core::ProcessSession *session) {
-  std::thread::id id = std::this_thread::get_id();
-
-  if (id == _realTimeThreadId)
-    return onTriggerRealTime(context, session);
-  else if (id == _batchThreadId)
-    return onTriggerBatch(context, session);
-  else {
-    std::lock_guard<std::mutex> lock(mutex_);
-    if (!this->_firstInvoking) {
-      this->_fileName = "data.osp";
-      std::string value;
-      if (this->getProperty(FILENAME.getName(), value)) {
-        this->_fileName = value;
-        this->logger_->log_info("Data Collector File Name %s",
-                                _fileName.c_str());
-      }
-      this->_realTimeServerName = "localhost";
-      if (this->getProperty(REALTIMESERVERNAME.getName(), value)) {
-        this->_realTimeServerName = value;
-        this->logger_->log_info("Real Time Server Name %s",
-                                this->_realTimeServerName.c_str());
-      }
-      this->_realTimeServerPort = 10000;
-      if (this->getProperty(REALTIMESERVERPORT.getName(), value)) {
-        core::Property::StringToInt(
-            value, _realTimeServerPort);
-        this->logger_->log_info("Real Time Server Port %d",
-                                _realTimeServerPort);
-      }
-      if (this->getProperty(BATCHSERVERNAME.getName(), value)) {
-        this->_batchServerName = value;
-        this->logger_->log_info("Batch Server Name %s",
-                                this->_batchServerName.c_str());
-      }
-      this->_batchServerPort = 10001;
-      if (this->getProperty(BATCHSERVERPORT.getName(), value)) {
-        core::Property::StringToInt(
-            value, _batchServerPort);
-        this->logger_->log_info("Batch Server Port %d", _batchServerPort);
-      }
-      if (this->getProperty(ITERATION.getName(), value)) {
-        org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, 
this->_iteration);
-        logger_->log_info("Iteration %d", _iteration);
-      }
-      this->_realTimeInterval = 10000000;  //10 msec
-      if (this->getProperty(REALTIMEINTERVAL.getName(), value)) {
-        core::TimeUnit unit;
-        if (core::Property::StringToTime(
-            value, _realTimeInterval, unit)
-            && core::Property::ConvertTimeUnitToNS(
-                _realTimeInterval, unit, _realTimeInterval)) {
-          logger_->log_info("Real Time Interval: [%d] ns", _realTimeInterval);
-        }
-      }
-      this->_batchInterval = 100000000;  //100 msec
-      if (this->getProperty(BATCHINTERVAL.getName(), value)) {
-        core::TimeUnit unit;
-        if (core::Property::StringToTime(
-            value, _batchInterval, unit)
-            && core::Property::ConvertTimeUnitToNS(
-                _batchInterval, unit, _batchInterval)) {
-          logger_->log_info("Batch Time Interval: [%d] ns", _batchInterval);
-        }
-      }
-      this->_batchMaxBufferSize = 256 * 1024;
-      if (this->getProperty(BATCHMAXBUFFERSIZE.getName(), value)) {
-        core::Property::StringToInt(
-            value, _batchMaxBufferSize);
-        this->logger_->log_info("Batch Max Buffer Size %d",
-                                _batchMaxBufferSize);
-      }
-      if (this->getProperty(REALTIMEMSGID.getName(), value)) {
-        this->logger_->log_info("Real Time Msg IDs %s", value.c_str());
-        std::stringstream lineStream(value);
-        std::string cell;
-
-        while (std::getline(lineStream, cell, ',')) {
-          this->_realTimeMsgID.push_back(cell);
-          this->logger_->log_info("Real Time Msg ID %s", cell.c_str());
-        }
-      }
-      if (this->getProperty(BATCHMSGID.getName(), value)) {
-        this->logger_->log_info("Batch Msg IDs %s", value.c_str());
-        std::stringstream lineStream(value);
-        std::string cell;
-
-        while (std::getline(lineStream, cell, ',')) {
-          cell = org::apache::nifi::minifi::utils::StringUtils::trim(cell);
-          this->_batchMsgID.push_back(cell);
-          this->logger_->log_info("Batch Msg ID %s", cell.c_str());
-        }
-      }
-      // Connect the LTE socket
-      uint16_t port = _realTimeServerPort;
-
-      this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port);
-
-      // Connect the WIFI socket
-      port = _batchServerPort;
-
-      this->_batchSocket = connectServer(_batchServerName.c_str(), port);
-
-      // Open the file
-      _fileStream.open(this->_fileName.c_str(), std::ifstream::in);
-      if (!_fileStream.good()) {
-        logger_->log_error("load data file failed %s", _fileName.c_str());
-        return;
-      } else {
-        logger_->log_debug("open %s", _fileName.c_str());
-      }
-      _realTimeThreadId = id;
-      this->_firstInvoking = true;
-    } else {
-      if (id != _realTimeThreadId)
-        _batchThreadId = id;
-      this->_firstInvoking = false;
-    }
-  }
-}
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/processors/TailFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/TailFile.cpp 
b/libminifi/src/processors/TailFile.cpp
index bcdd8fd..abb02ca 100644
--- a/libminifi/src/processors/TailFile.cpp
+++ b/libminifi/src/processors/TailFile.cpp
@@ -17,22 +17,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <vector>
-#include <queue>
-#include <map>
-#include <set>
 #include <sys/time.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <time.h>
-#include <sstream>
 #include <stdio.h>
-#include <string>
-#include <iostream>
 #include <dirent.h>
 #include <limits.h>
 #include <unistd.h>
-
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <memory>
+#include <algorithm>
+#include <sstream>
+#include <string>
+#include <iostream>
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "processors/TailFile.h"
@@ -45,16 +46,16 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-const std::string TailFile::ProcessorName("TailFile");
 core::Property TailFile::FileName(
     "File to Tail",
     "Fully-qualified filename of the file that should be tailed", "");
 core::Property TailFile::StateFile(
     "State File",
-    "Specifies the file that should be used for storing state about what data 
has been ingested so that upon restart NiFi can resume from where it left off",
+    "Specifies the file that should be used for storing state about"
+    " what data has been ingested so that upon restart NiFi can resume from 
where it left off",
     "TailFileState");
-core::Relationship TailFile::Success(
-    "success", "All files are routed to success");
+core::Relationship TailFile::Success("success",
+                                     "All files are routed to success");
 
 void TailFile::initialize() {
   // Set the supported properties
@@ -123,9 +124,9 @@ void TailFile::recoverState() {
     logger_->log_error("load state file failed %s", _stateFile.c_str());
     return;
   }
-  const unsigned int bufSize = 512;
-  char buf[bufSize];
-  for (file.getline(buf, bufSize); file.good(); file.getline(buf, bufSize)) {
+  char buf[BUFFER_SIZE];
+  for (file.getline(buf, BUFFER_SIZE); file.good();
+      file.getline(buf, BUFFER_SIZE)) {
     parseStateFileLine(buf);
   }
 }
@@ -145,7 +146,8 @@ static bool sortTailMatchedFileItem(TailMatchedFileItem i,
                                     TailMatchedFileItem j) {
   return (i.modifiedTime < j.modifiedTime);
 }
-void TailFile::checkRollOver(const std::string &fileLocation, const 
std::string &fileName) {
+void TailFile::checkRollOver(const std::string &fileLocation,
+                             const std::string &fileName) {
   struct stat statbuf;
   std::vector<TailMatchedFileItem> matchedFiles;
   std::string fullPath = fileLocation + "/" + _currentTailFileName;
@@ -208,18 +210,17 @@ void TailFile::checkRollOver(const std::string 
&fileLocation, const std::string
         break;
       }
     }
-  } else
+  } else {
     return;
+  }
 }
 
-void TailFile::onTrigger(
-    core::ProcessContext *context,
-    core::ProcessSession *session) {
-
+void TailFile::onTrigger(core::ProcessContext *context,
+                         core::ProcessSession *session) {
   std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
   std::string value;
-  std::string fileLocation="";
-  std::string fileName="";
+  std::string fileLocation = "";
+  std::string fileName = "";
   if (context->getProperty(FileName.getName(), value)) {
     std::size_t found = value.find_last_of("/\\");
     fileLocation = value.substr(0, found);
@@ -235,17 +236,17 @@ void TailFile::onTrigger(
     // recover the state if we have not done so
     this->recoverState();
   }
-  checkRollOver(fileLocation,fileName);
+  checkRollOver(fileLocation, fileName);
   std::string fullPath = fileLocation + "/" + _currentTailFileName;
   struct stat statbuf;
   if (stat(fullPath.c_str(), &statbuf) == 0) {
-    if (statbuf.st_size <= this->_currentTailFilePosition)
-    // there are no new input for the current tail file
-        {
+    if (statbuf.st_size <= this->_currentTailFilePosition) {
+      // there are no new input for the current tail fil
       context->yield();
       return;
     }
-    std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create());
+    std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
+        FlowFileRecord>(session->create());
     if (!flowFile)
       return;
     std::size_t found = _currentTailFileName.find_last_of(".");
@@ -267,7 +268,6 @@ void TailFile::onTrigger(
   }
 }
 
-
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/provenance/Provenance.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/Provenance.cpp 
b/libminifi/src/provenance/Provenance.cpp
index 289f026..b1db9a8 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+#include <arpa/inet.h>
 #include <cstdint>
+#include <memory>
+#include <string>
 #include <vector>
-#include <arpa/inet.h>
 #include "io/DataStream.h"
 #include "io/Serializable.h"
 #include "provenance/Provenance.h"
-
 #include "core/logging/Logger.h"
 #include "core/Relationship.h"
 #include "FlowController.h"
@@ -44,9 +46,10 @@ bool ProvenanceEventRecord::DeSerialize(
     logger_->log_error("NiFi Provenance Store event %s can not found",
                        key.c_str());
     return false;
-  } else
+  } else {
     logger_->log_debug("NiFi Provenance Read event %s length %d", key.c_str(),
                        value.length());
+  }
 
   org::apache::nifi::minifi::io::DataStream stream(
       (const uint8_t*) value.data(), value.length());
@@ -75,20 +78,17 @@ bool ProvenanceEventRecord::Serialize(
 
   ret = writeUTF(this->_eventIdStr, &outStream);
   if (ret <= 0) {
-
     return false;
   }
 
   uint32_t eventType = this->_eventType;
   ret = write(eventType, &outStream);
   if (ret != 4) {
-
     return false;
   }
 
   ret = write(this->_eventTime, &outStream);
   if (ret != 8) {
-
     return false;
   }
 
@@ -99,37 +99,31 @@ bool ProvenanceEventRecord::Serialize(
 
   ret = write(this->_eventDuration, &outStream);
   if (ret != 8) {
-
     return false;
   }
 
   ret = write(this->_lineageStartDate, &outStream);
   if (ret != 8) {
-
     return false;
   }
 
   ret = writeUTF(this->_componentId, &outStream);
   if (ret <= 0) {
-
     return false;
   }
 
   ret = writeUTF(this->_componentType, &outStream);
   if (ret <= 0) {
-
     return false;
   }
 
   ret = writeUTF(this->uuid_, &outStream);
   if (ret <= 0) {
-
     return false;
   }
 
   ret = writeUTF(this->_details, &outStream);
   if (ret <= 0) {
-
     return false;
   }
 
@@ -137,44 +131,37 @@ bool ProvenanceEventRecord::Serialize(
   uint32_t numAttributes = this->_attributes.size();
   ret = write(numAttributes, &outStream);
   if (ret != 4) {
-
     return false;
   }
 
   for (auto itAttribute : _attributes) {
     ret = writeUTF(itAttribute.first, &outStream, true);
     if (ret <= 0) {
-
       return false;
     }
     ret = writeUTF(itAttribute.second, &outStream, true);
     if (ret <= 0) {
-
       return false;
     }
   }
 
   ret = writeUTF(this->_contentFullPath, &outStream);
   if (ret <= 0) {
-
     return false;
   }
 
   ret = write(this->_size, &outStream);
   if (ret != 8) {
-
     return false;
   }
 
   ret = write(this->_offset, &outStream);
   if (ret != 8) {
-
     return false;
   }
 
   ret = writeUTF(this->_sourceQueueIdentifier, &outStream);
   if (ret <= 0) {
-
     return false;
   }
 
@@ -185,13 +172,11 @@ bool ProvenanceEventRecord::Serialize(
     uint32_t number = this->_parentUuids.size();
     ret = write(number, &outStream);
     if (ret != 4) {
-
       return false;
     }
     for (auto parentUUID : _parentUuids) {
       ret = writeUTF(parentUUID, &outStream);
       if (ret <= 0) {
-
         return false;
       }
     }
@@ -203,7 +188,6 @@ bool ProvenanceEventRecord::Serialize(
     for (auto childUUID : _childrenUuids) {
       ret = writeUTF(childUUID, &outStream);
       if (ret <= 0) {
-
         return false;
       }
     }
@@ -211,24 +195,19 @@ bool ProvenanceEventRecord::Serialize(
       || this->_eventType == ProvenanceEventRecord::FETCH) {
     ret = writeUTF(this->_transitUri, &outStream);
     if (ret <= 0) {
-
       return false;
     }
   } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
     ret = writeUTF(this->_transitUri, &outStream);
     if (ret <= 0) {
-
       return false;
     }
     ret = writeUTF(this->_sourceSystemFlowFileIdentifier, &outStream);
     if (ret <= 0) {
-
       return false;
     }
   }
-
   // Persistent to the DB
-
   if (repo->Put(_eventIdStr, const_cast<uint8_t*>(outStream.getBuffer()),
                 outStream.getSize())) {
     logger_->log_debug("NiFi Provenance Store event %s size %d success",
@@ -237,16 +216,11 @@ bool ProvenanceEventRecord::Serialize(
     logger_->log_error("NiFi Provenance Store event %s size %d fail",
                        _eventIdStr.c_str(), outStream.getSize());
   }
-
-
-  // cleanup
-
   return true;
 }
 
 bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer,
                                         const int bufferSize) {
-
   int ret;
 
   org::apache::nifi::minifi::io::DataStream outStream(buffer, bufferSize);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/src/provenance/ProvenanceRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/provenance/ProvenanceRepository.cpp 
b/libminifi/src/provenance/ProvenanceRepository.cpp
index 6fe332b..77de5ba 100644
--- a/libminifi/src/provenance/ProvenanceRepository.cpp
+++ b/libminifi/src/provenance/ProvenanceRepository.cpp
@@ -16,22 +16,21 @@
  * limitations under the License.
  */
 
-#include "provenance/Provenance.h"
 #include "provenance/ProvenanceRepository.h"
-
+#include <string>
+#include <vector>
+#include "provenance/Provenance.h"
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
-namespace provenance{
-
-
+namespace provenance {
 
 void ProvenanceRepository::run() {
   // threshold for purge
   uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4;
   while (running_) {
-        std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
+    std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
     std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_));
     uint64_t curTime = getTimeMillis();
     uint64_t size = repoSize();
@@ -41,24 +40,24 @@ void ProvenanceRepository::run() {
       for (it->SeekToFirst(); it->Valid(); it->Next()) {
         ProvenanceEventRecord eventRead;
         std::string key = it->key().ToString();
-        if (eventRead.DeSerialize((uint8_t *) it->value().data(),
-                                  (int) it->value().size())) {
-          if ((curTime - eventRead.getEventTime())
-              > max_partition_millis_)
+        if (eventRead.DeSerialize(
+            reinterpret_cast<uint8_t*>(const_cast<char*>(it->value().data())),
+            it->value().size())) {
+          if ((curTime - eventRead.getEventTime()) > max_partition_millis_)
             purgeList.push_back(key);
         } else {
           logger_->log_debug("NiFi Provenance retrieve event %s fail",
-                                   key.c_str());
+                             key.c_str());
           purgeList.push_back(key);
         }
       }
       delete it;
       std::vector<std::string>::iterator itPurge;
-      
+
       for (itPurge = purgeList.begin(); itPurge != purgeList.end(); itPurge++) 
{
         std::string eventId = *itPurge;
         logger_->log_info("ProvenanceRepository Repo Purge %s",
-                                eventId.c_str());
+                          eventId.c_str());
         Delete(eventId);
       }
     }
@@ -66,7 +65,6 @@ void ProvenanceRepository::run() {
       repo_full_ = true;
     else
       repo_full_ = false;
-    
   }
   return;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/test/TestBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 4f926d3..585c8cd 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -26,7 +26,7 @@
 #include <vector>
 #include "core/logging/LogAppenders.h"
 #include "core/logging/Logger.h"
-#include "core/core.h"
+#include "core/Core.h"
 
 class LogTestController {
  public:
@@ -48,7 +48,7 @@ class TestController {
 
   TestController()
       : log("info") {
-    minifi::ResourceClaim::default_directory_path = "./";
+    minifi::ResourceClaim::default_directory_path = const_cast<char*>("./");
   }
 
   ~TestController() {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/test/nodefs/NoLevelDB.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/nodefs/NoLevelDB.cpp 
b/libminifi/test/nodefs/NoLevelDB.cpp
index 00c9212..09b4916 100644
--- a/libminifi/test/nodefs/NoLevelDB.cpp
+++ b/libminifi/test/nodefs/NoLevelDB.cpp
@@ -18,7 +18,7 @@
 
 #include "../TestBase.h"
 
-#include "core/core.h"
+#include "core/Core.h"
 #include "core/RepositoryFactory.h"
 
 TEST_CASE("NoLevelDBTest1", "[NoLevelDBTest]") {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/test/nodefs/NoYamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/nodefs/NoYamlConfiguration.cpp 
b/libminifi/test/nodefs/NoYamlConfiguration.cpp
index 9a9b10e..5f3fce4 100644
--- a/libminifi/test/nodefs/NoYamlConfiguration.cpp
+++ b/libminifi/test/nodefs/NoYamlConfiguration.cpp
@@ -17,7 +17,7 @@
  */
 
 
-#include "core/core.h"
+#include "core/Core.h"
 #include "core/RepositoryFactory.h"
 
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/test/unit/ProcessorTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProcessorTests.cpp 
b/libminifi/test/unit/ProcessorTests.cpp
index 91a55f7..87f190c 100644
--- a/libminifi/test/unit/ProcessorTests.cpp
+++ b/libminifi/test/unit/ProcessorTests.cpp
@@ -23,7 +23,7 @@
 #include "core/logging/LogAppenders.h"
 #include "core/logging/BaseLogger.h"
 #include "processors/GetFile.h"
-#include "core/core.h"
+#include "core/Core.h"
 #include "../../include/core/FlowFile.h"
 #include "core/Processor.h"
 #include "core/ProcessContext.h"
@@ -189,6 +189,7 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", 
"[getfileCreate3]") {
   processor->setScheduledState(core::ScheduledState::RUNNING);
   processor->onSchedule(&context, &factory);
 
+  int prev = 0;
   for (int i = 0; i < 10; i++) {
 
     core::ProcessSession session(&context);
@@ -216,7 +217,6 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", 
"[getfileCreate3]") {
     processor->setScheduledState(core::ScheduledState::RUNNING);
     processor->onTrigger(&context, &session);
     unlink(ss.str().c_str());
-    rmdir(dir);
     reporter = session.getProvenanceReporter();
 
     REQUIRE(processor->getName() == "getfileCreate2");
@@ -229,14 +229,10 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", 
"[getfileCreate3]") {
     session.commit();
     std::shared_ptr<core::FlowFile> ffr = session.get();
 
-    REQUIRE(2 == repo->getRepoMap().size());
+    REQUIRE((repo->getRepoMap().size()%2) == 0);
+    REQUIRE(repo->getRepoMap().size() == (prev+2));
+    prev+=2;
 
-    for (auto entry : repo->getRepoMap()) {
-      provenance::ProvenanceEventRecord newRecord;
-      newRecord.DeSerialize((uint8_t*) entry.second.data(),
-                            entry.second.length());
-
-    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/test/unit/ProvenanceTestHelper.h
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h 
b/libminifi/test/unit/ProvenanceTestHelper.h
index 1e16aa6..80d8642 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -21,7 +21,7 @@
 #include "provenance/Provenance.h"
 #include "FlowController.h"
 #include "core/Repository.h"
-#include "core/core.h"
+#include "core/Core.h"
 /**
  * Test repository
  */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/test/unit/ProvenanceTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/ProvenanceTests.cpp 
b/libminifi/test/unit/ProvenanceTests.cpp
index c73cef2..2e41cc8 100644
--- a/libminifi/test/unit/ProvenanceTests.cpp
+++ b/libminifi/test/unit/ProvenanceTests.cpp
@@ -21,7 +21,7 @@
 #include "ProvenanceTestHelper.h"
 #include "provenance/Provenance.h"
 #include "FlowFileRecord.h"
-#include "core/core.h"
+#include "core/Core.h"
 #include "core/repository/FlowFileRepository.h"
 
 TEST_CASE("Test Provenance record create", 
"[Testprovenance::ProvenanceEventRecord]") {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/libminifi/test/unit/RepoTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/RepoTests.cpp 
b/libminifi/test/unit/RepoTests.cpp
index 9237e7e..21fae45 100644
--- a/libminifi/test/unit/RepoTests.cpp
+++ b/libminifi/test/unit/RepoTests.cpp
@@ -21,7 +21,7 @@
 #include "ProvenanceTestHelper.h"
 #include "provenance/Provenance.h"
 #include "FlowFileRecord.h"
-#include "core/core.h"
+#include "core/Core.h"
 #include "core/repository/FlowFileRepository.h"
 
 TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/main/MiNiFiMain.cpp
----------------------------------------------------------------------
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index 6bfd9c9..9e6a37f 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -28,7 +28,7 @@
 #include <yaml-cpp/yaml.h>
 #include <iostream>
 
-#include "core/core.h"
+#include "core/Core.h"
 
 #include "core/logging/BaseLogger.h"
 #include "core/logging/LogAppenders.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/thirdparty/google-styleguide/cpplint.py
----------------------------------------------------------------------
diff --git a/thirdparty/google-styleguide/cpplint.py 
b/thirdparty/google-styleguide/cpplint.py
index fafc243..eda78bd 100644
--- a/thirdparty/google-styleguide/cpplint.py
+++ b/thirdparty/google-styleguide/cpplint.py
@@ -5766,14 +5766,8 @@ def FlagCxx11Features(filename, clean_lines, linenum, 
error):
 
   # Flag unapproved C++11 headers.
   if include and include.group(1) in ('cfenv',
-                                      'condition_variable',
                                       'fenv.h',
-                                      'future',
-                                      'mutex',
-                                      'thread',
-                                      'chrono',
                                       'ratio',
-                                      'regex',
                                       'system_error',
                                      ):
     error(filename, linenum, 'build/c++11', 5,

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/be3f2ffe/thirdparty/google-styleguide/run_linter.sh
----------------------------------------------------------------------
diff --git a/thirdparty/google-styleguide/run_linter.sh 
b/thirdparty/google-styleguide/run_linter.sh
index e04d0aa..fbf8730 100755
--- a/thirdparty/google-styleguide/run_linter.sh
+++ b/thirdparty/google-styleguide/run_linter.sh
@@ -16,9 +16,14 @@
 #
 # ./run_linter <includedir> <srcdir>
 #!/bin/bash
+if [ "$(uname)" == "Darwin" ]; then
 SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+else
+SCRIPT=$(readlink -f $0)
+SCRIPT_DIR=`dirname $SCRIPT`
+fi
 HEADERS=`find ${1} -name '*.h' | tr '\n' ','`
 SOURCES=`find ${2} -name  '*.cpp' | tr '\n' ' '`
 echo ${HEADERS}
 echo ${SOURCES}
-python ${SCRIPT_DIR}/cpplint.py --headers=${HEADERS} ${SOURCES}
+python ${SCRIPT_DIR}/cpplint.py --linelength=128 --headers=${HEADERS} 
${SOURCES}

Reply via email to