http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/ExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ExecuteProcess.cpp 
b/libminifi/src/processors/ExecuteProcess.cpp
new file mode 100644
index 0000000..3cbbc1b
--- /dev/null
+++ b/libminifi/src/processors/ExecuteProcess.cpp
@@ -0,0 +1,255 @@
+/**
+ * @file ExecuteProcess.cpp
+ * ExecuteProcess class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "processors/ExecuteProcess.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include <cstring>
+#include "utils/StringUtils.h"
+#include "utils/TimeUtil.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string ExecuteProcess::ProcessorName("ExecuteProcess");
+// Properties understood by ExecuteProcess. The three constructor arguments
+// are presumably (name, description, default value) - confirm against
+// core::Property.
+core::Property ExecuteProcess::Command(
+    "Command",
+    "Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.",
+    "");
+core::Property ExecuteProcess::CommandArguments(
+    "Command Arguments",
+    "The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.",
+    "");
+core::Property ExecuteProcess::WorkingDir(
+    "Working Directory",
+    "The directory to use as the current working directory when executing the command",
+    "");
+core::Property ExecuteProcess::BatchDuration(
+    "Batch Duration",
+    "If the process is expected to be long-running and produce textual output, a batch duration can be specified.",
+    "0");
+core::Property ExecuteProcess::RedirectErrorStream(
+    "Redirect Error Stream",
+    "If true will redirect any error stream output of the process to the output stream.",
+    "false");
+// The only relationship: every FlowFile created by this processor goes here.
+core::Relationship ExecuteProcess::Success(
+    "success", "All created FlowFiles are routed to this relationship.");
+
+/**
+ * Registers the properties and relationships supported by ExecuteProcess.
+ */
+void ExecuteProcess::initialize() {
+  // Properties that may be set on this processor
+  std::set<core::Property> supportedProperties{
+      Command, CommandArguments, WorkingDir, BatchDuration,
+      RedirectErrorStream};
+  setSupportedProperties(supportedProperties);
+
+  // Relationships FlowFiles can be routed to
+  std::set<core::Relationship> supportedRelationships{Success};
+  setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Launches the configured command as a child process and turns what the
+ * child writes to stdout (and stderr, when "Redirect Error Stream" is true)
+ * into FlowFiles routed to Success.
+ *
+ * With a positive "Batch Duration" the pipe is read once per duration and
+ * every non-empty read becomes its own committed FlowFile. Otherwise the
+ * output is accumulated, 4096 bytes at a time, into a single FlowFile.
+ * The call blocks until the pipe reaches end-of-file and the child has been
+ * reaped. Nothing is launched while _processRunning is already set.
+ *
+ * @param context source of the processor's property values
+ * @param session used to create, write, transfer and commit FlowFiles
+ */
+void ExecuteProcess::onTrigger(
+    core::ProcessContext *context,
+    core::ProcessSession *session) {
+  std::string value;
+  if (context->getProperty(Command.getName(), value)) {
+    this->_command = value;
+  }
+  if (context->getProperty(CommandArguments.getName(), value)) {
+    this->_commandArgument = value;
+  }
+  if (context->getProperty(WorkingDir.getName(), value)) {
+    this->_workingDir = value;
+  }
+  if (context->getProperty(BatchDuration.getName(), value)) {
+    core::TimeUnit unit;
+    // Convert to milliseconds only when the text could be parsed.
+    if (core::Property::StringToTime(value, _batchDuration, unit)) {
+      core::Property::ConvertTimeUnitToMS(_batchDuration, unit,
+                                          _batchDuration);
+    }
+  }
+  if (context->getProperty(RedirectErrorStream.getName(), value)) {
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(
+        value, _redirectErrorStream);
+  }
+  this->_fullCommand = _command + " " + _commandArgument;
+  // The separator makes _fullCommand non-empty even when nothing is
+  // configured, so look for an actual token instead of testing the length;
+  // otherwise execvp() would be handed a NULL program name.
+  if (_fullCommand.find_first_not_of(' ') == std::string::npos) {
+    yield();
+    return;
+  }
+  if (_workingDir.length() > 0 && _workingDir != ".") {
+    // NOTE(review): chdir() here changes the working directory of the
+    // calling process itself, not only of the child forked below - confirm
+    // that this is intended.
+    if (chdir(_workingDir.c_str()) != 0) {
+      logger_->log_error("Execute Command can not chdir %s",
+                         _workingDir.c_str());
+      yield();
+      return;
+    }
+  }
+  logger_->log_info("Execute Command %s", _fullCommand.c_str());
+  // Split the command line on spaces into an argv[] for execvp().
+  // NOTE(review): double-quoted arguments are not honoured by this split,
+  // although the "Command Arguments" description mentions them.
+  std::string tokenBuffer(_fullCommand);  // strtok() edits its input in place
+  const int kMaxArgs = 64;
+  char *argv[kMaxArgs + 1];  // one extra slot for the terminating NULL
+  int argc = 0;
+  for (char *token = std::strtok(&tokenBuffer[0], " ");
+       token != NULL && argc < kMaxArgs;
+       token = std::strtok(NULL, " ")) {
+    argv[argc++] = token;
+  }
+  argv[argc] = NULL;
+
+  if (_processRunning) {
+    // a previous launch has not finished yet
+    return;
+  }
+  _processRunning = true;
+  // pipe carrying the child's output back to this process
+  if (pipe(_pipefd) == -1) {
+    _processRunning = false;
+    yield();
+    return;
+  }
+  _pid = fork();
+  if (_pid == -1) {
+    logger_->log_error("Execute Process fork failed");
+    _processRunning = false;
+    close(_pipefd[0]);
+    close(_pipefd[1]);
+    yield();
+    return;
+  }
+  if (_pid == 0) {
+    // Child: point stdout (and optionally stderr) at the pipe, then exec.
+    // dup2() targets the descriptor explicitly, so it works even when
+    // stdin happens to be closed.
+    dup2(_pipefd[1], STDOUT_FILENO);
+    if (_redirectErrorStream) {
+      dup2(_pipefd[1], STDERR_FILENO);
+    }
+    close(_pipefd[0]);
+    if (_pipefd[1] > STDERR_FILENO) {
+      // the duplicates are enough; drop the original write end
+      close(_pipefd[1]);
+    }
+    execvp(argv[0], argv);
+    // Only reached when exec failed. _exit() avoids running the parent's
+    // atexit handlers and flushing its stdio buffers in the child.
+    _exit(1);
+  }
+
+  // Parent: it never writes to the pipe.
+  close(_pipefd[1]);
+  if (_batchDuration > 0) {
+    // Batch mode: one FlowFile per non-empty read, one read per duration.
+    while (true) {
+      std::this_thread::sleep_for(
+          std::chrono::milliseconds(_batchDuration));
+      char buffer[4096];
+      int numRead = read(_pipefd[0], buffer, sizeof(buffer));
+      if (numRead <= 0)
+        break;
+      logger_->log_info("Execute Command Respond %d", numRead);
+      ExecuteProcess::WriteCallback callback(buffer, numRead);
+      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
+          FlowFileRecord>(session->create());
+      if (!flowFile)
+        continue;
+      flowFile->addAttribute("command", _command.c_str());
+      flowFile->addAttribute("command.arguments",
+                             _commandArgument.c_str());
+      session->write(flowFile, &callback);
+      session->transfer(flowFile, Success);
+      session->commit();
+    }
+  } else {
+    // Single FlowFile mode: fill the buffer, flush it to the FlowFile each
+    // time it is full, and flush the remainder at end-of-file.
+    char buffer[4096];
+    int totalRead = 0;
+    std::shared_ptr<FlowFileRecord> flowFile = nullptr;
+    while (true) {
+      int numRead = read(_pipefd[0], buffer + totalRead,
+                         (sizeof(buffer) - totalRead));
+      if (numRead <= 0) {
+        // end of output (or read error): flush what is left
+        if (totalRead > 0) {
+          logger_->log_info("Execute Command Respond %d", totalRead);
+          ExecuteProcess::WriteCallback callback(buffer, totalRead);
+          if (!flowFile) {
+            flowFile = std::static_pointer_cast<FlowFileRecord>(
+                session->create());
+            if (!flowFile)
+              break;
+            flowFile->addAttribute("command", _command.c_str());
+            flowFile->addAttribute("command.arguments",
+                                   _commandArgument.c_str());
+            session->write(flowFile, &callback);
+          } else {
+            session->append(flowFile, &callback);
+          }
+        }
+        // Transfer even when the output ended exactly on a buffer
+        // boundary (totalRead == 0 but earlier chunks were written).
+        if (flowFile) {
+          session->transfer(flowFile, Success);
+        }
+        break;
+      }
+      totalRead += numRead;
+      if (totalRead < static_cast<int>(sizeof(buffer))) {
+        continue;  // room left, keep reading
+      }
+      // buffer is full: write it out and start over
+      logger_->log_info("Execute Command Max Respond %d",
+                        static_cast<int>(sizeof(buffer)));
+      ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
+      if (!flowFile) {
+        flowFile = std::static_pointer_cast<FlowFileRecord>(
+            session->create());
+        if (flowFile) {
+          flowFile->addAttribute("command", _command.c_str());
+          flowFile->addAttribute("command.arguments",
+                                 _commandArgument.c_str());
+          session->write(flowFile, &callback);
+        }
+      } else {
+        session->append(flowFile, &callback);
+      }
+      // Rewind in every case; otherwise the next read() would be asked for
+      // zero bytes and its 0 result mistaken for end-of-file.
+      totalRead = 0;
+    }
+  }
+
+  // Reap exactly the child started above.
+  int status = 0;
+  const int died = waitpid(_pid, &status, 0);
+  if (died == -1) {
+    logger_->log_error("Execute Command %s waitpid failed pid %d",
+                       _fullCommand.c_str(), _pid);
+  } else if (WIFEXITED(status)) {
+    logger_->log_info("Execute Command Complete %s status %d pid %d",
+                      _fullCommand.c_str(), WEXITSTATUS(status), _pid);
+  } else {
+    logger_->log_info("Execute Command Complete %s status %d pid %d",
+                      _fullCommand.c_str(), WTERMSIG(status), _pid);
+  }
+
+  close(_pipefd[0]);
+  _processRunning = false;
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/GenerateFlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GenerateFlowFile.cpp 
b/libminifi/src/processors/GenerateFlowFile.cpp
new file mode 100644
index 0000000..ebdaaa3
--- /dev/null
+++ b/libminifi/src/processors/GenerateFlowFile.cpp
@@ -0,0 +1,145 @@
+/**
+ * @file GenerateFlowFile.cpp
+ * GenerateFlowFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <random>
+#include "utils/StringUtils.h"
+
+#include "processors/GenerateFlowFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+// Accepted values of the "Data Format" property.
+const char *GenerateFlowFile::DATA_FORMAT_BINARY = "Binary";
+const char *GenerateFlowFile::DATA_FORMAT_TEXT = "Text";
+const std::string GenerateFlowFile::ProcessorName("GenerateFlowFile");
+// Properties understood by GenerateFlowFile. The three constructor arguments
+// are presumably (name, description, default value) - confirm against
+// core::Property.
+core::Property GenerateFlowFile::FileSize("File Size", "The size of the file that will be used", "1 kB");
+core::Property GenerateFlowFile::BatchSize("Batch Size", "The number of FlowFiles to be transferred in each invocation", "1");
+core::Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether the data should be Text or Binary", GenerateFlowFile::DATA_FORMAT_BINARY);
+core::Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles",
+               "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles will get the same content", "true");
+core::Relationship GenerateFlowFile::Success("success", "success operational on the flow record");
+
+/**
+ * Registers the properties and relationships supported by GenerateFlowFile.
+ */
+void GenerateFlowFile::initialize()
+{
+       // Properties that may be set on this processor
+       std::set<core::Property> supportedProperties{
+               FileSize, BatchSize, DataFormat, UniqueFlowFiles};
+       setSupportedProperties(supportedProperties);
+
+       // Relationships FlowFiles can be routed to
+       std::set<core::Relationship> supportedRelationships{Success};
+       setSupportedRelationships(supportedRelationships);
+}
+
+namespace {
+/**
+ * Fills buffer[0, size) with bytes taken from random().
+ * Bytes are stored one at a time so that a size which is not a multiple of
+ * sizeof(int) never writes past the end of the buffer.
+ */
+void fillWithRandomBytes(char *buffer, int64_t size)
+{
+       const int64_t step = static_cast<int64_t>(sizeof(unsigned int));
+       for (int64_t offset = 0; offset < size; offset += step)
+       {
+               unsigned int randValue = static_cast<unsigned int>(random());
+               for (int64_t k = 0; k < step && offset + k < size; ++k)
+               {
+                       buffer[offset + k] = static_cast<char>(randValue & 0xFF);
+                       randValue >>= 8;
+               }
+       }
+}
+} /* anonymous namespace */
+
+/**
+ * Creates "Batch Size" FlowFiles of "File Size" random bytes each and routes
+ * them to Success.
+ *
+ * When "Unique FlowFiles" is false a fresh random buffer is generated on
+ * every call and shared by all FlowFiles of that call. When it is true the
+ * buffer is generated once, cached in _data and reused on later calls.
+ * NOTE(review): that looks inverted relative to the property description
+ * ("If true, each FlowFile ... will be unique") - confirm before changing.
+ *
+ * @param context source of the processor's property values
+ * @param session used to create, write and transfer the FlowFiles
+ */
+void GenerateFlowFile::onTrigger(core::ProcessContext *context, core::ProcessSession *session)
+{
+       int64_t batchSize = 1;
+       bool uniqueFlowFile = true;
+       int64_t fileSize = 1024;
+
+       std::string value;
+       if (context->getProperty(FileSize.getName(), value))
+       {
+         core::Property::StringToInt(value, fileSize);
+       }
+       if (context->getProperty(BatchSize.getName(), value))
+       {
+         core::Property::StringToInt(value, batchSize);
+       }
+       if (context->getProperty(UniqueFlowFiles.getName(), value))
+       {
+               org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, uniqueFlowFile);
+       }
+       // A negative size would make the allocations below throw.
+       if (fileSize < 0)
+               fileSize = 0;
+
+       if (!uniqueFlowFile)
+       {
+               // The vector owns the buffer, so it is released on every
+               // exit path, including the early return below.
+               std::vector<char> data(static_cast<size_t>(fileSize));
+               fillWithRandomBytes(data.data(), fileSize);
+               uint64_t dataSize = fileSize;
+               GenerateFlowFile::WriteCallback callback(data.data(), dataSize);
+               for (int64_t i = 0; i < batchSize; i++)
+               {
+                       // For each batch
+                       std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+                       if (!flowFile)
+                               return;
+                       if (fileSize > 0)
+                               session->write(flowFile, &callback);
+                       session->transfer(flowFile, Success);
+               }
+       }
+       else
+       {
+               if (!_data)
+               {
+                       // The cached data has not been created yet
+                       _data = new char[fileSize];
+                       _dataSize = fileSize;
+                       fillWithRandomBytes(_data, fileSize);
+               }
+               GenerateFlowFile::WriteCallback callback(_data, _dataSize);
+               for (int64_t i = 0; i < batchSize; i++)
+               {
+                       // For each batch
+                       std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+                       if (!flowFile)
+                               return;
+                       if (fileSize > 0)
+                               session->write(flowFile, &callback);
+                       session->transfer(flowFile, Success);
+               }
+       }
+}
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetFile.cpp 
b/libminifi/src/processors/GetFile.cpp
new file mode 100644
index 0000000..cf05657
--- /dev/null
+++ b/libminifi/src/processors/GetFile.cpp
@@ -0,0 +1,340 @@
+/**
+ * @file GetFile.cpp
+ * GetFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <time.h>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <dirent.h>
+#include <limits.h>
+#include <unistd.h>
+#if  (__GNUC__ >= 4) 
+#if (__GNUC_MINOR__ < 9)
+#include <regex.h>
+#endif
+#endif
+#include "utils/StringUtils.h"
+#include <regex>
+#include "utils/TimeUtil.h"
+#include "processors/GetFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+const std::string GetFile::ProcessorName("GetFile");
+// Properties understood by GetFile. The three constructor arguments are
+// presumably (name, description, default value) - confirm against
+// core::Property.
+core::Property GetFile::BatchSize(
+    "Batch Size", "The maximum number of files to pull in each iteration",
+    "10");
+core::Property GetFile::Directory(
+    "Input Directory", "The input directory from which to pull files", ".");
+core::Property GetFile::IgnoreHiddenFile(
+    "Ignore Hidden Files",
+    "Indicates whether or not hidden files should be ignored", "true");
+core::Property GetFile::KeepSourceFile(
+    "Keep Source File",
+    "If true, the file is not deleted after it has been copied to the Content Repository",
+    "false");
+// acceptFile() rejects files OLDER than Maximum File Age and files YOUNGER
+// than Minimum File Age; the descriptions below follow that logic.
+core::Property GetFile::MaxAge(
+    "Maximum File Age",
+    "The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored",
+    "0 sec");
+core::Property GetFile::MinAge(
+    "Minimum File Age",
+    "The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored",
+    "0 sec");
+core::Property GetFile::MaxSize(
+    "Maximum File Size",
+    "The maximum size that a file can be in order to be pulled", "0 B");
+core::Property GetFile::MinSize(
+    "Minimum File Size",
+    "The minimum size that a file must be in order to be pulled", "0 B");
+core::Property GetFile::PollInterval(
+    "Polling Interval",
+    "Indicates how long to wait before performing a directory listing",
+    "0 sec");
+core::Property GetFile::Recurse(
+    "Recurse Subdirectories",
+    "Indicates whether or not to pull files from subdirectories", "true");
+core::Property GetFile::FileFilter(
+    "File Filter",
+    "Only files whose names match the given regular expression will be picked up",
+    "[^\\.].*");
+core::Relationship GetFile::Success(
+    "success", "All files are routed to success");
+
+/**
+ * Registers the properties and relationships supported by GetFile.
+ */
+void GetFile::initialize() {
+  // Properties that may be set on this processor
+  std::set<core::Property> supportedProperties{
+      BatchSize, Directory, IgnoreHiddenFile, KeepSourceFile,
+      MaxAge, MinAge, MaxSize, MinSize,
+      PollInterval, Recurse, FileFilter};
+  setSupportedProperties(supportedProperties);
+
+  // Relationships FlowFiles can be routed to
+  std::set<core::Relationship> supportedRelationships{Success};
+  setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Reads the processor's properties, refreshes the pending file listing when
+ * it is empty and the polling interval allows it, then imports up to
+ * "Batch Size" listed files as FlowFiles routed to Success.
+ *
+ * Each FlowFile gets the FILENAME, PATH and ABSOLUTE_PATH attributes derived
+ * from the listed path. Exceptions raised while importing are logged and
+ * rethrown.
+ *
+ * @param context source of the processor's property values
+ * @param session used to create, import and transfer the FlowFiles
+ */
+void GetFile::onTrigger(
+    core::ProcessContext *context,
+    core::ProcessSession *session) {
+  std::string value;
+
+  logger_->log_info("onTrigger GetFile");
+  if (context->getProperty(Directory.getName(), value)) {
+    _directory = value;
+  }
+  if (context->getProperty(BatchSize.getName(), value)) {
+    core::Property::StringToInt(value, _batchSize);
+  }
+  if (context->getProperty(IgnoreHiddenFile.getName(), value)) {
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(
+        value, _ignoreHiddenFile);
+  }
+  if (context->getProperty(KeepSourceFile.getName(), value)) {
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(
+        value, _keepSourceFile);
+  }
+
+  // Time-valued properties are converted to milliseconds only when the
+  // text could be parsed.
+  if (context->getProperty(MaxAge.getName(), value)) {
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(value, _maxAge, unit)) {
+      core::Property::ConvertTimeUnitToMS(_maxAge, unit, _maxAge);
+    }
+  }
+  if (context->getProperty(MinAge.getName(), value)) {
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(value, _minAge, unit)) {
+      core::Property::ConvertTimeUnitToMS(_minAge, unit, _minAge);
+    }
+  }
+  if (context->getProperty(MaxSize.getName(), value)) {
+    core::Property::StringToInt(value, _maxSize);
+  }
+  if (context->getProperty(MinSize.getName(), value)) {
+    core::Property::StringToInt(value, _minSize);
+  }
+  if (context->getProperty(PollInterval.getName(), value)) {
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(value, _pollInterval, unit)) {
+      core::Property::ConvertTimeUnitToMS(_pollInterval, unit,
+                                          _pollInterval);
+    }
+  }
+  if (context->getProperty(Recurse.getName(), value)) {
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value,
+                                                                _recursive);
+  }
+
+  if (context->getProperty(FileFilter.getName(), value)) {
+    _fileFilter = value;
+  }
+
+  // Perform directory list
+  logger_->log_info("Is listing empty %i", isListingEmpty());
+  if (isListingEmpty()) {
+    if (_pollInterval == 0
+        || (getTimeMillis() - _lastDirectoryListingTime) > _pollInterval) {
+      performListing(_directory);
+      // Remember when the listing ran; without this the interval test
+      // above always compares against the initial timestamp.
+      _lastDirectoryListingTime = getTimeMillis();
+    }
+  }
+  logger_->log_info("Is listing empty %i", isListingEmpty());
+
+  if (!isListingEmpty()) {
+    try {
+      std::queue<std::string> list;
+      pollListing(list, _batchSize);
+      while (!list.empty()) {
+
+        std::string fileName = list.front();
+        list.pop();
+        logger_->log_info("GetFile process %s", fileName.c_str());
+        std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
+            FlowFileRecord>(session->create());
+        if (flowFile == nullptr)
+          return;
+        // split "<path>/<name>" at the last separator
+        std::size_t found = fileName.find_last_of("/\\");
+        std::string path = fileName.substr(0, found);
+        std::string name = fileName.substr(found + 1);
+        flowFile->updateKeyedAttribute(FILENAME, name);
+        flowFile->updateKeyedAttribute(PATH, path);
+        flowFile->addKeyedAttribute(ABSOLUTE_PATH, fileName);
+        session->import(fileName, flowFile, _keepSourceFile);
+        session->transfer(flowFile, Success);
+      }
+    } catch (const std::exception &exception) {
+      // log for diagnosis, then let the caller handle it
+      logger_->log_debug("GetFile Caught Exception %s", exception.what());
+      throw;
+    }
+  }
+
+}
+
+/**
+ * Reports whether the queue of listed files is empty.
+ * The queue is inspected while holding mutex_.
+ */
+bool GetFile::isListingEmpty() {
+  std::lock_guard<std::mutex> guard(mutex_);
+  const bool nothingQueued = _dirList.empty();
+  return nothingQueued;
+}
+
+/**
+ * Appends one file path to the queue of listed files.
+ * The queue is modified while holding mutex_.
+ *
+ * @param fileName path to enqueue
+ */
+void GetFile::putListing(std::string fileName) {
+  std::lock_guard<std::mutex> guard(mutex_);
+  _dirList.push(fileName);
+}
+
+/**
+ * Moves queued file paths, oldest first, from the listing into `list`.
+ * The listing is modified while holding mutex_.
+ *
+ * @param list    receives the paths; entries already in it count towards
+ *                the limit
+ * @param maxSize upper bound on list.size(); 0 means no limit
+ */
+void GetFile::pollListing(std::queue<std::string> &list, int maxSize) {
+  std::lock_guard<std::mutex> guard(mutex_);
+
+  const bool unlimited = (maxSize == 0);
+  for (;;) {
+    if (_dirList.empty())
+      break;
+    // same unsigned comparison as list.size() < maxSize
+    if (!unlimited && list.size() >= static_cast<std::size_t>(maxSize))
+      break;
+    list.push(_dirList.front());
+    _dirList.pop();
+  }
+}
+
+/**
+ * Decides whether a listed file satisfies the configured constraints.
+ *
+ * A file is rejected when stat() fails, when its size or age is outside the
+ * configured bounds (a bound of 0 disables that check), when hidden files
+ * are ignored and its name starts with '.', when it is not readable, when
+ * it must be removed after import but is not writable, or when its name
+ * does not match the file filter.
+ *
+ * @param fullName path used for stat()/access()
+ * @param name     bare file name, used for the hidden-file and filter tests
+ * @return true when the file may be picked up
+ */
+bool GetFile::acceptFile(std::string fullName, std::string name) {
+  struct stat statbuf;
+
+  if (stat(fullName.c_str(), &statbuf) != 0)
+    return false;
+
+  if (_minSize > 0 && statbuf.st_size < _minSize)
+    return false;
+
+  if (_maxSize > 0 && statbuf.st_size > _maxSize)
+    return false;
+
+  const uint64_t now = getTimeMillis();
+  const uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
+  // A modification time in the future counts as age 0 instead of wrapping
+  // around to a huge unsigned value.
+  const uint64_t fileAge = now > modifiedTime ? now - modifiedTime : 0;
+  if (_minAge > 0 && fileAge < _minAge)
+    return false;
+  if (_maxAge > 0 && fileAge > _maxAge)
+    return false;
+
+  // Test the bare name: the full path starts with the directory, so with
+  // the default directory "." every path would look hidden.
+  if (_ignoreHiddenFile && !name.empty() && name[0] == '.')
+    return false;
+
+  if (access(fullName.c_str(), R_OK) != 0)
+    return false;
+
+  // the source file is removed after import, so it must be writable
+  if (_keepSourceFile == false && access(fullName.c_str(), W_OK) != 0)
+    return false;
+
+  // NOTE(review): the minor-version test below also selects the POSIX
+  // regex branch for newer major versions whose minor is below 9 - confirm
+  // whether that is intended.
+#ifdef __GNUC__
+#if (__GNUC__ >= 4)
+#if (__GNUC_MINOR__ < 9)
+  regex_t regex;
+  int ret = regcomp(&regex, _fileFilter.c_str(), 0);
+  if (ret)
+    return false;
+  ret = regexec(&regex, name.c_str(), (size_t) 0, NULL, 0);
+  regfree(&regex);
+  if (ret)
+    return false;
+#else
+  try {
+    std::regex re(_fileFilter);
+
+    if (!std::regex_match(name, re)) {
+      return false;
+    }
+  } catch (const std::regex_error &e) {
+    logger_->log_error("Invalid File Filter regex: %s.", e.what());
+    return false;
+  }
+#endif
+#endif
+#else
+  logger_->log_info("Cannot support regex filtering");
+#endif
+  return true;
+}
+
+/**
+ * Walks `dir` and queues every entry accepted by acceptFile().
+ * Sub-directories are descended into when _recursive is set. The walk stops
+ * early once isRunning() returns false. A directory that cannot be opened
+ * is skipped silently.
+ *
+ * @param dir directory to list
+ */
+void GetFile::performListing(std::string dir) {
+  logger_->log_info("Performing file listing against %s", dir.c_str());
+  DIR *d = opendir(dir.c_str());
+  if (!d)
+    return;
+  while (isRunning()) {
+    struct dirent *entry = readdir(d);
+    if (!entry)
+      break;
+    std::string d_name = entry->d_name;
+    // d_type holds a single type code, not a bit mask, so compare for
+    // equality; a bitwise test also matches other types such as DT_BLK.
+    if (entry->d_type == DT_DIR) {
+      // descend, but never into "." or ".."
+      if (_recursive && d_name != ".." && d_name != ".") {
+        std::string path = dir + "/" + d_name;
+        performListing(path);
+      }
+    } else {
+      std::string fileName = dir + "/" + d_name;
+      if (acceptFile(fileName, d_name)) {
+        // the file passed every filter; queue it
+        putListing(fileName);
+      }
+    }
+  }
+  closedir(d);
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenHTTP.cpp 
b/libminifi/src/processors/ListenHTTP.cpp
new file mode 100644
index 0000000..36c743e
--- /dev/null
+++ b/libminifi/src/processors/ListenHTTP.cpp
@@ -0,0 +1,380 @@
+/**
+ * @file ListenHTTP.cpp
+ * ListenHTTP class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <fstream>
+#include <uuid/uuid.h>
+
+#include <CivetServer.h>
+
+#include "processors/ListenHTTP.h"
+
+#include "utils/TimeUtil.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessSessionFactory.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string ListenHTTP::ProcessorName("ListenHTTP");  // processor type name
+
+core::Property ListenHTTP::BasePath(  // onSchedule prepends "/" and mounts the handler on this path
+    "Base Path", "Base path for incoming connections", "contentListener");
+core::Property ListenHTTP::Port(  // onSchedule logs an error and returns when this cannot be read
+    "Listening Port", "The Port to listen on for incoming connections", "");
+core::Property ListenHTTP::AuthorizedDNPattern(  // matched against the client certificate subject in handlePost
+    "Authorized DN Pattern",
+    "A Regular Expression to apply against the Distinguished Name of incoming 
connections. If the Pattern does not match the DN, the connection will be 
refused.",
+    ".*");
+core::Property ListenHTTP::SSLCertificate(  // a non-empty value switches onSchedule to a TLS listening port
+    "SSL Certificate",
+    "File containing PEM-formatted file including TLS/SSL certificate and key",
+    "");
+core::Property ListenHTTP::SSLCertificateAuthority(  // only read when SSLCertificate is set
+    "SSL Certificate Authority",
+    "File containing trusted PEM-formatted certificates", "");
+core::Property ListenHTTP::SSLVerifyPeer(  // empty or "no" disables verification; any other value enables it
+    "SSL Verify Peer",
+    "Whether or not to verify the client's certificate (yes/no)", "no");
+core::Property ListenHTTP::SSLMinimumVersion(  // NOTE(review): the default below allows SSL2 -- confirm this is intended
+    "SSL Minimum Version",
+    "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)",
+    "SSL2");
+core::Property ListenHTTP::HeadersAsAttributesRegex(  // header names matching this regex become FlowFile attributes in handlePost
+    "HTTP Headers to receive as Attributes (Regex)",
+    "Specifies the Regular Expression that determines the names of HTTP 
Headers that should be passed along as FlowFile attributes",
+    "");
+
+core::Relationship ListenHTTP::Success(  // handlePost transfers every received FlowFile here
+    "success", "All files are routed to success");
+
+/**
+ * Declares the properties and the single relationship this processor
+ * supports.
+ */
+void ListenHTTP::initialize() {
+  _logger->log_info("Initializing ListenHTTP");
+
+  // Supported properties
+  std::set<core::Property> supportedProperties { BasePath, Port,
+      AuthorizedDNPattern, SSLCertificate, SSLCertificateAuthority,
+      SSLVerifyPeer, SSLMinimumVersion, HeadersAsAttributesRegex };
+  setSupportedProperties(supportedProperties);
+
+  // Supported relationships
+  std::set<core::Relationship> supportedRelationships { Success };
+  setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Reads the processor configuration and starts the embedded HTTP(S) server.
+ *
+ * The base path falls back to the property default when missing; a missing
+ * port aborts scheduling. TLS related properties are only consulted when an
+ * SSL certificate is configured. Finally a CivetServer is created from the
+ * collected options and a Handler is mounted on the base path.
+ *
+ * @param context        source of the property values
+ * @param sessionFactory factory handed to the request handler
+ */
+void ListenHTTP::onSchedule(
+    core::ProcessContext *context,
+    core::ProcessSessionFactory *sessionFactory) {
+
+  // Base path: fall back to the default, then make it start with "/".
+  std::string basePath;
+  if (!context->getProperty(BasePath.getName(), basePath)) {
+    _logger->log_info(
+        "%s attribute is missing, so default value of %s will be used",
+        BasePath.getName().c_str(), BasePath.getValue().c_str());
+    basePath = BasePath.getValue();
+  }
+  basePath = "/" + basePath;
+
+  // The port is mandatory.
+  std::string listeningPort;
+  if (!context->getProperty(Port.getName(), listeningPort)) {
+    _logger->log_error("%s attribute is missing or invalid",
+                       Port.getName().c_str());
+    return;
+  }
+
+  std::string authDNPattern;
+  const bool haveAuthDNPattern = context->getProperty(
+      AuthorizedDNPattern.getName(), authDNPattern);
+  if (haveAuthDNPattern && !authDNPattern.empty()) {
+    _logger->log_info("ListenHTTP using %s: %s",
+                      AuthorizedDNPattern.getName().c_str(),
+                      authDNPattern.c_str());
+  }
+
+  std::string sslCertFile;
+  const bool haveCertFile = context->getProperty(SSLCertificate.getName(),
+                                                 sslCertFile);
+  if (haveCertFile && !sslCertFile.empty()) {
+    _logger->log_info("ListenHTTP using %s: %s",
+                      SSLCertificate.getName().c_str(), sslCertFile.c_str());
+  }
+
+  // A configured certificate implies TLS; the remaining TLS options are only
+  // read in that case.
+  const bool tlsEnabled = !sslCertFile.empty();
+  std::string sslCertAuthorityFile;
+  std::string sslVerifyPeer;
+  std::string sslMinVer;
+
+  if (tlsEnabled) {
+    const bool haveCaFile = context->getProperty(
+        SSLCertificateAuthority.getName(), sslCertAuthorityFile);
+    if (haveCaFile && !sslCertAuthorityFile.empty()) {
+      _logger->log_info("ListenHTTP using %s: %s",
+                        SSLCertificateAuthority.getName().c_str(),
+                        sslCertAuthorityFile.c_str());
+    }
+
+    const bool verifyRequested =
+        context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer)
+            && !sslVerifyPeer.empty() && sslVerifyPeer.compare("no") != 0;
+    if (verifyRequested) {
+      _logger->log_info("ListenHTTP will verify peers");
+    } else {
+      _logger->log_info("ListenHTTP will not verify peers");
+    }
+
+    if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer)) {
+      _logger->log_info("ListenHTTP using %s: %s",
+                        SSLMinimumVersion.getName().c_str(),
+                        sslMinVer.c_str());
+    }
+  }
+
+  std::string headersAsAttributesPattern;
+  const bool haveHeadersPattern = context->getProperty(
+      HeadersAsAttributesRegex.getName(), headersAsAttributesPattern);
+  if (haveHeadersPattern && !headersAsAttributesPattern.empty()) {
+    _logger->log_info("ListenHTTP using %s: %s",
+                      HeadersAsAttributesRegex.getName().c_str(),
+                      headersAsAttributesPattern.c_str());
+  }
+
+  auto numThreads = getMaxConcurrentTasks();
+
+  _logger->log_info(
+      "ListenHTTP starting HTTP server on port %s and path %s with %d threads",
+      listeningPort.c_str(), basePath.c_str(), numThreads);
+
+  // Server options are passed as a flat list of name/value pairs.
+  std::vector<std::string> options { "enable_keep_alive", "yes",
+      "keep_alive_timeout_ms", "15000", "num_threads",
+      std::to_string(numThreads) };
+
+  // The port gets an "s" suffix when TLS is enabled.
+  if (tlsEnabled) {
+    listeningPort += "s";
+  }
+  options.push_back("listening_ports");
+  options.push_back(listeningPort);
+
+  if (tlsEnabled) {
+    options.push_back("ssl_certificate");
+    options.push_back(sslCertFile);
+
+    if (!sslCertAuthorityFile.empty()) {
+      options.push_back("ssl_ca_file");
+      options.push_back(sslCertAuthorityFile);
+    }
+
+    const bool skipPeerVerification = sslVerifyPeer.empty()
+        || sslVerifyPeer.compare("no") == 0;
+    options.push_back("ssl_verify_peer");
+    options.push_back(skipPeerVerification ? "no" : "yes");
+
+    // Map the configured minimum version onto the numeric option value;
+    // anything not listed here ends up as 4.
+    int protocolVersion = 4;
+    if (sslMinVer.compare("SSL2") == 0) {
+      protocolVersion = 0;
+    } else if (sslMinVer.compare("SSL3") == 0) {
+      protocolVersion = 1;
+    } else if (sslMinVer.compare("TLS1.0") == 0) {
+      protocolVersion = 2;
+    } else if (sslMinVer.compare("TLS1.1") == 0) {
+      protocolVersion = 3;
+    }
+    options.push_back("ssl_protocol_version");
+    options.push_back(std::to_string(protocolVersion));
+  }
+
+  _server.reset(new CivetServer(options));
+  _handler.reset(
+      new Handler(context, sessionFactory, std::move(authDNPattern),
+                  std::move(headersAsAttributesPattern)));
+  _server->addHandler(basePath, _handler.get());
+}
+
+ListenHTTP::~ListenHTTP() {  // empty body: no explicit cleanup is performed here
+}
+
+/**
+ * Takes the next incoming FlowFile from the session, if any, and does
+ * nothing further with it here; received data is turned into FlowFiles by
+ * Handler::handlePost.
+ *
+ * @param context unused
+ * @param session session the incoming FlowFile is taken from
+ */
+void ListenHTTP::onTrigger(
+    core::ProcessContext *context,
+    core::ProcessSession *session) {
+
+  auto incoming = std::static_pointer_cast<FlowFileRecord>(session->get());
+
+  if (incoming == nullptr) {
+    // No incoming file: nothing to do
+    return;
+  }
+}
+
+ListenHTTP::Handler::Handler(  // builds the request handler that onSchedule mounts on the server
+    core::ProcessContext *context,
+    core::ProcessSessionFactory *sessionFactory,
+    std::string &&authDNPattern, std::string &&headersAsAttributesPattern)
+    : _authDNRegex(std::move(authDNPattern)),  // matched against the client certificate subject in handlePost
+      _headersAsAttributesRegex(std::move(headersAsAttributesPattern)) {  // matched against header names in handlePost
+  _processContext = context;  // the pointer is stored as given
+  _processSessionFactory = sessionFactory;  // handlePost creates one session per request from it
+}
+
+/**
+ * Writes an empty-bodied "500 Internal Server Error" response.
+ *
+ * @param conn connection the response is written to
+ */
+void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) {
+  static const char kInternalErrorResponse[] =
+      "HTTP/1.1 500 Internal Server Error\r\n"
+      "Content-Type: text/html\r\n"
+      "Content-Length: 0\r\n\r\n";
+  mg_printf(conn, "%s", kInternalErrorResponse);
+}
+
+/**
+ * Handles an HTTP POST by storing the request body in a new FlowFile.
+ *
+ * When the connection is TLS and the client presented a certificate, the
+ * certificate subject must match the authorized-DN regex; otherwise a 403 is
+ * sent. The body is written into a freshly created FlowFile. The "filename"
+ * header, and every header whose name matches the headers-as-attributes
+ * regex, is copied into the FlowFile attributes. The FlowFile is transferred
+ * to Success and the session committed before 200 is sent. If anything
+ * throws, a 500 is sent, the session is rolled back and the exception is
+ * rethrown.
+ *
+ * @param server server dispatching the request (not used)
+ * @param conn   connection the body is read from and the response written to
+ * @return true whenever the function returns normally
+ */
+bool ListenHTTP::Handler::handlePost(CivetServer *server,
+                                     struct mg_connection *conn) {
+  _logger = logging::Logger::getLogger();
+
+  auto req_info = mg_get_request_info(conn);
+  // content_length is a long long, so it must be printed with %lld; the
+  // former %d was a varargs type mismatch.
+  _logger->log_info("ListenHTTP handling POST request of length %lld",
+                    static_cast<long long>(req_info->content_length));
+
+  // If this is a two-way TLS connection, authorize the peer against the
+  // configured pattern
+  if (req_info->is_ssl && req_info->client_cert != nullptr) {
+    if (!std::regex_match(req_info->client_cert->subject, _authDNRegex)) {
+      mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n"
+                "Content-Type: text/html\r\n"
+                "Content-Length: 0\r\n\r\n");
+      _logger->log_warn("ListenHTTP client DN not authorized: %s",
+                        req_info->client_cert->subject);
+      return true;
+    }
+  }
+
+  // Always send 100 Continue, as allowed per standard to minimize client
+  // delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html)
+  mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n");
+
+  auto session = _processSessionFactory->createSession();
+  ListenHTTP::WriteCallback callback(conn, req_info);
+  auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+
+  if (!flowFile) {
+    sendErrorResponse(conn);
+    return true;
+  }
+
+  try {
+    session->write(flowFile, &callback);
+
+    // Add filename from "filename" header value (and pattern headers)
+    for (int i = 0; i < req_info->num_headers; i++) {
+      auto header = &req_info->http_headers[i];
+
+      if (strcmp("filename", header->name) == 0) {
+        if (!flowFile->updateAttribute("filename", header->value)) {
+          flowFile->addAttribute("filename", header->value);
+        }
+      } else if (std::regex_match(header->name, _headersAsAttributesRegex)) {
+        if (!flowFile->updateAttribute(header->name, header->value)) {
+          flowFile->addAttribute(header->name, header->value);
+        }
+      }
+    }
+
+    session->transfer(flowFile, Success);
+    session->commit();
+  } catch (const std::exception &exception) {
+    _logger->log_debug("ListenHTTP Caught Exception %s", exception.what());
+    sendErrorResponse(conn);
+    session->rollback();
+    // NOTE(review): the exception propagates into the server's callback
+    // machinery -- confirm the caller tolerates that.
+    throw;
+  } catch (...) {
+    _logger->log_debug("ListenHTTP Caught Exception Processor::onTrigger");
+    sendErrorResponse(conn);
+    session->rollback();
+    throw;
+  }
+
+  mg_printf(conn, "HTTP/1.1 200 OK\r\n"
+            "Content-Type: text/html\r\n"
+            "Content-Length: 0\r\n\r\n");
+
+  return true;
+}
+
+ListenHTTP::WriteCallback::WriteCallback(  // callback that process() uses to copy a request body into a stream
+    struct mg_connection *conn, const struct mg_request_info *reqInfo) {
+  _logger = logging::Logger::getLogger();
+  _conn = conn;  // connection process() reads the body from
+  _reqInfo = reqInfo;  // process() takes the body length from content_length
+}
+
+/**
+ * Copies the request body from the connection into the given stream.
+ *
+ * Reads at most content_length bytes in chunks of up to 16384 bytes and
+ * stops early when mg_read() returns zero or a negative value.
+ *
+ * @param stream output stream receiving the body bytes
+ */
+void ListenHTTP::WriteCallback::process(std::ofstream *stream) {
+  char chunk[16384];
+  const long long total = _reqInfo->content_length;
+  const long long chunkCapacity = static_cast<long long>(sizeof(chunk));
+
+  for (long long received = 0; received < total;) {
+    // Never ask for more than what is left or what fits into the chunk
+    const long long remaining = total - received;
+    const long long wanted =
+        remaining > chunkCapacity ? chunkCapacity : remaining;
+
+    // Read a buffer of data from client
+    const long long got = mg_read(_conn, chunk, (size_t) wanted);
+    if (got <= 0) {
+      break;
+    }
+
+    // Transfer buffer data to the output stream
+    stream->write(chunk, got);
+    received += got;
+  }
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/ListenSyslog.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ListenSyslog.cpp 
b/libminifi/src/processors/ListenSyslog.cpp
new file mode 100644
index 0000000..2dd223c
--- /dev/null
+++ b/libminifi/src/processors/ListenSyslog.cpp
@@ -0,0 +1,331 @@
+/**
+ * @file ListenSyslog.cpp
+ * ListenSyslog class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <queue>
+#include <stdio.h>
+#include <string>
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "processors/ListenSyslog.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+const std::string ListenSyslog::ProcessorName("ListenSyslog");  // processor type name
+core::Property ListenSyslog::RecvBufSize(  // parsed into _recvBufSize; runThread drops a message when queued bytes plus its length exceed it
+    "Receive Buffer Size",
+    "The size of each buffer used to receive Syslog messages.", "65507 B");
+core::Property ListenSyslog::MaxSocketBufSize(  // parsed into _maxSocketBufSize by onTrigger
+    "Max Size of Socket Buffer",
+    "The maximum size of the socket buffer that should be used.", "1 MB");
+core::Property ListenSyslog::MaxConnections(  // parsed into _maxConnections; runThread closes accepted sockets beyond this count
+    "Max Number of TCP Connections",
+    "The maximum number of concurrent connections to accept Syslog messages in 
TCP mode.",
+    "2");
+core::Property ListenSyslog::MaxBatchSize(  // parsed into _maxBatchSize and passed to pollEvent in onTrigger
+    "Max Batch Size",
+    "The maximum number of Syslog events to add to a single FlowFile.", "1");
+core::Property ListenSyslog::MessageDelimiter(  // stored in _messageDelimiter by onTrigger
+    "Message Delimiter",
+    "Specifies the delimiter to place between Syslog messages when multiple 
messages are bundled together (see <Max Batch Size> core::Property).",
+    "\n");
+core::Property ListenSyslog::ParseMessages(  // parsed into _parseMessages by onTrigger
+    "Parse Messages",
+    "Indicates if the processor should parse the Syslog messages. If set to 
false, each outgoing FlowFile will only.",
+    "false");
+core::Property ListenSyslog::Protocol(  // "TCP" selects a stream socket in runThread, anything else a datagram socket
+    "Protocol", "The protocol for Syslog communication.", "UDP");
+core::Property ListenSyslog::Port(  // a changed value makes onTrigger request a server socket reset
+    "Port", "The port for Syslog communication.", "514");
+core::Relationship ListenSyslog::Success(  // onTrigger transfers the assembled FlowFile here
+    "success", "All files are routed to success");
+core::Relationship ListenSyslog::Invalid(  // registered in initialize
+    "invalid", "SysLog message format invalid");
+
+/**
+ * Declares the properties and relationships this processor supports.
+ */
+void ListenSyslog::initialize() {
+  // Supported properties
+  std::set<core::Property> supportedProperties { RecvBufSize,
+      MaxSocketBufSize, MaxConnections, MaxBatchSize, MessageDelimiter,
+      ParseMessages, Protocol, Port };
+  setSupportedProperties(supportedProperties);
+
+  // Supported relationships
+  std::set<core::Relationship> supportedRelationships { Success, Invalid };
+  setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Starts the detached socket thread unless a thread object already exists.
+ */
+void ListenSyslog::startSocketThread() {
+  if (_thread == nullptr) {
+    logger_->log_info("ListenSysLog Socket Thread Start");
+    // The flag must be set before the thread starts looping on it
+    _serverTheadRunning = true;
+    _thread = new std::thread(run, this);
+    _thread->detach();
+  }
+}
+
+void ListenSyslog::run(ListenSyslog *process) {  // thread entry point handed to std::thread by startSocketThread
+  process->runThread();  // the socket loop itself lives in runThread
+}
+
+/**
+ * Socket loop executed by the background thread.
+ *
+ * While _serverTheadRunning is set, each round:
+ *  - tears down all sockets when a reset was requested,
+ *  - (re)creates and binds the server socket when there is none,
+ *  - waits up to 100 ms in select() for activity,
+ *  - accepts a TCP connection or receives one UDP datagram, and
+ *  - reads one line from every readable TCP client.
+ * A received message is queued with putEvent() only while the queued bytes
+ * plus its length stay within _recvBufSize. The loop ends on socket
+ * creation, bind, listen or select failure.
+ */
+void ListenSyslog::runThread() {
+  while (_serverTheadRunning) {
+    if (_resetServerSocket) {
+      _resetServerSocket = false;
+      // need to reset the socket: drop every client and the listener
+      for (int clientSocket : _clientSockets) {
+        close(clientSocket);
+      }
+      _clientSockets.clear();
+      if (_serverSocket > 0) {
+        close(_serverSocket);
+        _serverSocket = 0;
+      }
+    }
+
+    if (_serverSocket <= 0) {
+      uint16_t portno = _port;
+      struct sockaddr_in serv_addr;
+      int sockfd;
+      if (_protocol == "TCP")
+        sockfd = socket(AF_INET, SOCK_STREAM, 0);
+      else
+        sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+      if (sockfd < 0) {
+        logger_->log_info("ListenSysLog Server socket creation failed");
+        break;
+      }
+      bzero((char *) &serv_addr, sizeof(serv_addr));
+      serv_addr.sin_family = AF_INET;
+      serv_addr.sin_addr.s_addr = INADDR_ANY;
+      serv_addr.sin_port = htons(portno);
+      if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
+        logger_->log_error("ListenSysLog Server socket bind failed");
+        // do not leak the descriptor when giving up
+        close(sockfd);
+        break;
+      }
+      if (_protocol == "TCP" && listen(sockfd, 5) < 0) {
+        logger_->log_error("ListenSysLog Server socket listen failed");
+        close(sockfd);
+        break;
+      }
+      _serverSocket = sockfd;
+      // a successful bind is informational, not an error
+      logger_->log_info("ListenSysLog Server socket %d bind OK to port %d",
+                        _serverSocket, portno);
+    }
+
+    // Build the descriptor set: the server socket plus every client.
+    FD_ZERO(&_readfds);
+    FD_SET(_serverSocket, &_readfds);
+    _maxFds = _serverSocket;
+    std::vector<int>::iterator it;
+    for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
+      int clientSocket = *it;
+      if (clientSocket >= _maxFds)
+        _maxFds = clientSocket;
+      FD_SET(clientSocket, &_readfds);
+    }
+    fd_set fds;
+    struct timeval tv;
+    int retval;
+    fds = _readfds;
+    tv.tv_sec = 0;
+    // 100 msec
+    tv.tv_usec = 100000;
+    retval = select(_maxFds + 1, &fds, NULL, NULL, &tv);
+    if (retval < 0)
+      break;
+    if (retval == 0)
+      continue;
+    if (FD_ISSET(_serverSocket, &fds)) {
+      // server socket, either we have UDP datagram or TCP connection request
+      if (_protocol == "TCP") {
+        socklen_t clilen;
+        struct sockaddr_in cli_addr;
+        clilen = sizeof(cli_addr);
+        int newsockfd = accept(_serverSocket, (struct sockaddr *) &cli_addr,
+                               &clilen);
+        if (newsockfd > 0) {
+          if (_clientSockets.size() < _maxConnections) {
+            _clientSockets.push_back(newsockfd);
+            logger_->log_info("ListenSysLog new client socket %d connection",
+                              newsockfd);
+            continue;
+          } else {
+            // connection limit reached: refuse the new client
+            close(newsockfd);
+          }
+        }
+      } else {
+        socklen_t clilen;
+        struct sockaddr_in cli_addr;
+        clilen = sizeof(cli_addr);
+        int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0,
+                               (struct sockaddr *) &cli_addr, &clilen);
+        if (recvlen > 0
+            && (recvlen + getEventQueueByteSize()) <= _recvBufSize) {
+          uint8_t *payload = new uint8_t[recvlen];
+          memcpy(payload, _buffer, recvlen);
+          putEvent(payload, recvlen);
+        }
+      }
+    }
+    it = _clientSockets.begin();
+    while (it != _clientSockets.end()) {
+      int clientSocket = *it;
+      if (!FD_ISSET(clientSocket, &fds)) {
+        // Nothing to read on this client. The iterator must still advance,
+        // otherwise this loop never terminates.
+        ++it;
+        continue;
+      }
+      int recvlen = readline(clientSocket, (char *) _buffer, sizeof(_buffer));
+      if (recvlen <= 0) {
+        close(clientSocket);
+        logger_->log_info("ListenSysLog client socket %d close",
+                          clientSocket);
+        // erase() returns the iterator following the removed element
+        it = _clientSockets.erase(it);
+        continue;
+      }
+      if ((recvlen + getEventQueueByteSize()) <= _recvBufSize) {
+        uint8_t *payload = new uint8_t[recvlen];
+        memcpy(payload, _buffer, recvlen);
+        putEvent(payload, recvlen);
+      }
+      ++it;
+    }
+  }
+  return;
+}
+
+int ListenSyslog::readline(int fd, char *bufptr, size_t len) {  // copies one '\n'-terminated line from fd into bufptr; returns bytes stored, 0 when recv() returns 0, -1 on error or when no newline was seen within len
+  char *bufx = bufptr;  // start of the caller's buffer, used to compute the returned length
+  static char *bp;  // next unread byte in b
+  static int cnt = 0;  // counter tracking the unread bytes left in b
+  static char b[2048];  // NOTE(review): bp/cnt/b are function-level statics, so read-ahead data is shared between all fds and callers
+  char c;
+
+  while (--len > 0) {  // NOTE(review): len is a size_t, so a len of 0 wraps around here
+    if (--cnt <= 0) {  // read-ahead buffer used up: fetch more from the socket
+      cnt = recv(fd, b, sizeof(b), 0);
+      if (cnt < 0) {
+        if ( errno == EINTR) {
+          len++; /* the while will decrement */
+          continue;
+        }
+        return -1;
+      }
+      if (cnt == 0)
+        return 0;
+      bp = b;
+    }
+    c = *bp++;
+    *bufptr++ = c;
+    if (c == '\n') {
+      *bufptr = '\n';  // NOTE(review): stores a second '\n' after the line and counts it in the result -- confirm this is intended
+      return bufptr - bufx + 1;
+    }
+  }
+  return -1;
+}
+
+/**
+ * Refreshes the configuration, makes sure the socket thread runs and turns
+ * queued Syslog events into one FlowFile.
+ *
+ * A changed protocol or port requests a server socket reset. When the event
+ * queue is empty the context yields. Otherwise the events obtained from
+ * pollEvent() (called with _maxBatchSize) are written to a new FlowFile: the
+ * first with write(), the rest with append(). The FlowFile gets the
+ * "syslog.protocol" and "syslog.port" attributes and is transferred to
+ * Success.
+ *
+ * @param context source of the property values; yielded when idle
+ * @param session session used to create, fill and transfer the FlowFile
+ */
+void ListenSyslog::onTrigger(
+    core::ProcessContext *context,
+    core::ProcessSession *session) {
+  std::string value;
+  bool needResetServerSocket = false;
+  if (context->getProperty(Protocol.getName(), value)) {
+    if (_protocol != value)
+      needResetServerSocket = true;
+    _protocol = value;
+  }
+  if (context->getProperty(RecvBufSize.getName(), value)) {
+    core::Property::StringToInt(value, _recvBufSize);
+  }
+  if (context->getProperty(MaxSocketBufSize.getName(), value)) {
+    core::Property::StringToInt(value, _maxSocketBufSize);
+  }
+  if (context->getProperty(MaxConnections.getName(), value)) {
+    core::Property::StringToInt(value, _maxConnections);
+  }
+  if (context->getProperty(MessageDelimiter.getName(), value)) {
+    _messageDelimiter = value;
+  }
+  if (context->getProperty(ParseMessages.getName(), value)) {
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(
+        value, _parseMessages);
+  }
+  if (context->getProperty(Port.getName(), value)) {
+    int64_t oldPort = _port;
+    core::Property::StringToInt(value, _port);
+    if (_port != oldPort)
+      needResetServerSocket = true;
+  }
+  if (context->getProperty(MaxBatchSize.getName(), value)) {
+    core::Property::StringToInt(value, _maxBatchSize);
+  }
+
+  if (needResetServerSocket)
+    _resetServerSocket = true;
+
+  startSocketThread();
+
+  // read from the event queue
+  if (isEventQueueEmpty()) {
+    context->yield();
+    return;
+  }
+
+  std::queue<SysLogEvent> eventQueue;
+  pollEvent(eventQueue, _maxBatchSize);
+  std::shared_ptr<FlowFileRecord> flowFile;
+  while (!eventQueue.empty()) {
+    SysLogEvent event = eventQueue.front();
+    eventQueue.pop();
+    ListenSyslog::WriteCallback callback((char *) event.payload, event.len);
+    if (!flowFile) {
+      // first event: it creates the FlowFile
+      flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+      if (!flowFile) {
+        // No FlowFile could be created: release this payload and every
+        // payload still queued instead of leaking them.
+        delete[] event.payload;
+        while (!eventQueue.empty()) {
+          delete[] eventQueue.front().payload;
+          eventQueue.pop();
+        }
+        return;
+      }
+      session->write(flowFile, &callback);
+    } else {
+      session->append(flowFile, &callback);
+    }
+    delete[] event.payload;
+  }
+
+  // pollEvent() may have returned nothing; there is no FlowFile to
+  // annotate or transfer in that case.
+  if (!flowFile)
+    return;
+
+  flowFile->addAttribute("syslog.protocol", _protocol);
+  flowFile->addAttribute("syslog.port", std::to_string(_port));
+  session->transfer(flowFile, Success);
+}
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/LogAttribute.cpp 
b/libminifi/src/processors/LogAttribute.cpp
new file mode 100644
index 0000000..e2cf16c
--- /dev/null
+++ b/libminifi/src/processors/LogAttribute.cpp
@@ -0,0 +1,176 @@
+/**
+ * @file LogAttribute.cpp
+ * LogAttribute class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <sstream>
+#include <string.h>
+#include <iostream>
+
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "processors/LogAttribute.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+const std::string LogAttribute::ProcessorName("LogAttribute");  // processor type name
+core::Property LogAttribute::LogLevel(  // converted by logLevelStringToEnum in onTrigger to pick the logger call
+    "Log Level", "The Log Level to use when logging the Attributes", "info");
+core::Property LogAttribute::AttributesToLog(  // NOTE(review): registered in initialize but not read by onTrigger
+    "Attributes to Log",
+    "A comma-separated list of Attributes to Log. If not specified, all 
attributes will be logged.",
+    "");
+core::Property LogAttribute::AttributesToIgnore(  // NOTE(review): registered in initialize but not read by onTrigger
+    "Attributes to Ignore",
+    "A comma-separated list of Attributes to ignore. If not specified, no 
attributes will be ignored.",
+    "");
+core::Property LogAttribute::LogPayload(  // when true, onTrigger hex-dumps payloads of at most 1024 * 1024 bytes
+    "Log Payload",
+    "If true, the FlowFile's payload will be logged, in addition to its 
attributes; otherwise, just the Attributes will be logged.",
+    "false");
+core::Property LogAttribute::LogPrefix(  // onTrigger wraps the value in "-----" on both sides to form the separator line
+    "Log prefix",
+    "Log prefix appended to the log lines. It helps to distinguish the output 
of multiple LogAttribute processors.",
+    "");
+core::Relationship LogAttribute::Success(  // every logged FlowFile is transferred here
+    "success", "success operational on the flow record");
+
+/**
+ * Declares the properties and the single relationship this processor
+ * supports.
+ */
+void LogAttribute::initialize() {
+  // Supported properties
+  std::set<core::Property> supportedProperties { LogLevel, AttributesToLog,
+      AttributesToIgnore, LogPayload, LogPrefix };
+  setSupportedProperties(supportedProperties);
+
+  // Supported relationships
+  std::set<core::Relationship> supportedRelationships { Success };
+  setSupportedRelationships(supportedRelationships);
+}
+
+/**
+ * Logs a description of the next incoming FlowFile and passes it on.
+ *
+ * The report lists the standard fields (UUID, dates, size, offset), every
+ * attribute, the content claim path when there is one and, when payload
+ * logging is enabled and the FlowFile is at most 1024 * 1024 bytes, a hex
+ * dump of the content with 16 bytes per line. It is emitted at the
+ * configured log level, after which the FlowFile is transferred to Success.
+ * Nothing happens when the session has no incoming FlowFile.
+ *
+ * @param context source of the property values
+ * @param session session providing and receiving the FlowFile
+ */
+void LogAttribute::onTrigger(
+    core::ProcessContext *context,
+    core::ProcessSession *session) {
+  std::string separator = "--------------------------------------------------";
+  LogAttrLevel level = LogAttrLevelInfo;
+  bool logPayload = false;
+  std::ostringstream report;
+
+  std::shared_ptr<core::FlowFile> flow = session->get();
+  if (!flow) {
+    return;
+  }
+
+  // Configuration
+  std::string value;
+  if (context->getProperty(LogLevel.getName(), value)) {
+    logLevelStringToEnum(value, level);
+  }
+  if (context->getProperty(LogPrefix.getName(), value)) {
+    separator = "-----" + value + "-----";
+  }
+  if (context->getProperty(LogPayload.getName(), value)) {
+    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value,
+                                                                logPayload);
+  }
+
+  // Standard fields
+  report << "Logging for flow file \n" << separator
+         << "\nStandard FlowFile Attributes"
+         << "\nUUID:" << flow->getUUIDStr()
+         << "\nEntryDate:" << getTimeStr(flow->getEntryDate())
+         << "\nlineageStartDate:" << getTimeStr(flow->getlineageStartDate())
+         << "\nSize:" << flow->getSize() << " Offset:" << flow->getOffset();
+
+  // Attribute map
+  report << "\nFlowFile Attributes Map Content";
+  std::map<std::string, std::string> attrs = flow->getAttributes();
+  for (const auto &attribute : attrs) {
+    report << "\nkey:" << attribute.first << " value:" << attribute.second;
+  }
+
+  // Content claim
+  report << "\nFlowFile Resource Claim Content";
+  std::shared_ptr<ResourceClaim> claim = flow->getResourceClaim();
+  if (claim) {
+    report << "\nContent Claim:" << claim->getContentFullPath();
+  }
+
+  // Optional hex dump of the payload, 16 bytes per line
+  if (logPayload && flow->getSize() <= 1024 * 1024) {
+    report << "\nPayload:\n";
+    ReadCallback callback(flow->getSize());
+    session->read(flow, &callback);
+    for (unsigned int i = 0; i < callback._readSize; i++) {
+      char hexByte[8];
+      snprintf(hexByte, sizeof(hexByte), "%02x ",
+               (unsigned char) (callback._buffer[i]));
+      report << hexByte;
+      if ((i + 1) % 16 == 0) {
+        report << '\n';
+      }
+    }
+  }
+  report << "\n" << separator << std::ends;
+  std::string output = report.str();
+
+  switch (level) {
+    case LogAttrLevelTrace:
+      logger_->log_trace("%s", output.c_str());
+      break;
+    case LogAttrLevelDebug:
+      logger_->log_debug("%s", output.c_str());
+      break;
+    case LogAttrLevelInfo:
+      logger_->log_info("%s", output.c_str());
+      break;
+    case LogAttrLevelWarn:
+      logger_->log_warn("%s", output.c_str());
+      break;
+    case LogAttrLevelError:
+      logger_->log_error("%s", output.c_str());
+      break;
+    default:
+      break;
+  }
+
+  // Test Import
+  /*
+   std::shared_ptr<FlowFileRecord> importRecord = session->create();
+   session->import(claim->getContentFullPath(), importRecord);
+   session->transfer(importRecord, Success); */
+
+  // Hand the FlowFile on
+  session->transfer(flow, Success);
+}
+
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/PutFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/PutFile.cpp 
b/libminifi/src/processors/PutFile.cpp
new file mode 100644
index 0000000..85cf09b
--- /dev/null
+++ b/libminifi/src/processors/PutFile.cpp
@@ -0,0 +1,213 @@
+/**
+ * @file PutFile.cpp
+ * PutFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <fstream>
+#include <uuid/uuid.h>
+
+#include "utils/StringUtils.h"
+#include "utils/TimeUtil.h"
+#include "processors/PutFile.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// Values of the "Conflict Resolution Strategy" property that onTrigger()
+// compares against: "replace" overwrites the destination, "ignore" routes the
+// flow file to Success without writing, and any other value (including
+// "fail") routes it to Failure.
+const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE("replace");
+const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_IGNORE("ignore");
+const std::string PutFile::CONFLICT_RESOLUTION_STRATEGY_FAIL("fail");
+
+// Name of this processor type.
+const std::string PutFile::ProcessorName("PutFile");
+
+// Supported properties. The arguments are presumably (name, description,
+// default value) -- confirm against core::Property.
+core::Property PutFile::Directory(
+    "Output Directory", "The output directory to which to put files", ".");
+core::Property PutFile::ConflictResolution(
+    "Conflict Resolution Strategy",
+    "Indicates what should happen when a file with the same name already 
exists in the output directory",
+    CONFLICT_RESOLUTION_STRATEGY_FAIL);
+
+// Relationships a processed flow file can be transferred to.
+core::Relationship PutFile::Success(
+    "success", "All files are routed to success");
+core::Relationship PutFile::Failure(
+    "failure",
+    "Failed files (conflict, write failure, etc.) are transferred to failure");
+
+// Registers the properties and relationships supported by PutFile.
+void PutFile::initialize() {
+  // Properties this processor understands
+  std::set<core::Property> supportedProperties{Directory, ConflictResolution};
+  setSupportedProperties(supportedProperties);
+
+  // Relationships flow files may be transferred to
+  std::set<core::Relationship> supportedRelationships{Success, Failure};
+  setSupportedRelationships(supportedRelationships);
+}
+
+// Takes the next flow file from the session and writes it to
+// "<Output Directory>/<filename attribute>". If the destination already
+// exists, the configured Conflict Resolution Strategy decides between
+// replacing it, skipping the write (Success) and failing (Failure).
+void PutFile::onTrigger(
+    core::ProcessContext *context,
+    core::ProcessSession *session) {
+  std::string outputDir;
+  if (!context->getProperty(Directory.getName(), outputDir)) {
+    logger_->log_error("Directory attribute is missing or invalid");
+    return;
+  }
+
+  std::string strategy;
+  if (!context->getProperty(ConflictResolution.getName(), strategy)) {
+    logger_->log_error(
+        "Conflict Resolution Strategy attribute is missing or invalid");
+    return;
+  }
+
+  std::shared_ptr<FlowFileRecord> flowFile =
+      std::static_pointer_cast<FlowFileRecord>(session->get());
+  if (!flowFile) {
+    // No incoming flow file, nothing to do
+    return;
+  }
+
+  std::string filename;
+  flowFile->getKeyedAttribute(FILENAME, filename);
+
+  // The temporary file lives in the output directory itself (so the later
+  // rename stays on one partition) and carries a UUID suffix so that its
+  // name is unique.
+  uuid_t tmpUuid;
+  char tmpUuidText[37];
+  uuid_generate(tmpUuid);
+  uuid_unparse_lower(tmpUuid, tmpUuidText);
+  const std::string tmpFile = outputDir + "/." + filename + "." + tmpUuidText;
+  logger_->log_info("PutFile using temporary file %s", tmpFile.c_str());
+
+  // Full path of the final destination
+  const std::string destFile = outputDir + "/" + filename;
+
+  logger_->log_info("PutFile writing file %s into directory %s",
+                    filename.c_str(), outputDir.c_str());
+
+  // No conflict: the destination does not exist yet
+  struct stat destStat;
+  if (stat(destFile.c_str(), &destStat) != 0) {
+    putFile(session, flowFile, tmpFile, destFile);
+    return;
+  }
+
+  logger_->log_info(
+      "Destination file %s exists; applying Conflict Resolution Strategy: %s",
+      destFile.c_str(), strategy.c_str());
+
+  if (strategy == CONFLICT_RESOLUTION_STRATEGY_REPLACE) {
+    putFile(session, flowFile, tmpFile, destFile);
+  } else if (strategy == CONFLICT_RESOLUTION_STRATEGY_IGNORE) {
+    session->transfer(flowFile, Success);
+  } else {
+    session->transfer(flowFile, Failure);
+  }
+}
+
+// Streams the flow file content into tmpFile, renames it to destFile and
+// transfers the flow file to Success or Failure accordingly.
+// Returns true when the file was committed to destFile.
+bool PutFile::putFile(core::ProcessSession *session,
+                      std::shared_ptr<FlowFileRecord> flowFile,
+                      const std::string &tmpFile,
+                      const std::string &destFile) {
+  ReadCallback copier(tmpFile, destFile);
+  session->read(flowFile, &copier);
+
+  const bool committed = copier.commit();
+  session->transfer(flowFile, committed ? Success : Failure);
+  return committed;
+}
+
+// Opens an output stream on tmpFile and remembers both paths for the later
+// commit() (rename) and for clean-up in the destructor.
+PutFile::ReadCallback::ReadCallback(const std::string &tmpFile,
+                                    const std::string &destFile)
+    : _tmpFile(tmpFile),
+      _tmpFileOs(tmpFile),
+      _destFile(destFile) {
+  logger_ = logging::Logger::getLogger();
+  // commit() reads this flag even when process() was never invoked, so make
+  // sure it starts out with a defined "nothing written yet" value.
+  _writeSucceeded = false;
+}
+
+// Copy the entire file contents to the temporary file.
+// Sets _writeSucceeded to true only when the temporary file was open, the
+// whole source was consumed and flushing the temporary file reported no
+// error; commit() relies on this flag before renaming.
+void PutFile::ReadCallback::process(std::ifstream *stream) {
+  _writeSucceeded = false;
+
+  // Nothing can be written if the temporary file could not be opened
+  if (!_tmpFileOs.is_open()) {
+    logger_->log_error("PutFile failed to open temporary file %s",
+                       _tmpFile.c_str());
+    return;
+  }
+
+  // Copy file contents into tmp file
+  _tmpFileOs << stream->rdbuf();
+
+  // operator<<(streambuf*) stops early when an insertion fails, leaving the
+  // rest of the source unread. It also sets failbit for an empty source,
+  // which is not an error, so check whether the source was drained instead
+  // of testing fail().
+  const bool sourceDrained =
+      stream->rdbuf()->sgetc() == std::ifstream::traits_type::eof();
+
+  // Push buffered bytes to the file so that write errors surface as badbit
+  _tmpFileOs.flush();
+
+  _writeSucceeded = sourceDrained && !_tmpFileOs.bad();
+}
+
+// Renames tmp file to final destination
+// Returns true if commit succeeded. The commit fails when the preceding
+// write did not succeed, when closing the temporary file fails, or when
+// rename() fails; every failure is logged at error level.
+bool PutFile::ReadCallback::commit() {
+  logger_->log_info("PutFile committing put file operation to %s",
+                    _destFile.c_str());
+
+  if (!_writeSucceeded) {
+    logger_->log_error(
+        "PutFile commit put file operation to %s failed because write failed",
+        _destFile.c_str());
+    return false;
+  }
+
+  // close() reports failure through failbit, which may already be set from
+  // copying an empty source; reset the state so fail() reflects close() only.
+  _tmpFileOs.clear();
+  _tmpFileOs.close();
+  if (_tmpFileOs.fail()) {
+    logger_->log_error(
+        "PutFile commit put file operation to %s failed because closing the temporary file failed",
+        _destFile.c_str());
+    return false;
+  }
+
+  if (rename(_tmpFile.c_str(), _destFile.c_str()) != 0) {
+    logger_->log_error(
+        "PutFile commit put file operation to %s failed because rename() call failed",
+        _destFile.c_str());
+    return false;
+  }
+
+  logger_->log_info("PutFile commit put file operation to %s succeeded",
+                    _destFile.c_str());
+  return true;
+}
+
+// Clean up resources
+// Runs for every ReadCallback, including after a successful commit(), in
+// which case the temporary path has already been renamed to the destination.
+PutFile::ReadCallback::~ReadCallback() {
+  // Close tmp file
+  _tmpFileOs.close();
+
+  // Clean up tmp file, if necessary
+  // (the result of unlink() is deliberately not checked)
+  unlink(_tmpFile.c_str());
+}
+
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/processors/RealTimeDataCollector.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/RealTimeDataCollector.cpp 
b/libminifi/src/processors/RealTimeDataCollector.cpp
new file mode 100644
index 0000000..922835d
--- /dev/null
+++ b/libminifi/src/processors/RealTimeDataCollector.cpp
@@ -0,0 +1,480 @@
+/**
+ * @file RealTimeDataCollector.cpp
+ * RealTimeDataCollector class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <memory>
+#include <random>
+#include <netinet/tcp.h>
+
+#include "utils/StringUtils.h"
+#include "processors/RealTimeDataCollector.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+// Name of this processor type.
+const std::string 
RealTimeDataCollector::ProcessorName("RealTimeDataCollector");
+// Supported properties. The arguments are presumably (name, description,
+// default value) -- confirm against core::Property. onTrigger() reads these
+// on its first invocation.
+core::Property RealTimeDataCollector::FILENAME(
+    "File Name", "File Name for the real time processor to process",
+    "data.osp");
+core::Property RealTimeDataCollector::REALTIMESERVERNAME(
+    "Real Time Server Name", "Real Time Server Name", "localhost");
+core::Property RealTimeDataCollector::REALTIMESERVERPORT(
+    "Real Time Server Port", "Real Time Server Port", "10000");
+core::Property RealTimeDataCollector::BATCHSERVERNAME(
+    "Batch Server Name", "Batch Server Name", "localhost");
+core::Property RealTimeDataCollector::BATCHSERVERPORT(
+    "Batch Server Port", "Batch Server Port", "10001");
+core::Property RealTimeDataCollector::ITERATION(
+    "Iteration", "If true, sample osp file will be iterated", "true");
+// Message ID lists are comma separated; lines of the data file whose first
+// comma-separated field equals one of the IDs are treated as real-time or
+// batch traffic respectively.
+core::Property RealTimeDataCollector::REALTIMEMSGID(
+    "Real Time Message ID", "Real Time Message ID", "41");
+core::Property RealTimeDataCollector::BATCHMSGID(
+    "Batch Message ID", "Batch Message ID", "172, 30, 48");
+// Interval values carry an explicit time unit; onTrigger() converts them to
+// nanoseconds.
+core::Property RealTimeDataCollector::REALTIMEINTERVAL(
+    "Real Time Interval", "Real Time Data Collection Interval in msec",
+    "10 ms");
+core::Property RealTimeDataCollector::BATCHINTERVAL(
+    "Batch Time Interval", "Batch Processing Interval in msec", "100 ms");
+core::Property RealTimeDataCollector::BATCHMAXBUFFERSIZE(
+    "Batch Max Buffer Size", "Batch Buffer Maximum size in bytes", "262144");
+// The only relationship this processor registers in initialize().
+core::Relationship RealTimeDataCollector::Success(
+    "success", "success operational on the flow record");
+
+// Registers the properties and the relationship supported by this processor.
+void RealTimeDataCollector::initialize() {
+  // Properties this processor understands
+  std::set<core::Property> supportedProperties{
+      FILENAME,
+      REALTIMESERVERNAME,
+      REALTIMESERVERPORT,
+      BATCHSERVERNAME,
+      BATCHSERVERPORT,
+      ITERATION,
+      REALTIMEMSGID,
+      BATCHMSGID,
+      REALTIMEINTERVAL,
+      BATCHINTERVAL,
+      BATCHMAXBUFFERSIZE};
+  setSupportedProperties(supportedProperties);
+
+  // Relationships flow files may be transferred to
+  std::set<core::Relationship> supportedRelationships{Success};
+  setSupportedRelationships(supportedRelationships);
+}
+
+// Resolves host to an IPv4 address and opens a TCP connection to it on port.
+// Returns the connected socket descriptor, or 0 on any failure (callers
+// treat a value <= 0 as "not connected"). On failure the socket, if one was
+// created, is closed before returning.
+int RealTimeDataCollector::connectServer(const char *host, uint16_t port) {
+  in_addr_t addr;
+  int sock = 0;
+  struct hostent *h = nullptr;
+#ifdef __MACH__
+  h = gethostbyname(host);
+#else
+  char buf[1024];
+  struct hostent he;
+  int hh_errno = 0;
+  // A non-zero return means the lookup failed; do not trust the result then
+  if (gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno) != 0) {
+    h = nullptr;
+  }
+#endif
+  // The lookup yields no entry for unknown hosts, and the copy below is only
+  // valid for an IPv4 address that exactly fills in_addr_t.
+  if (h == nullptr || h->h_addrtype != AF_INET || h->h_addr_list[0] == nullptr
+      || h->h_length != static_cast<int>(sizeof(addr))) {
+    logger_->log_error("Could not resolve hostName %s", host);
+    return 0;
+  }
+  memcpy(&addr, h->h_addr_list[0], sizeof(addr));
+
+  sock = socket(AF_INET, SOCK_STREAM, 0);
+  if (sock < 0) {
+    logger_->log_error("Could not create socket to hostName %s", host);
+    return 0;
+  }
+
+#ifndef __MACH__
+  int opt = 1;
+
+  // Disable Nagle's algorithm
+  if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
+  {
+    logger_->log_error("setsockopt() TCP_NODELAY failed");
+    close(sock);
+    return 0;
+  }
+  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+          (char *)&opt, sizeof(opt)) < 0)
+  {
+    logger_->log_error("setsockopt() SO_REUSEADDR failed");
+    close(sock);
+    return 0;
+  }
+
+  // Request a 256 KiB send buffer
+  int sndsize = 256*1024;
+  if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0)
+  {
+    logger_->log_error("setsockopt() SO_SNDBUF failed");
+    close(sock);
+    return 0;
+  }
+#endif
+
+  struct sockaddr_in sa;
+  socklen_t socklen;
+  int status;
+
+  //TODO bind socket to the interface
+  // For now the local end is bound to any address and any port
+  memset(&sa, 0, sizeof(sa));
+  sa.sin_family = AF_INET;
+  sa.sin_addr.s_addr = htonl(INADDR_ANY);
+  sa.sin_port = htons(0);
+  socklen = sizeof(sa);
+  if (bind(sock, (struct sockaddr *) &sa, socklen) < 0) {
+    logger_->log_error("socket bind failed");
+    close(sock);
+    return 0;
+  }
+
+  // Remote end: the resolved address and the requested port
+  memset(&sa, 0, sizeof(sa));
+  sa.sin_family = AF_INET;
+  sa.sin_addr.s_addr = addr;
+  sa.sin_port = htons(port);
+  socklen = sizeof(sa);
+
+  status = connect(sock, (struct sockaddr *) &sa, socklen);
+
+  if (status < 0) {
+    logger_->log_error("socket connect failed to %s %d", host, port);
+    close(sock);
+    return 0;
+  }
+
+  logger_->log_info("socket %d connect to server %s port %d success", sock,
+                    host, port);
+
+  return sock;
+}
+
+// Sends buflen bytes from buf over socket, repeating send() until all bytes
+// have been handed over. Returns -1 as soon as send() reports an error;
+// otherwise returns the byte count of the last send() call (0 when buflen
+// is not positive).
+int RealTimeDataCollector::sendData(int socket, const char *buf, int buflen) {
+  int lastSent = 0;
+
+  for (int offset = 0; offset < buflen; offset += lastSent) {
+    lastSent = send(socket, buf + offset, buflen - offset, 0);
+    if (lastSent == -1) {
+      // send() failed, report it to the caller
+      return lastSent;
+    }
+  }
+
+  if (lastSent != 0) {
+    logger_->log_debug("Send data size %d over socket %d", buflen, socket);
+  }
+
+  return lastSent;
+}
+
+// Real-time side of the collector. Once the accumulated scheduling time
+// reaches the real-time interval it re-reads the message ID lists, then
+// reads lines from the data file until the first real-time line was handled:
+//  - lines whose first field matches a batch ID are queued for the batch
+//    thread;
+//  - lines matching a real-time ID are sent over the real-time socket, or
+//    queued for the batch thread when the socket is unavailable or the send
+//    fails.
+// The queue is bounded by _batchMaxBufferSize; the oldest items are evicted
+// to make room for a new line.
+void RealTimeDataCollector::onTriggerRealTime(
+    core::ProcessContext *context,
+    core::ProcessSession *session) {
+  if (_realTimeAccumulated >= this->_realTimeInterval) {
+    std::string value;
+    if (this->getProperty(REALTIMEMSGID.getName(), value)) {
+      this->_realTimeMsgID.clear();
+      this->logger_->log_info("Real Time Msg IDs %s", value.c_str());
+      std::stringstream lineStream(value);
+      std::string cell;
+
+      while (std::getline(lineStream, cell, ',')) {
+        // Trim like the batch IDs: the IDs are compared against a trimmed
+        // field below, so an untrimmed ID could never match.
+        cell = org::apache::nifi::minifi::utils::StringUtils::trim(cell);
+        this->_realTimeMsgID.push_back(cell);
+      }
+    }
+    if (this->getProperty(BATCHMSGID.getName(), value)) {
+      this->_batchMsgID.clear();
+      this->logger_->log_info("Batch Msg IDs %s", value.c_str());
+      std::stringstream lineStream(value);
+      std::string cell;
+
+      while (std::getline(lineStream, cell, ',')) {
+        cell = org::apache::nifi::minifi::utils::StringUtils::trim(cell);
+        this->_batchMsgID.push_back(cell);
+      }
+    }
+    // Open the file
+    if (!this->_fileStream.is_open()) {
+      _fileStream.open(this->_fileName.c_str(), std::ifstream::in);
+      if (this->_fileStream.is_open())
+        logger_->log_debug("open %s", _fileName.c_str());
+    }
+    if (!_fileStream.good()) {
+      logger_->log_error("load data file failed %s", _fileName.c_str());
+      return;
+    }
+    if (this->_fileStream.is_open()) {
+      std::string line;
+
+      while (std::getline(_fileStream, line)) {
+        line += "\n";
+        std::stringstream lineStream(line);
+        std::string cell;
+        if (std::getline(lineStream, cell, ',')) {
+          cell = org::apache::nifi::minifi::utils::StringUtils::trim(cell);
+          // Check whether it match to the batch traffic
+          for (std::vector<std::string>::iterator it = _batchMsgID.begin();
+              it != _batchMsgID.end(); ++it) {
+            if (cell == *it) {
+              // push the batch data to the queue
+              std::lock_guard<std::mutex> lock(mutex_);
+              // Evict the oldest items until the line fits. Stop once the
+              // queue is empty: front()/pop() on an empty queue is undefined
+              // behaviour, which a line larger than the limit would trigger.
+              while (!_queue.empty()
+                  && (_queuedDataSize + line.size()) > _batchMaxBufferSize) {
+                std::string item = _queue.front();
+                _queuedDataSize -= item.size();
+                logger_->log_debug(
+                    "Pop item size %d from batch queue, queue buffer size %d",
+                    item.size(), _queuedDataSize);
+                _queue.pop();
+              }
+              _queue.push(line);
+              _queuedDataSize += line.size();
+              logger_->log_debug(
+                  "Push batch msg ID %s into batch queue, queue buffer size %d",
+                  cell.c_str(), _queuedDataSize);
+            }
+          }
+          bool findRealTime = false;
+          // Check whether it match to the real time traffic
+          for (std::vector<std::string>::iterator it = _realTimeMsgID.begin();
+              it != _realTimeMsgID.end(); ++it) {
+            if (cell == *it) {
+              int status = 0;
+              if (this->_realTimeSocket <= 0) {
+                // Connect the LTE socket
+                uint16_t port = _realTimeServerPort;
+                this->_realTimeSocket = connectServer(
+                    _realTimeServerName.c_str(), port);
+              }
+              if (this->_realTimeSocket) {
+                // try to send the data
+                status = sendData(_realTimeSocket, line.data(), line.size());
+                if (status < 0) {
+                  close(_realTimeSocket);
+                  _realTimeSocket = 0;
+                }
+              }
+              if (this->_realTimeSocket <= 0 || status < 0) {
+                // Real-time delivery failed: fall back to the batch queue
+                std::lock_guard<std::mutex> lock(mutex_);
+                // Same bounded eviction as above, guarded against an empty
+                // queue
+                while (!_queue.empty()
+                    && (_queuedDataSize + line.size()) > _batchMaxBufferSize) {
+                  std::string item = _queue.front();
+                  _queuedDataSize -= item.size();
+                  logger_->log_debug(
+                      "Pop item size %d from batch queue, queue buffer size %d",
+                      item.size(), _queuedDataSize);
+                  _queue.pop();
+                }
+                _queue.push(line);
+                _queuedDataSize += line.size();
+                logger_->log_debug(
+                    "Push real time msg ID %s into batch queue, queue buffer size %d",
+                    cell.c_str(), _queuedDataSize);
+              }
+              // find real time
+              findRealTime = true;
+            }  // cell
+          }  // for real time pattern
+          if (findRealTime)
+            // we break the while once we find the first real time
+            break;
+        }  // if get line
+      }  // while
+      // Close at end of file so that the next interval re-opens the file
+      if (_fileStream.eof()) {
+        _fileStream.close();
+      }
+    }  // if open
+    _realTimeAccumulated = 0;
+  }
+  // Account for one scheduling period per invocation
+  std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(
+      context->getProcessorNode().getProcessor());
+  _realTimeAccumulated += processor->getSchedulingPeriodNano();
+}
+
+// Batch side of the collector. Once the accumulated scheduling time reaches
+// the batch interval it (re)connects the batch socket if necessary and sends
+// the queued lines in order. A line is removed from the queue only after it
+// was sent successfully; on a send failure the socket is closed and the
+// remaining lines, including the failed one, stay queued for the next
+// interval.
+void RealTimeDataCollector::onTriggerBatch(
+    core::ProcessContext *context,
+    core::ProcessSession *session) {
+  if (_batchAcccumulated >= this->_batchInterval) {
+    // dequeue the batch and send over WIFI
+    int status = 0;
+    if (this->_batchSocket <= 0) {
+      // Connect the WIFI socket
+      uint16_t port = _batchServerPort;
+      this->_batchSocket = connectServer(_batchServerName.c_str(), port);
+    }
+    if (this->_batchSocket) {
+      std::lock_guard<std::mutex> lock(mutex_);
+
+      while (!_queue.empty()) {
+        const std::string &line = _queue.front();
+        status = sendData(_batchSocket, line.data(), line.size());
+        if (status < 0) {
+          // Keep the line queued so that it is retried after reconnecting
+          close(_batchSocket);
+          _batchSocket = 0;
+          break;
+        }
+        // Update the accounting before pop() invalidates the reference
+        _queuedDataSize -= line.size();
+        _queue.pop();
+      }
+    }
+    _batchAcccumulated = 0;
+  }
+  // Account for one scheduling period per invocation
+  std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(
+      context->getProcessorNode().getProcessor());
+  _batchAcccumulated += processor->getSchedulingPeriodNano();
+}
+
+// Entry point for every scheduling thread. Threads already registered as the
+// real-time or the batch thread are dispatched to their handler. An
+// unregistered thread either performs the configuration (reads all
+// properties, connects both sockets, opens the data file) and registers
+// itself as the real-time thread, or, when configuration has just been done,
+// registers itself as the batch thread.
+// NOTE(review): the _firstInvoking toggle assumes exactly two scheduling
+// threads; a third unregistered thread would re-run the configuration and
+// reconnect both sockets without closing the previous ones -- confirm.
+void RealTimeDataCollector::onTrigger(
+    core::ProcessContext *context,
+    core::ProcessSession *session) {
+  std::thread::id id = std::this_thread::get_id();
+
+  if (id == _realTimeThreadId)
+    return onTriggerRealTime(context, session);
+  else if (id == _batchThreadId)
+    return onTriggerBatch(context, session);
+  else {
+    std::lock_guard<std::mutex> lock(mutex_);
+    if (!this->_firstInvoking) {
+      this->_fileName = "data.osp";
+      std::string value;
+      if (this->getProperty(FILENAME.getName(), value)) {
+        this->_fileName = value;
+        this->logger_->log_info("Data Collector File Name %s",
+                                _fileName.c_str());
+      }
+      this->_realTimeServerName = "localhost";
+      if (this->getProperty(REALTIMESERVERNAME.getName(), value)) {
+        this->_realTimeServerName = value;
+        this->logger_->log_info("Real Time Server Name %s",
+                                this->_realTimeServerName.c_str());
+      }
+      this->_realTimeServerPort = 10000;
+      if (this->getProperty(REALTIMESERVERPORT.getName(), value)) {
+        core::Property::StringToInt(
+            value, _realTimeServerPort);
+        this->logger_->log_info("Real Time Server Port %d",
+                                _realTimeServerPort);
+      }
+      // Same fallback as the real-time server name
+      this->_batchServerName = "localhost";
+      if (this->getProperty(BATCHSERVERNAME.getName(), value)) {
+        this->_batchServerName = value;
+        this->logger_->log_info("Batch Server Name %s",
+                                this->_batchServerName.c_str());
+      }
+      this->_batchServerPort = 10001;
+      if (this->getProperty(BATCHSERVERPORT.getName(), value)) {
+        core::Property::StringToInt(
+            value, _batchServerPort);
+        this->logger_->log_info("Batch Server Port %d", _batchServerPort);
+      }
+      if (this->getProperty(ITERATION.getName(), value)) {
+        org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, this->_iteration);
+        logger_->log_info("Iteration %d", _iteration);
+      }
+      this->_realTimeInterval = 10000000;  //10 msec
+      if (this->getProperty(REALTIMEINTERVAL.getName(), value)) {
+        core::TimeUnit unit;
+        if (core::Property::StringToTime(
+            value, _realTimeInterval, unit)
+            && core::Property::ConvertTimeUnitToNS(
+                _realTimeInterval, unit, _realTimeInterval)) {
+          logger_->log_info("Real Time Interval: [%d] ns", _realTimeInterval);
+        }
+      }
+      this->_batchInterval = 100000000;  //100 msec
+      if (this->getProperty(BATCHINTERVAL.getName(), value)) {
+        core::TimeUnit unit;
+        if (core::Property::StringToTime(
+            value, _batchInterval, unit)
+            && core::Property::ConvertTimeUnitToNS(
+                _batchInterval, unit, _batchInterval)) {
+          logger_->log_info("Batch Time Interval: [%d] ns", _batchInterval);
+        }
+      }
+      this->_batchMaxBufferSize = 256 * 1024;
+      if (this->getProperty(BATCHMAXBUFFERSIZE.getName(), value)) {
+        core::Property::StringToInt(
+            value, _batchMaxBufferSize);
+        this->logger_->log_info("Batch Max Buffer Size %d",
+                                _batchMaxBufferSize);
+      }
+      if (this->getProperty(REALTIMEMSGID.getName(), value)) {
+        // Start from an empty list: this branch can run more than once and
+        // must not accumulate duplicate IDs.
+        this->_realTimeMsgID.clear();
+        this->logger_->log_info("Real Time Msg IDs %s", value.c_str());
+        std::stringstream lineStream(value);
+        std::string cell;
+
+        while (std::getline(lineStream, cell, ',')) {
+          // Trim like the batch IDs; the IDs are matched against trimmed
+          // fields of the data file.
+          cell = org::apache::nifi::minifi::utils::StringUtils::trim(cell);
+          this->_realTimeMsgID.push_back(cell);
+          this->logger_->log_info("Real Time Msg ID %s", cell.c_str());
+        }
+      }
+      if (this->getProperty(BATCHMSGID.getName(), value)) {
+        this->_batchMsgID.clear();
+        this->logger_->log_info("Batch Msg IDs %s", value.c_str());
+        std::stringstream lineStream(value);
+        std::string cell;
+
+        while (std::getline(lineStream, cell, ',')) {
+          cell = org::apache::nifi::minifi::utils::StringUtils::trim(cell);
+          this->_batchMsgID.push_back(cell);
+          this->logger_->log_info("Batch Msg ID %s", cell.c_str());
+        }
+      }
+      // Connect the LTE socket
+      uint16_t port = _realTimeServerPort;
+
+      this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port);
+
+      // Connect the WIFI socket
+      port = _batchServerPort;
+
+      this->_batchSocket = connectServer(_batchServerName.c_str(), port);
+
+      // Open the file
+      _fileStream.open(this->_fileName.c_str(), std::ifstream::in);
+      if (!_fileStream.good()) {
+        // Leaves _firstInvoking unset, so the next unregistered call
+        // repeats the whole configuration.
+        logger_->log_error("load data file failed %s", _fileName.c_str());
+        return;
+      } else {
+        logger_->log_debug("open %s", _fileName.c_str());
+      }
+      // The configuring thread becomes the real-time thread
+      _realTimeThreadId = id;
+      this->_firstInvoking = true;
+    } else {
+      // Configuration is done: this other thread becomes the batch thread
+      if (id != _realTimeThreadId)
+        _batchThreadId = id;
+      this->_firstInvoking = false;
+    }
+  }
+}
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

Reply via email to