http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/Property.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Property.h 
b/libminifi/include/core/Property.h
new file mode 100644
index 0000000..c681449
--- /dev/null
+++ b/libminifi/include/core/Property.h
@@ -0,0 +1,264 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROPERTY_H__
+#define __PROPERTY_H__
+
+#include <algorithm>
+#include <sstream>
+#include <string>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <functional>
+#include <set>
+#include <stdlib.h>
+#include <math.h>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// Time Unit
+enum TimeUnit {
+  DAY,
+  HOUR,
+  MINUTE,
+  SECOND,
+  MILLISECOND,
+  NANOSECOND
+};
+
+// Property Class
+class Property {
+
+ public:
+  // Constructor
+  /*!
+   * Create a new property
+   */
+  Property(const std::string name, const std::string description,
+           const std::string value)
+      : name_(name),
+        description_(description),
+        value_(value) {
+  }
+  Property() {
+  }
+  // Destructor
+  virtual ~Property() {
+  }
+  // Get Name for the property
+  std::string getName() const;
+  // Get Description for the property
+  std::string getDescription();
+  // Get value for the property
+  std::string getValue() const;
+  // Set value for the property
+  void setValue(std::string value);
+  const Property &operator=(const Property &other);
+  // Compare
+  bool operator <(const Property & right) const;
+
+  // Convert TimeUnit to MilliSecond
+  static bool ConvertTimeUnitToMS(int64_t input, TimeUnit unit, int64_t &out) {
+    if (unit == MILLISECOND) {
+      out = input;
+      return true;
+    } else if (unit == SECOND) {
+      out = input * 1000;
+      return true;
+    } else if (unit == MINUTE) {
+      out = input * 60 * 1000;
+      return true;
+    } else if (unit == HOUR) {
+      out = input * 60 * 60 * 1000;
+      return true;
+    } else if (unit == DAY) {
+      out = 24 * 60 * 60 * 1000;
+      return true;
+    } else if (unit == NANOSECOND) {
+      out = input / 1000 / 1000;
+      return true;
+    } else {
+      return false;
+    }
+  }
+  // Convert TimeUnit to NanoSecond
+  static bool ConvertTimeUnitToNS(int64_t input, TimeUnit unit, int64_t &out) {
+    if (unit == MILLISECOND) {
+      out = input * 1000 * 1000;
+      return true;
+    } else if (unit == SECOND) {
+      out = input * 1000 * 1000 * 1000;
+      return true;
+    } else if (unit == MINUTE) {
+      out = input * 60 * 1000 * 1000 * 1000;
+      return true;
+    } else if (unit == HOUR) {
+      out = input * 60 * 60 * 1000 * 1000 * 1000;
+      return true;
+    } else if (unit == NANOSECOND) {
+      out = input;
+      return true;
+    } else {
+      return false;
+    }
+  }
+  // Convert String
+  static bool StringToTime(std::string input, int64_t &output,
+                           TimeUnit &timeunit) {
+    if (input.size() == 0) {
+      return false;
+    }
+
+    const char *cvalue = input.c_str();
+    char *pEnd;
+    long int ival = strtol(cvalue, &pEnd, 0);
+
+    if (pEnd[0] == '\0') {
+      return false;
+    }
+
+    while (*pEnd == ' ') {
+      // Skip the space
+      pEnd++;
+    }
+
+    std::string unit(pEnd);
+
+    if (unit == "sec" || unit == "s" || unit == "second" || unit == "seconds"
+        || unit == "secs") {
+      timeunit = SECOND;
+      output = ival;
+      return true;
+    } else if (unit == "min" || unit == "m" || unit == "mins"
+        || unit == "minute" || unit == "minutes") {
+      timeunit = MINUTE;
+      output = ival;
+      return true;
+    } else if (unit == "ns" || unit == "nano" || unit == "nanos"
+        || unit == "nanoseconds") {
+      timeunit = NANOSECOND;
+      output = ival;
+      return true;
+    } else if (unit == "ms" || unit == "milli" || unit == "millis"
+        || unit == "milliseconds") {
+      timeunit = MILLISECOND;
+      output = ival;
+      return true;
+    } else if (unit == "h" || unit == "hr" || unit == "hour" || unit == "hrs"
+        || unit == "hours") {
+      timeunit = HOUR;
+      output = ival;
+      return true;
+    } else if (unit == "d" || unit == "day" || unit == "days") {
+      timeunit = DAY;
+      output = ival;
+      return true;
+    } else
+      return false;
+  }
+
+  // Convert String to Integer
+  static bool StringToInt(std::string input, int64_t &output) {
+    if (input.size() == 0) {
+      return false;
+    }
+
+    const char *cvalue = input.c_str();
+    char *pEnd;
+    long int ival = strtol(cvalue, &pEnd, 0);
+
+    if (pEnd[0] == '\0') {
+      output = ival;
+      return true;
+    }
+
+    while (*pEnd == ' ') {
+      // Skip the space
+      pEnd++;
+    }
+
+    char end0 = toupper(pEnd[0]);
+    if ((end0 == 'K') || (end0 == 'M') || (end0 == 'G') || (end0 == 'T')
+        || (end0 == 'P')) {
+      if (pEnd[1] == '\0') {
+        unsigned long int multiplier = 1000;
+
+        if ((end0 != 'K')) {
+          multiplier *= 1000;
+          if (end0 != 'M') {
+            multiplier *= 1000;
+            if (end0 != 'G') {
+              multiplier *= 1000;
+              if (end0 != 'T') {
+                multiplier *= 1000;
+              }
+            }
+          }
+        }
+        output = ival * multiplier;
+        return true;
+
+      } else if ((pEnd[1] == 'b' || pEnd[1] == 'B') && (pEnd[2] == '\0')) {
+
+        unsigned long int multiplier = 1024;
+
+        if ((end0 != 'K')) {
+          multiplier *= 1024;
+          if (end0 != 'M') {
+            multiplier *= 1024;
+            if (end0 != 'G') {
+              multiplier *= 1024;
+              if (end0 != 'T') {
+                multiplier *= 1024;
+              }
+            }
+          }
+        }
+        output = ival * multiplier;
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+ protected:
+  // Name
+  std::string name_;
+  // Description
+  std::string description_;
+  // Value
+  std::string value_;
+
+ private:
+
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/Relationship.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Relationship.h 
b/libminifi/include/core/Relationship.h
new file mode 100644
index 0000000..416ede6
--- /dev/null
+++ b/libminifi/include/core/Relationship.h
@@ -0,0 +1,96 @@
+/**
+ * @file Relationship.h
+ * Relationship class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RELATIONSHIP_H__
+#define __RELATIONSHIP_H__
+
+#include <string>
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+// undefined relationship for remote process group outgoing port and root 
process group incoming port
+#define UNDEFINED_RELATIONSHIP "undefined"
+
+inline bool isRelationshipNameUndefined(std::string name) {
+  if (name == UNDEFINED_RELATIONSHIP)
+    return true;
+  else
+    return false;
+}
+
+// Relationship Class
+class Relationship {
+
+ public:
+  /*
+   * Create a new relationship 
+   */
+  Relationship(const std::string name, const std::string description)
+      : name_(name),
+        description_(description) {
+  }
+  Relationship()
+      : name_(UNDEFINED_RELATIONSHIP) {
+  }
+  // Destructor
+  virtual ~Relationship() {
+  }
+  // Get Name for the relationship
+  std::string getName() const {
+    return name_;
+  }
+  // Get Description for the relationship
+  std::string getDescription() const {
+    return description_;
+  }
+  // Compare
+  bool operator <(const Relationship & right) const {
+    return name_ < right.name_;
+  }
+  // Whether it is a undefined relationship
+  bool isRelationshipUndefined() {
+    return isRelationshipNameUndefined(name_);
+  }
+
+ protected:
+
+  // Name
+  std::string name_;
+  // Description
+  std::string description_;
+
+ private:
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/Repository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Repository.h 
b/libminifi/include/core/Repository.h
new file mode 100644
index 0000000..a668df5
--- /dev/null
+++ b/libminifi/include/core/Repository.h
@@ -0,0 +1,153 @@
+/**
+ * @file Repository 
+ * Repository class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REPOSITORY_H__
+#define __REPOSITORY_H__
+
+#include <ftw.h>
+#include <uuid/uuid.h>
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <map>
+#include <set>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/Property.h"
+#include "ResourceClaim.h"
+#include "io/Serializable.h"
+#include "utils/TimeUtil.h"
+#include "utils/StringUtils.h"
+#include "core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+class Repository : public CoreComponent {
+ public:
+  /*
+   * Constructor for the repository
+   */
+  Repository(std::string repo_name, std::string directory,
+             int64_t maxPartitionMillis, int64_t maxPartitionBytes,
+             uint64_t purgePeriod)
+      : CoreComponent(repo_name),
+        thread_() {
+    directory_ = directory;
+    max_partition_millis_ = maxPartitionMillis;
+    max_partition_bytes_ = maxPartitionBytes;
+    purge_period_ = purgePeriod;
+    configure_ = Configure::getConfigure();
+    running_ = false;
+    repo_full_ = false;
+  }
+
+  // Destructor
+  virtual ~Repository() {
+    stop();
+  }
+
+  // initialize
+  virtual bool initialize(){
+    return true;
+  }
+  // Put
+  virtual bool Put(std::string key, uint8_t *buf, int bufLen){
+    return true;
+  }
+  // Delete
+  virtual bool Delete(std::string key){
+    return true;
+  }
+
+  virtual bool Get(std::string key, std::string &value) {
+    return true;
+  }
+
+  // Run function for the thread
+  virtual  void run(){
+    // no op
+  }
+  // Start the repository monitor thread
+  virtual void start();
+  // Stop the repository monitor thread
+  virtual void stop();
+  // whether the repo is full
+  virtual bool isFull() {
+    return repo_full_;
+  }
+  // whether the repo is enable
+  virtual bool isRunning() {
+    return running_;
+  }
+  uint64_t incrementSize(const char *fpath, const struct stat *sb,
+                         int typeflag) {
+    return (repo_size_ += sb->st_size);
+  }
+
+  // Prevent default copy constructor and assignment operation
+  // Only support pass by reference or pointer
+  Repository(const Repository &parent) = delete;
+  Repository &operator=(const Repository &parent) = delete;
+
+ protected:
+  // Mutex for protection
+  std::mutex mutex_;
+  // repository directory
+  std::string directory_;
+  // Configure
+  Configure *configure_;
+  // max db entry life time
+  int64_t max_partition_millis_;
+  // max db size
+  int64_t max_partition_bytes_;
+  // purge period
+  uint64_t purge_period_;
+  // thread
+  std::thread thread_;
+  // whether the monitoring thread is running for the repo while it was enabled
+  bool running_;
+  // whether stop accepting provenace event
+  std::atomic<bool> repo_full_;
+  // repoSize
+  uint64_t repoSize();
+  // size of the directory
+  std::atomic<uint64_t> repo_size_;
+
+ private:
+  // Run function for the thread
+    void threadExecutor(){
+      run();
+    }
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/RepositoryFactory.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/RepositoryFactory.h 
b/libminifi/include/core/RepositoryFactory.h
new file mode 100644
index 0000000..03ed524
--- /dev/null
+++ b/libminifi/include/core/RepositoryFactory.h
@@ -0,0 +1,44 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_
+#define LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_
+
+
+#include "core/Repository.h"
+#include "core.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+namespace core {
+
+  std::shared_ptr<core::Repository> createRepository(
+      const std::string configuration_class_name, bool fail_safe = false);
+
+
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORYFACTORY_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/Scheduling.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Scheduling.h 
b/libminifi/include/core/Scheduling.h
new file mode 100644
index 0000000..0c983df
--- /dev/null
+++ b/libminifi/include/core/Scheduling.h
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_SCHEDULING_H_
+#define LIBMINIFI_INCLUDE_CORE_SCHEDULING_H_
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/*
+ * Indicates the valid values for the state of a entity
+ * with respect to scheduling the entity to run.
+ */
+enum ScheduledState {
+
+  /**
+   * Entity cannot be scheduled to run
+   */
+  DISABLED,
+  /**
+   * Entity can be scheduled to run but currently is not
+   */
+  STOPPED,
+  /**
+   * Entity is currently scheduled to run
+   */
+  RUNNING
+};
+
+/*
+ * Scheduling Strategy
+ */
+enum SchedulingStrategy {
+  // Event driven
+  EVENT_DRIVEN,
+  // Timer driven
+  TIMER_DRIVEN,
+  // Cron Driven
+  CRON_DRIVEN
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_CORE_SCHEDULING_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/core.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/core.h b/libminifi/include/core/core.h
new file mode 100644
index 0000000..9f86100
--- /dev/null
+++ b/libminifi/include/core/core.h
@@ -0,0 +1,177 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_CORE_H_
+#define LIBMINIFI_INCLUDE_CORE_CORE_H_
+
+#include <uuid/uuid.h>
+#include <cxxabi.h>
+#include "core/logging/Logger.h"
+/**
+ * namespace aliasing
+ */
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+}
+namespace processors {
+}
+namespace provenance {
+
+}
+namespace core {
+
+template<typename T>
+static inline std::string getClassName() {
+  char *b =   abi::__cxa_demangle(typeid(T).name(), 0, 0, 0);
+  std::string name = b;
+  delete [] b;
+  return name;
+}
+
+template<typename T>
+struct class_operations {
+  
+  template<typename Q=T>
+  static std::true_type canDestruct(decltype(std::declval<Q>().~Q()) *) {
+    return std::true_type();
+  }
+
+  
+  template<typename Q=T>
+  static std::false_type canDestruct(...) {
+    return std::false_type();
+  }
+
+  typedef decltype(canDestruct<T>(0)) type;
+
+  static const bool value = type::value; /* Which is it? */
+};
+
+
+template<typename T>
+typename std::enable_if<!class_operations<T>::value, T*>::type instantiate() {
+  throw std::runtime_error("Cannot instantiate class");
+}
+
+template<typename T>
+typename std::enable_if<class_operations<T>::value, T*>::type instantiate() {
+  return new T();
+}
+
+/**
+ * Base component within MiNiFi
+ * Purpose: Many objects store a name and UUID, therefore
+ * the functionality is localized here to avoid duplication
+ */
+class CoreComponent {
+
+ public:
+
+  /**
+   * Constructor that sets the name and uuid.
+   */
+  explicit CoreComponent(const std::string name, uuid_t uuid = 0)
+      : logger_(logging::Logger::getLogger()),
+        name_(name) {
+    if (!uuid)
+      // Generate the global UUID for the flow record
+      uuid_generate(uuid_);
+    else
+      uuid_copy(uuid_, uuid);
+
+    char uuidStr[37];
+    uuid_unparse_lower(uuid_, uuidStr);
+    uuidStr_ = uuidStr;
+  }
+
+  /**
+   * Move Constructor.
+   */
+  explicit CoreComponent(const CoreComponent &&other)
+      : name_(std::move(other.name_)),
+        logger_(logging::Logger::getLogger()) {
+    uuid_copy(uuid_, other.uuid_);
+  }
+
+  // Get component name Name
+  std::string getName();
+
+  /**
+   * Set name.
+   * @param name
+   */
+  void setName(const std::string name);
+
+  /**
+   * Set UUID in this instance
+   * @param uuid uuid to apply to the internal representation.
+   */
+  void setUUID(uuid_t uuid);
+
+  /**
+   * Returns the UUID through the provided object.
+   * @param uuid uuid struct to which we will copy the memory
+   * @return success of request
+   */
+  bool getUUID(uuid_t uuid);
+
+  unsigned const char *getUUID();
+  /**
+   * Return the UUID string
+   * @param constant reference to the UUID str
+   */
+  const std::string & getUUIDStr()  {
+    return uuidStr_;
+  }
+
+ protected:
+  // A global unique identifier
+  uuid_t uuid_;
+  // UUID string
+  std::string uuidStr_;
+
+  // logger shared ptr
+  std::shared_ptr<org::apache::nifi::minifi::core::logging::Logger> logger_;
+
+  // Connectable's name
+  std::string name_;
+};
+
+namespace logging {
+}
+}
+}
+}
+}
+}
+
+namespace minifi = org::apache::nifi::minifi;
+
+namespace core = org::apache::nifi::minifi::core;
+
+namespace processors = org::apache::nifi::minifi::processors;
+
+namespace logging = org::apache::nifi::minifi::core::logging;
+
+namespace utils = org::apache::nifi::minifi::utils;
+
+namespace provenance = org::apache::nifi::minifi::provenance;
+
+#endif /* LIBMINIFI_INCLUDE_CORE_CORE_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/logging/BaseLogger.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/logging/BaseLogger.h 
b/libminifi/include/core/logging/BaseLogger.h
new file mode 100644
index 0000000..bfdf26f
--- /dev/null
+++ b/libminifi/include/core/logging/BaseLogger.h
@@ -0,0 +1,224 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_BASELOGGER_H_
+#define LIBMINIFI_INCLUDE_BASELOGGER_H_
+
+#include <string>
+#include <memory>
+#include "spdlog/spdlog.h"
+#include <iostream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+
+// 5M default log file size
+#define DEFAULT_LOG_FILE_SIZE (5*1024*1024)
+// 3 log files rotation
+#define DEFAULT_LOG_FILE_NUMBER 3
+#define LOG_NAME "minifi log"
+#define LOG_FILE_NAME "minifi-app.log"
+
+/**
+ * Log level enumeration.
+ */
+typedef enum {
+  trace = 0,
+  debug = 1,
+  info = 2,
+  warn = 3,
+  err = 4,
+  critical = 5,
+  off = 6
+} LOG_LEVEL_E;
+
+#define LOG_BUFFER_SIZE 1024
+#define FILL_BUFFER  char buffer[LOG_BUFFER_SIZE]; \
+    va_list args; \
+    va_start(args, format); \
+    std::vsnprintf(buffer, LOG_BUFFER_SIZE,format, args); \
+    va_end(args);
+
+/**
+ * Base class that represents a logger configuration.
+ */
+class BaseLogger {
+
+ public:
+  static const char *nifi_log_level;
+  static const char *nifi_log_appender;
+
+  /**
+   * Base Constructor
+   */
+  BaseLogger() {
+    setLogLevel("info");
+    logger_ = nullptr;
+    stderr_ = nullptr;
+  }
+
+  /**
+   * Logger configuration constructorthat will set the base log level.
+   * @param config incoming configuration.
+   */
+  BaseLogger(std::string log_level, std::shared_ptr<spdlog::logger> logger)
+      : logger_(logger) {
+    setLogLevel(log_level);
+
+  }
+
+  virtual ~BaseLogger() {
+
+  }
+
+  /**
+   * Move constructor that will atomically swap configuration
+   * shared pointers.
+   */
+  BaseLogger(const BaseLogger &&other)
+      : configured_level_(other.configured_level_.load()) {
+    // must atomically exchange the pointers
+    logger_ = std::move(other.logger_);
+    set_error_logger(other.stderr_);
+
+  }
+
+  /**
+   * Returns the log level for this instance.
+   */
+  virtual LOG_LEVEL_E getLogLevel() const {
+    return configured_level_;
+  }
+
+  /**
+   * @brief Log error message
+   * @param format format string ('man printf' for syntax)
+   * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
+   */
+  virtual void log_error(const char * const format, ...);
+  /**
+   * @brief Log warn message
+   * @param format format string ('man printf' for syntax)
+   * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
+   */
+  virtual void log_warn(const char * const format, ...);
+  /**
+   * @brief Log info message
+   * @param format format string ('man printf' for syntax)
+   * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
+   */
+  virtual void log_info(const char * const format, ...);
+  /**
+   * @brief Log debug message
+   * @param format format string ('man printf' for syntax)
+   * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
+   */
+  virtual void log_debug(const char * const format, ...);
+  /**
+   * @brief Log trace message
+   * @param format format string ('man printf' for syntax)
+   * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
+   */
+  virtual void log_trace(const char * const format, ...);
+
+  /**
+   * @brief Log error message
+   * @param format format string ('man printf' for syntax)
+   * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
+   */
+  virtual void log_str(LOG_LEVEL_E level, const std::string &buffer);
+
+  /**
+   * Sets the log level for this instance based on the string
+   * @param level desired log leve.
+   * @param defaultLevel default level if we cannot match level.
+   */
+  virtual void setLogLevel(const std::string &level, LOG_LEVEL_E defaultLevel =
+                               info);
+
+  /**
+   * Sets the log level atomic and sets it
+   * within logger if it can
+   * @param level desired log level.
+   */
+  virtual void setLogLevel(LOG_LEVEL_E level) {
+    configured_level_ = level;
+    setLogLevel();
+  }
+
+  bool shouldLog(LOG_LEVEL_E level) {
+    return level >= configured_level_.load(std::memory_order_relaxed);
+  }
+
+  /**
+   * Move operator overload
+   */
+  BaseLogger &operator=(const BaseLogger &&other) {
+    configured_level_ = (other.configured_level_.load());
+    // must atomically exchange the pointers
+    logger_ = std::move(other.logger_);
+    set_error_logger(other.stderr_);
+    return *this;
+  }
+
+ protected:
+
+  /**
+   * Logger configuration constructorthat will set the base log level.
+   * @param config incoming configuration.
+   */
+  BaseLogger(std::string log_level)
+      : logger_(nullptr) {
+    setLogLevel(log_level);
+  }
+
+  void setLogger(std::shared_ptr<spdlog::logger> logger) {
+    logger_ = logger;
+  }
+
+  /**
+   * Since a thread may be using stderr and it can be null,
+   * we must atomically exchange the shared pointers.
+   * @param other other shared pointer. can be null ptr
+   */
+  void set_error_logger(std::shared_ptr<spdlog::logger> other);
+
+  /**
+   * Sets the log level on the spdlogger if it is not null.
+   */
+  void setLogLevel() {
+    if (logger_ != nullptr)
+      logger_->set_level((spdlog::level::level_enum) configured_level_.load());
+
+  }
+
+  std::atomic<LOG_LEVEL_E> configured_level_;
+  std::shared_ptr<spdlog::logger> logger_;
+  std::shared_ptr<spdlog::logger> stderr_;
+};
+
+} /* namespace logging */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_BASELOGGER_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/logging/LogAppenders.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/logging/LogAppenders.h 
b/libminifi/include/core/logging/LogAppenders.h
new file mode 100644
index 0000000..7bdc3be
--- /dev/null
+++ b/libminifi/include/core/logging/LogAppenders.h
@@ -0,0 +1,301 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_LOGAPPENDERS_H_
+#define LIBMINIFI_INCLUDE_LOGAPPENDERS_H_
+
+#include "BaseLogger.h"
+#include "spdlog/sinks/null_sink.h"
+#include "spdlog/sinks/ostream_sink.h"
+#include <cxxabi.h>
+#include "properties/Configure.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+
+template<typename T>
+static std::string getUniqueName() {
+  std::string name = LOG_NAME;
+  name += " -- ";
+  name += abi::__cxa_demangle(typeid(T).name(), 0, 0, 0);
+  spdlog::drop(name);
+  return name;
+}
+
+/**
+ * Null appender sets a null sink, thereby performing no logging.
+ */
+class NullAppender : public BaseLogger {
+ public:
+  /**
+   * Base constructor that creates the null sink.
+   */
+  explicit NullAppender()
+      : BaseLogger("off") {
+    auto null_sink = std::make_shared<spdlog::sinks::null_sink_st>();
+    std::string unique_name = getUniqueName<NullAppender>();
+    logger_ = std::make_shared<spdlog::logger>(unique_name, null_sink);
+    configured_level_ = off;
+    setLogLevel();
+  }
+
+  /**
+   * Move constructor for the null appender.
+   */
+  explicit NullAppender(const NullAppender &&other)
+      : BaseLogger(std::move(other)) {
+
+  }
+
+};
+
+/**
+ * Basic output stream configuration that uses a supplied ostream
+ *
+ * Design : extends LoggerConfiguration using the logger and log level
+ * encapsulated within the base configuration class.
+ */
+class OutputStreamAppender : public BaseLogger {
+
+ public:
+
+  static const char *nifi_log_output_stream_error_stderr;
+
+  /**
+   * Output stream move constructor.
+   */
+  explicit OutputStreamAppender(const OutputStreamAppender &&other)
+      : BaseLogger(std::move(other)) {
+
+  }
+
+  /**
+   * Base constructor. Creates a ostream sink.
+   * @param stream incoming stream reference.
+   * @param config configuration.
+   */
+  explicit OutputStreamAppender(Configure *config)
+      : BaseLogger("info") {
+    auto ostream_sink = std::make_shared<spdlog::sinks::ostream_sink_mt>(
+        std::cout);
+
+    std::string unique_name = getUniqueName<OutputStreamAppender>();
+    logger_ = std::make_shared<spdlog::logger>(unique_name, ostream_sink);
+
+    std::string use_std_err;
+
+    if (NULL != config
+        && config->get(nifi_log_output_stream_error_stderr, use_std_err)) {
+
+      std::transform(use_std_err.begin(), use_std_err.end(),
+                     use_std_err.begin(), ::tolower);
+
+      if (use_std_err == "true") {
+        std::string err_unique_name = getUniqueName<OutputStreamAppender>();
+        auto error_ostream_sink = std::make_shared<
+            spdlog::sinks::ostream_sink_mt>(std::cerr);
+        stderr_ = std::make_shared<spdlog::logger>(err_unique_name,
+                                                   error_ostream_sink);
+      }
+    } else {
+      stderr_ = nullptr;
+    }
+
+    std::string log_level;
+    if (NULL != config && config->get(BaseLogger::nifi_log_level, log_level)) {
+      setLogLevel(log_level);
+    } else {
+      setLogLevel("info");
+    }
+
+  }
+
+  /**
+   * Base constructor. Creates a ostream sink.
+   * @param stream incoming stream reference.
+   * @param config configuration.
+   */
+  OutputStreamAppender(std::ostream &stream, Configure *config)
+      : BaseLogger("info") {
+    auto ostream_sink = std::make_shared<spdlog::sinks::ostream_sink_mt>(
+        stream);
+    std::string unique_name = getUniqueName<OutputStreamAppender>();
+    logger_ = std::make_shared<spdlog::logger>(unique_name, ostream_sink);
+
+    stderr_ = nullptr;
+
+    std::string log_level;
+    if (NULL != config && config->get(BaseLogger::nifi_log_level, log_level)) {
+      setLogLevel(log_level);
+    } else {
+      setLogLevel("info");
+    }
+
+  }
+
+ protected:
+
+};
+
+/**
+ * Rolling configuration
+ * Design : extends LoggerConfiguration using the logger and log level
+ * encapsulated within the base configuration class.
+ */
+class RollingAppender : public BaseLogger {
+ public:
+  static const char *nifi_log_rolling_apender_file;
+  static const char *nifi_log_rolling_appender_max_files;
+  static const char *nifi_log_rolling_appender_max_file_size;
+
+  /**
+   * RollingAppenderConfiguration move constructor.
+   */
+  explicit RollingAppender(const RollingAppender&& other)
+      : BaseLogger(std::move(other)),
+        max_files_(std::move(other.max_files_)),
+        file_name_(std::move(other.file_name_)),
+        max_file_size_(std::move(other.max_file_size_)) {
+  }
+  /**
+   * Base Constructor.
+   * @param config pointer to the configuration for this instance.
+   */
+  explicit RollingAppender(Configure * config = 0)
+      : BaseLogger("info") {
+    std::string file_name = "";
+    if (NULL != config
+        && config->get(nifi_log_rolling_apender_file, file_name)) {
+      file_name_ = file_name;
+    } else {
+      file_name_ = LOG_FILE_NAME;
+    }
+
+    std::string max_files = "";
+    if (NULL != config
+        && config->get(nifi_log_rolling_appender_max_files, max_files)) {
+      try {
+        max_files_ = std::stoi(max_files);
+      } catch (const std::invalid_argument &ia) {
+        max_files_ = DEFAULT_LOG_FILE_NUMBER;
+      } catch (const std::out_of_range &oor) {
+        max_files_ = DEFAULT_LOG_FILE_NUMBER;
+      }
+    } else {
+      max_files_ = DEFAULT_LOG_FILE_NUMBER;
+    }
+
+    std::string max_file_size = "";
+    if (NULL != config
+        && config->get(nifi_log_rolling_appender_max_file_size,
+                       max_file_size)) {
+      try {
+        max_file_size_ = std::stoi(max_file_size);
+      } catch (const std::invalid_argument &ia) {
+        max_file_size_ = DEFAULT_LOG_FILE_SIZE;
+      } catch (const std::out_of_range &oor) {
+        max_file_size_ = DEFAULT_LOG_FILE_SIZE;
+      }
+    } else {
+      max_file_size_ = DEFAULT_LOG_FILE_SIZE;
+    }
+
+    std::string unique_name = getUniqueName<OutputStreamAppender>();
+    logger_ = spdlog::rotating_logger_mt(unique_name, file_name_,
+                                         max_file_size_, max_files_);
+
+    std::string log_level;
+    if (NULL != config && config->get(BaseLogger::nifi_log_level, log_level)) {
+      setLogLevel(log_level);
+    }
+  }
+
+  /**
+   * To maintain current functionality we will flush on write.
+   */
+  void log_str(LOG_LEVEL_E level, const std::string &buffer) {
+    BaseLogger::log_str(level, buffer);
+    logger_->flush();
+  }
+
+ protected:
+
+  /**
+   * file name.
+   */
+  std::string file_name_;
+  /**
+   * maximum number of files to keep in the rotation.
+   */
+  size_t max_files_;
+  /**
+   * Maximum file size per rotated file.
+   */
+  size_t max_file_size_;
+
+};
+
+class LogInstance {
+ public:
+  /**
+   * Returns a logger configuration based on
+   * the configuration within this instance.
+   * @param config configuration for this instance.
+   */
+  static std::unique_ptr<BaseLogger> getConfiguredLogger(Configure *config) {
+    std::string appender = "";
+
+    if (config->get(BaseLogger::nifi_log_appender, appender)) {
+      std::transform(appender.begin(), appender.end(), appender.begin(),
+                     ::tolower);
+
+      if ("nullappender" == appender || "null appender" == appender
+          || "null" == appender) {
+
+        return std::move(std::unique_ptr<BaseLogger>(new NullAppender()));
+
+      } else if ("rollingappender" == appender || "rolling appender" == 
appender
+          || "rolling" == appender) {
+
+        return std::move(
+            std::unique_ptr<BaseLogger>(new RollingAppender(config)));
+
+      } else if ("outputstream" == appender
+          || "outputstreamappender" == appender
+          || "outputstream appender" == appender) {
+
+        return std::move(
+            std::unique_ptr<BaseLogger>(new OutputStreamAppender(config)));
+
+      }
+    }
+    return nullptr;
+
+  }
+};
+
+} /* namespace logging */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+#endif /* LIBMINIFI_INCLUDE_LOGAPPENDERS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/logging/Logger.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/logging/Logger.h 
b/libminifi/include/core/logging/Logger.h
new file mode 100644
index 0000000..08ef702
--- /dev/null
+++ b/libminifi/include/core/logging/Logger.h
@@ -0,0 +1,214 @@
+/**
+ * @file Logger.h
+ * Logger class declaration
+ * This is a C++ wrapper for spdlog, a lightweight C++ logging library
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LOGGER_H__
+#define __LOGGER_H__
+
+#include <string>
+#include <atomic>
+#include <memory>
+#include <utility>
+#include <algorithm>
+#include <cstdio>
+#include <iostream>
+
+#include "BaseLogger.h"
+#include "spdlog/spdlog.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace logging {
+
+/**
+ * Logger class
+ * Design: Extends BaseLogger, leaving this class to be the facade to the 
underlying
+ * logging mechanism. Is a facade to BaseLogger's underlying log stream. This 
allows
+ * the underlying implementation to be replaced real time.
+ */
+class Logger : public BaseLogger {
+ protected:
+  struct singleton;
+ public:
+
+  /**
+   * Returns a shared pointer to the logger instance.
+   * Note that while there is no synchronization this is expected
+   * to be called and initialized first
+   * @returns shared pointer to the base logger.
+   */
+  static std::shared_ptr<Logger> getLogger() {
+
+    if (singleton_logger_ == nullptr)
+      singleton_logger_ = std::make_shared<Logger>(singleton { 0 });
+    return singleton_logger_;
+  }
+
+  /**
+   * Returns the log level for this instance.
+   */
+  LOG_LEVEL_E getLogLevel() const {
+    return current_logger_.load()->getLogLevel();
+  }
+
+  /**
+   * Sets the log level atomic and sets it
+   * within logger if it can
+   * @param level desired log level.
+   */
+  void setLogLevel(LOG_LEVEL_E level) {
+    current_logger_.load()->setLogLevel(level);
+  }
+
+  /**
+   * Sets the log level for this instance based on the string
+   * @param level desired log leve.
+   * @param defaultLevel default level if we cannot match level.
+   */
+  void setLogLevel(const std::string &level, LOG_LEVEL_E defaultLevel = info) {
+    current_logger_.load()->setLogLevel(level, info);
+  }
+
+  void updateLogger(std::unique_ptr<BaseLogger> logger) {
+
+    if (logger == nullptr)
+      return;
+    current_logger_.store(logger.release());
+  }
+
+  /**
+   * @brief Log error message
+   * @param format format string ('man printf' for syntax)
+   * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
+   */
+  void log_error(const char * const format, ...) {
+    if (!current_logger_.load()->shouldLog(err))
+      return;
+    FILL_BUFFER
+    current_logger_.load()->log_str(err, buffer);
+  }
+  /**
+   * @brief Log warn message
+   * @param format format string ('man printf' for syntax)
+   * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
+   */
+  void log_warn(const char * const format, ...) {
+    if (!current_logger_.load()->shouldLog(warn))
+      return;
+    FILL_BUFFER
+    current_logger_.load()->log_str(warn, buffer);
+  }
+  /**
+   * @brief Log info message
+   * @param format format string ('man printf' for syntax)
+   * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
+   */
+  void log_info(const char * const format, ...) {
+    if (!current_logger_.load()->shouldLog(info))
+      return;
+    FILL_BUFFER
+    current_logger_.load()->log_str(info, buffer);
+  }
+  /**
+   * @brief Log debug message
+   * @param format format string ('man printf' for syntax)
+   * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
+   */
+  void log_debug(const char * const format, ...) {
+
+    if (!current_logger_.load()->shouldLog(debug))
+      return;
+    FILL_BUFFER
+    current_logger_.load()->log_str(debug, buffer);
+  }
+  /**
+   * @brief Log trace message
+   * @param format format string ('man printf' for syntax)
+   * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
+   */
+  void log_trace(const char * const format, ...) {
+
+    if (!current_logger_.load()->shouldLog(trace))
+      return;
+    FILL_BUFFER
+    current_logger_.load()->log_str(trace, buffer);
+  }
+
+  /**
+   * @brief Log message
+   * @param format format string ('man printf' for syntax)
+   * @warning does not check @p log or @p format for null. Caller must ensure 
parameters and format string lengths match
+   */
+  virtual void log_str(LOG_LEVEL_E level, const std::string &buffer) {
+    current_logger_.load()->log_str(level, buffer);
+  }
+
+  // Destructor
+  ~Logger() {
+  }
+
+  explicit Logger(const singleton &a) {
+
+    /**
+     * flush on info to maintain current functionality
+     */
+    std::shared_ptr<spdlog::logger> defaultsink = spdlog::rotating_logger_mt(
+        LOG_NAME,
+        LOG_FILE_NAME,
+        DEFAULT_LOG_FILE_SIZE, DEFAULT_LOG_FILE_NUMBER);
+    defaultsink->flush_on(spdlog::level::level_enum::info);
+
+    std::unique_ptr<BaseLogger> new_logger_ = std::unique_ptr<BaseLogger>(
+        new BaseLogger("info", defaultsink));
+
+    new_logger_->setLogLevel(info);
+    current_logger_.store(new_logger_.release());
+  }
+
+  Logger(const Logger &parent) = delete;
+  Logger &operator=(const Logger &parent) = delete;
+
+ protected:
+
+  /**
+   * Allows for a null constructor above so that we can have a public 
constructor that
+   * effectively limits us to being a singleton by having a protected argument 
in the constructor
+   */
+  struct singleton {
+    explicit singleton(int) {
+    }
+  };
+
+  std::atomic<BaseLogger*> current_logger_;
+
+// Singleton logger instance
+  static std::shared_ptr<Logger> singleton_logger_;
+};
+
+} /* namespace logging */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/repository/FlowFileRepository.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/repository/FlowFileRepository.h 
b/libminifi/include/core/repository/FlowFileRepository.h
new file mode 100644
index 0000000..31e655a
--- /dev/null
+++ b/libminifi/include/core/repository/FlowFileRepository.h
@@ -0,0 +1,169 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_
+#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_
+
+#include "leveldb/db.h"
+#include "leveldb/options.h"
+#include "leveldb/slice.h"
+#include "leveldb/status.h"
+#include "core/Repository.h"
+#include "core/core.h"
+#include "Connection.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace repository {
+
+
+
+#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository"
+#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024) // 10M
+#define MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME (600000) // 10 minute
+#define FLOWFILE_REPOSITORY_PURGE_PERIOD (2500) // 2500 msec
+
+/**
+ * Flow File repository
+ * Design: Extends Repository and implements the run function, using LevelDB 
as the primary substrate.
+ */
+class FlowFileRepository : public core::Repository, public 
std::enable_shared_from_this<FlowFileRepository> {
+ public:
+  // Constructor
+
+
+   
+  FlowFileRepository(std::string directory, int64_t maxPartitionMillis,
+                     int64_t maxPartitionBytes, uint64_t purgePeriod)
+      : Repository(core::getClassName<FlowFileRepository>(), directory,
+                   maxPartitionMillis, maxPartitionBytes, purgePeriod)
+
+  {
+    db_ = NULL;
+  }
+  
+  FlowFileRepository() : FlowFileRepository(FLOWFILE_REPOSITORY_DIRECTORY,
+                       MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME, 
MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, FLOWFILE_REPOSITORY_PURGE_PERIOD)
+  {
+  }
+
+  // Destructor
+  ~FlowFileRepository() {
+    if (db_)
+      delete db_;
+  }
+
+  // initialize
+  virtual bool initialize() {
+    std::string value;
+
+    if (configure_->get(Configure::nifi_flowfile_repository_directory_default,
+                        value)) {
+      directory_ = value;
+    }
+    logger_->log_info("NiFi FlowFile Repository Directory %s",
+                      directory_.c_str());
+    if (configure_->get(Configure::nifi_flowfile_repository_max_storage_size,
+                        value)) {
+      Property::StringToInt(value, max_partition_bytes_);
+    }
+    logger_->log_info("NiFi FlowFile Max Partition Bytes %d",
+                      max_partition_bytes_);
+    if (configure_->get(Configure::nifi_flowfile_repository_max_storage_time,
+                        value)) {
+      TimeUnit unit;
+      if (Property::StringToTime(value, max_partition_millis_, unit)
+          && Property::ConvertTimeUnitToMS(max_partition_millis_, unit,
+                                           max_partition_millis_)) {
+      }
+    }
+    logger_->log_info("NiFi FlowFile Max Storage Time: [%d] ms",
+                      max_partition_millis_);
+    leveldb::Options options;
+    options.create_if_missing = true;
+    leveldb::Status status = leveldb::DB::Open(options, directory_.c_str(),
+                                               &db_);
+    if (status.ok()) {
+      logger_->log_info("NiFi FlowFile Repository database open %s success",
+                        directory_.c_str());
+    } else {
+      logger_->log_error("NiFi FlowFile Repository database open %s fail",
+                         directory_.c_str());
+      return false;
+    }
+    return true;
+  }
+
+  virtual void run();
+  
+  virtual bool Put(std::string key, uint8_t *buf, int bufLen)
+  {
+                 
+         // persistent to the DB
+         leveldb::Slice value((const char *) buf, bufLen);
+         leveldb::Status status;
+         status = db_->Put(leveldb::WriteOptions(), key, value);
+         if (status.ok())
+                 return true;
+         else
+                 return false;
+  }
+  /**
+  * 
+  * Deletes the key
+  * @return status of the delete operation
+  */
+  virtual bool Delete(std::string key)
+  {
+         leveldb::Status status;
+         status = db_->Delete(leveldb::WriteOptions(), key);
+         if (status.ok())
+                 return true;
+         else
+                 return false;
+  }
+  /**
+    * Sets the value from the provided key
+    * @return status of the get operation.
+    */
+  virtual bool Get(std::string key, std::string &value)
+  {
+         leveldb::Status status;
+         status = db_->Get(leveldb::ReadOptions(), key, &value);
+         if (status.ok())
+                 return true;
+         else
+                 return false;
+  }
+  
+  void loadFlowFileToConnections(std::map<std::string, 
std::shared_ptr<minifi::Connection>> &connectionMap);
+  
+ private:
+  leveldb::DB* db_;
+};
+
+} /* namespace repository */
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_REPOSITORY_FLOWFILEREPOSITORY_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/core/yaml/YamlConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h 
b/libminifi/include/core/yaml/YamlConfiguration.h
new file mode 100644
index 0000000..0ca9190
--- /dev/null
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_YAMLCONFIGURATION_H_
+#define LIBMINIFI_INCLUDE_CORE_YAMLCONFIGURATION_H_
+
+#include "core/ProcessorConfig.h"
+#include "yaml-cpp/yaml.h"
+#include "../FlowConfiguration.h"
+#include "Site2SiteClientProtocol.h"
+#include <string>
+#include "io/validation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+#define DEFAULT_FLOW_YAML_FILE_NAME "conf/flow.yml"
+#define CONFIG_YAML_PROCESSORS_KEY "Processors"
+
+class YamlConfiguration : public FlowConfiguration {
+
+ public:
+  YamlConfiguration(std::shared_ptr<core::Repository> repo,
+                    std::shared_ptr<core::Repository> flow_file_repo,
+                    const std::string path = DEFAULT_FLOW_YAML_FILE_NAME)
+      : FlowConfiguration(repo, flow_file_repo, path) {
+    if (IsNullOrEmpty(config_path_)) {
+      config_path_ = DEFAULT_FLOW_YAML_FILE_NAME;
+    }
+  }
+
+  virtual ~YamlConfiguration() {
+
+  }
+
+  std::unique_ptr<core::ProcessGroup> getRoot(const std::string &from_config) {
+
+    YAML::Node flow = YAML::LoadFile(from_config);
+
+    YAML::Node flowControllerNode = flow["Flow Controller"];
+    YAML::Node processorsNode = flow[CONFIG_YAML_PROCESSORS_KEY];
+    YAML::Node connectionsNode = flow["Connections"];
+    YAML::Node remoteProcessingGroupNode = flow["Remote Processing Groups"];
+
+    // Create the root process group
+    core::ProcessGroup * root = parseRootProcessGroupYaml(flowControllerNode);
+    parseProcessorNodeYaml(processorsNode, root);
+    parseRemoteProcessGroupYaml(&remoteProcessingGroupNode, root);
+    parseConnectionYaml(&connectionsNode, root);
+
+    return std::unique_ptr<core::ProcessGroup>(root);
+
+  }
+ protected:
+  // Process Processor Node YAML
+  void parseProcessorNodeYaml(YAML::Node processorNode,
+                              core::ProcessGroup * parent);
+  // Process Port YAML
+  void parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent,
+                     TransferDirection direction);
+  // Process Root Processor Group YAML
+  core::ProcessGroup *parseRootProcessGroupYaml(YAML::Node rootNode);
+  // Process Property YAML
+  void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node,
+                                  std::shared_ptr<core::Processor> processor);
+  // Process connection YAML
+  void parseConnectionYaml(YAML::Node *node, core::ProcessGroup * parent);
+  // Process Remote Process Group YAML
+  void parseRemoteProcessGroupYaml(YAML::Node *node,
+                                   core::ProcessGroup * parent);
+  // Parse Properties Node YAML for a processor
+  void parsePropertiesNodeYaml(YAML::Node *propertiesNode,
+                               std::shared_ptr<core::Processor> processor);
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_YAMLCONFIGURATION_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/BaseStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/BaseStream.h 
b/libminifi/include/io/BaseStream.h
index c3ebe42..b0b3589 100644
--- a/libminifi/include/io/BaseStream.h
+++ b/libminifi/include/io/BaseStream.h
@@ -19,132 +19,144 @@
 #ifndef LIBMINIFI_INCLUDE_IO_BASESTREAM_H_
 #define LIBMINIFI_INCLUDE_IO_BASESTREAM_H_
 
-
 #include <cstdint>
 #include "EndianCheck.h"
 #include "DataStream.h"
 #include "Serializable.h"
-class BaseStream: public DataStream, public Serializable {
-
-public:
-       BaseStream() {
-
-       }
-       virtual ~BaseStream() {
-
-       }
-       /**
-        * write 4 bytes to stream
-        * @param base_value non encoded value
-        * @param stream output stream
-        * @param is_little_endian endianness determination
-        * @return resulting write size
-        **/
-       virtual int write(uint32_t base_value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-
-       /**
-        * write 2 bytes to stream
-        * @param base_value non encoded value
-        * @param stream output stream
-        * @param is_little_endian endianness determination
-        * @return resulting write size
-        **/
-       virtual int write(uint16_t base_value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-
-       /**
-        * write valueto stream
-        * @param value non encoded value
-        * @param len length of value
-        * @param strema output stream
-        * @return resulting write size
-        **/
-       virtual int write(uint8_t *value, int len);
-
-       /**
-        * write 8 bytes to stream
-        * @param base_value non encoded value
-        * @param stream output stream
-        * @param is_little_endian endianness determination
-        * @return resulting write size
-        **/
-       virtual int write(uint64_t base_value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-
-       /**
-        * write bool to stream
-        * @param value non encoded value
-        * @return resulting write size
-        **/
-       virtual int write(bool value);
-
-       /**
-        * write UTF string to stream
-        * @param str string to write
-        * @return resulting write size
-        **/
-       virtual int writeUTF(std::string str, bool widen = false);
-
-       /**
-        * reads a byte from the stream
-        * @param value reference in which will set the result
-        * @param stream stream from which we will read
-        * @return resulting read size
-        **/
-       virtual int read(uint8_t &value);
-
-       /**
-        * reads two bytes from the stream
-        * @param value reference in which will set the result
-        * @param stream stream from which we will read
-        * @return resulting read size
-        **/
-       virtual int read(uint16_t &base_value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-
-       /**
-        * reads a byte from the stream
-        * @param value reference in which will set the result
-        * @param stream stream from which we will read
-        * @return resulting read size
-        **/
-       virtual int read(char &value);
-
-       /**
-        * reads a byte array from the stream
-        * @param value reference in which will set the result
-        * @param len length to read
-        * @param stream stream from which we will read
-        * @return resulting read size
-        **/
-       virtual int read(uint8_t *value, int len);
-
-       /**
-        * reads four bytes from the stream
-        * @param value reference in which will set the result
-        * @param stream stream from which we will read
-        * @return resulting read size
-        **/
-       virtual int read(uint32_t &value,
-                       bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
-       /**
-        * reads eight byte from the stream
-        * @param value reference in which will set the result
-        * @param stream stream from which we will read
-        * @return resulting read size
-        **/
-       virtual int read(uint64_t &value,
-                       bool is_little_endian = EndiannessCheck::IS_LITTLE);
-
-       /**
-        * read UTF from stream
-        * @param str reference string
-        * @param stream stream from which we will read
-        * @return resulting read size
-        **/
-       virtual int readUTF(std::string &str, bool widen = false);
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+
+class BaseStream : public DataStream, public Serializable {
+
+ public:
+  BaseStream() {
+
+  }
+  virtual ~BaseStream() {
+
+  }
+  /**
+   * write 4 bytes to stream
+   * @param base_value non encoded value
+   * @param stream output stream
+   * @param is_little_endian endianness determination
+   * @return resulting write size
+   **/
+  virtual int write(uint32_t base_value, bool is_little_endian =
+                        EndiannessCheck::IS_LITTLE);
+
+  /**
+   * write 2 bytes to stream
+   * @param base_value non encoded value
+   * @param stream output stream
+   * @param is_little_endian endianness determination
+   * @return resulting write size
+   **/
+  virtual int write(uint16_t base_value, bool is_little_endian =
+                        EndiannessCheck::IS_LITTLE);
+
+  /**
+   * write valueto stream
+   * @param value non encoded value
+   * @param len length of value
+   * @param strema output stream
+   * @return resulting write size
+   **/
+  virtual int write(uint8_t *value, int len);
+
+  /**
+   * write 8 bytes to stream
+   * @param base_value non encoded value
+   * @param stream output stream
+   * @param is_little_endian endianness determination
+   * @return resulting write size
+   **/
+  virtual int write(uint64_t base_value, bool is_little_endian =
+                        EndiannessCheck::IS_LITTLE);
+
+  /**
+   * write bool to stream
+   * @param value non encoded value
+   * @return resulting write size
+   **/
+  virtual int write(bool value);
+
+  /**
+   * write UTF string to stream
+   * @param str string to write
+   * @return resulting write size
+   **/
+  virtual int writeUTF(std::string str, bool widen = false);
+
+  /**
+   * reads a byte from the stream
+   * @param value reference in which will set the result
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  virtual int read(uint8_t &value);
+
+  /**
+   * reads two bytes from the stream
+   * @param value reference in which will set the result
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  virtual int read(uint16_t &base_value, bool is_little_endian =
+                       EndiannessCheck::IS_LITTLE);
+
+  /**
+   * reads a byte from the stream
+   * @param value reference in which will set the result
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  virtual int read(char &value);
+
+  /**
+   * reads a byte array from the stream
+   * @param value reference in which will set the result
+   * @param len length to read
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  virtual int read(uint8_t *value, int len);
+
+  /**
+   * reads four bytes from the stream
+   * @param value reference in which will set the result
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  virtual int read(uint32_t &value, bool is_little_endian =
+                       EndiannessCheck::IS_LITTLE);
+
+  /**
+   * reads eight byte from the stream
+   * @param value reference in which will set the result
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  virtual int read(uint64_t &value, bool is_little_endian =
+                       EndiannessCheck::IS_LITTLE);
+
+  /**
+   * read UTF from stream
+   * @param str reference string
+   * @param stream stream from which we will read
+   * @return resulting read size
+   **/
+  virtual int readUTF(std::string &str, bool widen = false);
 };
 
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
 #endif /* LIBMINIFI_INCLUDE_IO_BASESTREAM_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/CRCStream.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h
index 01b6199..99fdfc3 100644
--- a/libminifi/include/io/CRCStream.h
+++ b/libminifi/include/io/CRCStream.h
@@ -25,282 +25,279 @@
 #include "BaseStream.h"
 #include "Serializable.h"
 
-#define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32))
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
 
+#define htonll_r(x) ((((uint64_t)htonl(x)) << 32) + htonl((x) >> 32))
 
 template<typename T>
-class CRCStream: public BaseStream {
-public:
-       /**
-        * Raw pointer because the caller guarantees that
-        * it will exceed our lifetime.
-        */
-       explicit CRCStream(T *stream);
-
-       explicit CRCStream( CRCStream<T> &&move );
-
-       virtual ~CRCStream() {
-
-       }
-
-       /**
-        * Reads data and places it into buf
-        * @param buf buffer in which we extract data
-        * @param buflen
-        */
-       virtual int readData(std::vector<uint8_t> &buf, int buflen);
-       /**
-        * Reads data and places it into buf
-        * @param buf buffer in which we extract data
-        * @param buflen
-        */
-       virtual int readData(uint8_t *buf, int buflen);
-
-       /**
-        * Write value to the stream using std::vector
-        * @param buf incoming buffer
-        * @param buflen buffer to write
-        *
-        */
-       virtual int writeData(std::vector<uint8_t> &buf, int buflen);
-
-       /**
-        * writes value to stream
-        * @param value value to write
-        * @param size size of value
-        */
-       virtual int writeData(uint8_t *value, int size);
-       
-       /**
-        * write 4 bytes to stream
-        * @param base_value non encoded value
-        * @param stream output stream
-        * @param is_little_endian endianness determination
-        * @return resulting write size
-        **/
-       virtual int write(uint32_t base_value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-       /**
-        * write 2 bytes to stream
-        * @param base_value non encoded value
-        * @param stream output stream
-        * @param is_little_endian endianness determination
-        * @return resulting write size
-        **/
-       virtual int write(uint16_t base_value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-
-
-       /**
-        * write 8 bytes to stream
-        * @param base_value non encoded value
-        * @param stream output stream
-        * @param is_little_endian endianness determination
-        * @return resulting write size
-        **/
-       virtual int write(uint64_t base_value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-       
-       
-       
-       /**
-        * Reads a system word
-        * @param value value to write
-        */
-       virtual int read(uint64_t &value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-
-       /**
-        * Reads a uint32_t
-        * @param value value to write
-        */
-       virtual int read(uint32_t &value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-
-       /**
-        * Reads a system short
-        * @param value value to write
-        */
-       virtual int read(uint16_t &value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-       
-
-       virtual short initialize() {
-               child_stream_->initialize();
-               reset();
-               return 0;
-       }
-
-
-       void updateCRC(uint8_t *buffer, uint32_t length);
-
-       uint64_t getCRC() {
-               return crc_;
-       }
-
-       void reset();
-protected:
-  
+class CRCStream : public BaseStream {
+ public:
   /**
-        * Creates a vector and returns the vector using the provided
-        * type name.
-        * @param t incoming object
-        * @returns vector.
-        */
-       template<typename K>
-       std::vector<uint8_t> readBuffer(const K& t){
-           std::vector<uint8_t> buf;
-           buf.resize(sizeof t);
-           readData((uint8_t*) &buf[0], sizeof(t));
-           return buf;
-       }
-       
-  
-       uint64_t crc_;
-       T *child_stream_;
-};
+   * Raw pointer because the caller guarantees that
+   * it will exceed our lifetime.
+   */
+  explicit CRCStream(T *stream);
+
+  explicit CRCStream(CRCStream<T> &&move);
+
+  virtual ~CRCStream() {
+
+  }
+
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen);
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(uint8_t *buf, int buflen);
+
+  /**
+   * Write value to the stream using std::vector
+   * @param buf incoming buffer
+   * @param buflen buffer to write
+   *
+   */
+  virtual int writeData(std::vector<uint8_t> &buf, int buflen);
+
+  /**
+   * writes value to stream
+   * @param value value to write
+   * @param size size of value
+   */
+  virtual int writeData(uint8_t *value, int size);
+
+  /**
+   * write 4 bytes to stream
+   * @param base_value non encoded value
+   * @param stream output stream
+   * @param is_little_endian endianness determination
+   * @return resulting write size
+   **/
+  virtual int write(uint32_t base_value, bool is_little_endian =
+                        EndiannessCheck::IS_LITTLE);
+  /**
+   * write 2 bytes to stream
+   * @param base_value non encoded value
+   * @param stream output stream
+   * @param is_little_endian endianness determination
+   * @return resulting write size
+   **/
+  virtual int write(uint16_t base_value, bool is_little_endian =
+                        EndiannessCheck::IS_LITTLE);
+
+  /**
+   * write 8 bytes to stream
+   * @param base_value non encoded value
+   * @param stream output stream
+   * @param is_little_endian endianness determination
+   * @return resulting write size
+   **/
+  virtual int write(uint64_t base_value, bool is_little_endian =
+                        EndiannessCheck::IS_LITTLE);
 
+  /**
+   * Reads a system word
+   * @param value value to write
+   */
+  virtual int read(uint64_t &value, bool is_little_endian =
+                       EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Reads a uint32_t
+   * @param value value to write
+   */
+  virtual int read(uint32_t &value, bool is_little_endian =
+                       EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Reads a system short
+   * @param value value to write
+   */
+  virtual int read(uint16_t &value, bool is_little_endian =
+                       EndiannessCheck::IS_LITTLE);
+
+  virtual short initialize() {
+    child_stream_->initialize();
+    reset();
+    return 0;
+  }
+
+  void updateCRC(uint8_t *buffer, uint32_t length);
+
+  uint64_t getCRC() {
+    return crc_;
+  }
+
+  void reset();
+ protected:
+
+  /**
+   * Creates a vector and returns the vector using the provided
+   * type name.
+   * @param t incoming object
+   * @returns vector.
+   */
+  template<typename K>
+  std::vector<uint8_t> readBuffer(const K& t) {
+    std::vector<uint8_t> buf;
+    buf.resize(sizeof t);
+    readData((uint8_t*) &buf[0], sizeof(t));
+    return buf;
+  }
+
+  uint64_t crc_;
+  T *child_stream_;
+};
 
 template<typename T>
-CRCStream<T>::CRCStream(T *other) :
-               child_stream_(other) {
-       crc_ = crc32(0L, Z_NULL, 0);
+CRCStream<T>::CRCStream(T *other)
+    : child_stream_(other) {
+  crc_ = crc32(0L, Z_NULL, 0);
 }
 
 template<typename T>
-CRCStream<T>::CRCStream(CRCStream<T> &&move) :
-               crc_(std::move(move.crc_)), 
child_stream_(std::move(move.child_stream_)) {
+CRCStream<T>::CRCStream(CRCStream<T> &&move)
+    : crc_(std::move(move.crc_)),
+      child_stream_(std::move(move.child_stream_)) {
 
 }
 
 template<typename T>
 int CRCStream<T>::readData(std::vector<uint8_t> &buf, int buflen) {
 
-       if (buf.capacity() < buflen)
-               buf.resize(buflen);
-       return readData((uint8_t*) &buf[0], buflen);
+  if (buf.capacity() < buflen)
+    buf.resize(buflen);
+  return readData((uint8_t*) &buf[0], buflen);
 }
 
 template<typename T>
 int CRCStream<T>::readData(uint8_t *buf, int buflen) {
-       int ret = child_stream_->read(buf, buflen);
-       crc_ = crc32(crc_, buf, buflen);
-       return ret;
+  int ret = child_stream_->read(buf, buflen);
+  crc_ = crc32(crc_, buf, buflen);
+  return ret;
 }
 
 template<typename T>
 int CRCStream<T>::writeData(std::vector<uint8_t> &buf, int buflen) {
 
-       if (buf.capacity() < buflen)
-               buf.resize(buflen);
-       return writeData((uint8_t*) &buf[0], buflen);
+  if (buf.capacity() < buflen)
+    buf.resize(buflen);
+  return writeData((uint8_t*) &buf[0], buflen);
 }
 
 template<typename T>
 int CRCStream<T>::writeData(uint8_t *value, int size) {
 
-       int ret = child_stream_->write(value, size);
-       crc_ = crc32(crc_, value, size);
-       return ret;
+  int ret = child_stream_->write(value, size);
+  crc_ = crc32(crc_, value, size);
+  return ret;
 
 }
 template<typename T>
 void CRCStream<T>::reset() {
-       crc_ = crc32(0L, Z_NULL, 0);
+  crc_ = crc32(0L, Z_NULL, 0);
 }
 template<typename T>
 void CRCStream<T>::updateCRC(uint8_t *buffer, uint32_t length) {
-       crc_ = crc32(crc_, buffer, length);
+  crc_ = crc32(crc_, buffer, length);
 }
 
 template<typename T>
-int CRCStream<T>::write(uint64_t base_value, bool is_little_endian){
-  
-   const uint64_t value =
-        is_little_endian == 1 ? htonll_r(base_value) : base_value;
-    uint8_t bytes[sizeof value];
-    std::copy(static_cast<const char*>(static_cast<const void*>(&value)),
-              static_cast<const char*>(static_cast<const void*>(&value)) + 
sizeof value,
-              bytes);
-    return writeData(bytes,sizeof value);
-}
+int CRCStream<T>::write(uint64_t base_value, bool is_little_endian) {
 
+  const uint64_t value =
+      is_little_endian == 1 ? htonll_r(base_value) : base_value;
+  uint8_t bytes[sizeof value];
+  std::copy(
+      static_cast<const char*>(static_cast<const void*>(&value)),
+      static_cast<const char*>(static_cast<const void*>(&value)) + sizeof 
value,
+      bytes);
+  return writeData(bytes, sizeof value);
+}
 
 template<typename T>
-int CRCStream<T>::write(uint32_t base_value, bool is_little_endian){
-   const uint32_t value = is_little_endian ? htonl(base_value) : base_value;
-    uint8_t bytes[sizeof value];
-    std::copy(static_cast<const char*>(static_cast<const void*>(&value)),
-              static_cast<const char*>(static_cast<const void*>(&value)) + 
sizeof value,
-              bytes);
-    return writeData(bytes,sizeof value);
+int CRCStream<T>::write(uint32_t base_value, bool is_little_endian) {
+  const uint32_t value = is_little_endian ? htonl(base_value) : base_value;
+  uint8_t bytes[sizeof value];
+  std::copy(
+      static_cast<const char*>(static_cast<const void*>(&value)),
+      static_cast<const char*>(static_cast<const void*>(&value)) + sizeof 
value,
+      bytes);
+  return writeData(bytes, sizeof value);
 }
 
 template<typename T>
-int CRCStream<T>::write(uint16_t base_value, bool is_little_endian){
-  const uint16_t value =
-        is_little_endian == 1 ? htons(base_value) : base_value;
+int CRCStream<T>::write(uint16_t base_value, bool is_little_endian) {
+  const uint16_t value = is_little_endian == 1 ? htons(base_value) : 
base_value;
   uint8_t bytes[sizeof value];
-    std::copy(static_cast<const char*>(static_cast<const void*>(&value)),
-              static_cast<const char*>(static_cast<const void*>(&value)) + 
sizeof value,
-              bytes);
-    return writeData(bytes,sizeof value);
+  std::copy(
+      static_cast<const char*>(static_cast<const void*>(&value)),
+      static_cast<const char*>(static_cast<const void*>(&value)) + sizeof 
value,
+      bytes);
+  return writeData(bytes, sizeof value);
 }
 
-
 template<typename T>
 int CRCStream<T>::read(uint64_t &value, bool is_little_endian) {
 
-       auto buf = readBuffer(value);
-
-       if (is_little_endian) {
-               value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) 
<< 48)
-                               | ((uint64_t) (buf[2] & 255) << 40)
-                               | ((uint64_t) (buf[3] & 255) << 32)
-                               | ((uint64_t) (buf[4] & 255) << 24)
-                               | ((uint64_t) (buf[5] & 255) << 16)
-                               | ((uint64_t) (buf[6] & 255) << 8)
-                               | ((uint64_t) (buf[7] & 255) << 0);
-       } else {
-               value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) 
<< 8)
-                               | ((uint64_t) (buf[2] & 255) << 16)
-                               | ((uint64_t) (buf[3] & 255) << 24)
-                               | ((uint64_t) (buf[4] & 255) << 32)
-                               | ((uint64_t) (buf[5] & 255) << 40)
-                               | ((uint64_t) (buf[6] & 255) << 48)
-                               | ((uint64_t) (buf[7] & 255) << 56);
-       }
-       return sizeof(value);
+  auto buf = readBuffer(value);
+
+  if (is_little_endian) {
+    value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48)
+        | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32)
+        | ((uint64_t) (buf[4] & 255) << 24) | ((uint64_t) (buf[5] & 255) << 16)
+        | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0);
+  } else {
+    value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8)
+        | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24)
+        | ((uint64_t) (buf[4] & 255) << 32) | ((uint64_t) (buf[5] & 255) << 40)
+        | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 
56);
+  }
+  return sizeof(value);
 }
 
 template<typename T>
 int CRCStream<T>::read(uint32_t &value, bool is_little_endian) {
 
-       auto buf = readBuffer(value);
+  auto buf = readBuffer(value);
 
-       if (is_little_endian) {
-               value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | 
buf[3];
-       } else {
-               value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
+  if (is_little_endian) {
+    value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
+  } else {
+    value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
 
-       }
+  }
 
-       return sizeof(value);
+  return sizeof(value);
 }
 
 template<typename T>
 int CRCStream<T>::read(uint16_t &value, bool is_little_endian) {
 
-       auto buf = readBuffer(value);
+  auto buf = readBuffer(value);
 
-       if (is_little_endian) {
-               value = (buf[0] << 8) | buf[1];
-       } else {
-               value = buf[0] | buf[1] << 8;
+  if (is_little_endian) {
+    value = (buf[0] << 8) | buf[1];
+  } else {
+    value = buf[0] | buf[1] << 8;
 
-       }
-       return sizeof(value);
+  }
+  return sizeof(value);
 }
 
-
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
 #endif /* LIBMINIFI_INCLUDE_IO_CRCSTREAM_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/include/io/ClientSocket.h
----------------------------------------------------------------------
diff --git a/libminifi/include/io/ClientSocket.h 
b/libminifi/include/io/ClientSocket.h
index 3f8aae1..97cace2 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -26,10 +26,17 @@
 #include <mutex>
 #include <atomic>
 #include "io/BaseStream.h"
-#include "Logger.h"
+#include "core/core.h"
+#include "core/logging/Logger.h"
 
 #include "io/validation.h"
 
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
 /**
  * Socket class.
  * Purpose: Provides a general purpose socket interface that abstracts
@@ -39,207 +46,209 @@
  *
  *
  */
-class Socket: public BaseStream {
-public:
-       /**
-        * Constructor that accepts host name, port and listeners. With this
-        * contructor we will be creating a server socket
-        * @param hostname our host name
-        * @param port connecting port
-        * @param listeners number of listeners in the queue
-        */
-       explicit Socket(const std::string &hostname, const uint16_t port,
-                       const uint16_t listeners);
-
-       /**
-        * Constructor that creates a client socket.
-        * @param hostname hostname we are connecting to.
-        * @param port port we are connecting to.
-        */
-       explicit Socket(const std::string &hostname, const uint16_t port);
-
-       /**
-        * Move constructor.
-        */
-       explicit Socket(const Socket &&);
-
-       static std::string HOSTNAME;
-
-       /**
-        * Static function to return the current machine's host name
-        */
-       static std::string getMyHostName(std::string *str = &HOSTNAME) {
-               if (__builtin_expect(!IsNullOrEmpty(str), 0))
-                       return *str;
-               else {
-                       char hostname[1024];
-                       gethostname(hostname, 1024);
-                       Socket mySock(hostname, 0);
-                       mySock.initialize();
-                       return mySock.getHostname();
-               }
-       }
-
-       /**
-        * Destructor
-        */
-
-       virtual ~Socket();
-
-       virtual void closeStream();
-       /**
-        * Initializes the socket
-        * @return result of the creation operation.
-        */
-       virtual short initialize();
-
-       std::string getHostname() const;
-
-       /**
-        * Return the port for this socket
-        * @returns port
-        */
-       uint16_t getPort();
-
-       // data stream extensions
-       /**
-        * Reads data and places it into buf
-        * @param buf buffer in which we extract data
-        * @param buflen
-        */
-       virtual int readData(std::vector<uint8_t> &buf, int buflen);
-       /**
-        * Reads data and places it into buf
-        * @param buf buffer in which we extract data
-        * @param buflen
-        */
-       virtual int readData(uint8_t *buf, int buflen);
-
-       /**
-        * Write value to the stream using std::vector
-        * @param buf incoming buffer
-        * @param buflen buffer to write
-        *
-        */
-       virtual int writeData(std::vector<uint8_t> &buf, int buflen);
-
-       /**
-        * writes value to stream
-        * @param value value to write
-        * @param size size of value
-        */
-       virtual int writeData(uint8_t *value, int size);
-
-       
-       
-       /**
-        * Writes a system word
-        * @param value value to write
-        */
-       virtual int write(uint64_t value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-
-       /**
-        * Writes a uint32_t
-        * @param value value to write
-        */
-       virtual int write(uint32_t value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-
-       /**
-        * Writes a system short
-        * @param value value to write
-        */
-       virtual int write(uint16_t value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-
-       
-       /**
-        * Reads a system word
-        * @param value value to write
-        */
-       virtual int read(uint64_t &value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-
-       /**
-        * Reads a uint32_t
-        * @param value value to write
-        */
-       virtual int read(uint32_t &value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-
-       /**
-        * Reads a system short
-        * @param value value to write
-        */
-       virtual int read(uint16_t &value, bool is_little_endian =
-                       EndiannessCheck::IS_LITTLE);
-
-       /**
-        * Returns the underlying buffer
-        * @return vector's array
-        **/
-       const uint8_t *getBuffer() const {
-               return ::DataStream::getBuffer();
-       }
-
-       /**
-        * Retrieve size of data stream
-        * @return size of data stream
-        **/
-       const uint32_t getSize() const {
-               return ::DataStream::getSize();
-       }
-
-protected:
-
-       /**
-        * Creates a vector and returns the vector using the provided
-        * type name.
-        * @param t incoming object
-        * @returns vector.
-        */
-       template<typename T>
-       std::vector<uint8_t> readBuffer(const T&);
-
-       /**
-        * Creates a connection using the address info object.
-        * @param p addrinfo structure.
-        * @returns fd.
-        */
-       virtual int8_t createConnection(const addrinfo *p,in_addr_t &addr);
-
-       /**
-        * Sets socket options depending on the instance.
-        * @param sock socket file descriptor.
-        */
-       virtual short setSocketOptions(const int sock);
-
-       /**
-        * Attempt to select the socket file descriptor
-        * @param msec timeout interval to wait
-        * @returns file descriptor
-        */
-       virtual short select_descriptor(const uint16_t msec);
-
-       std::shared_ptr<Logger> logger_;
-
-       addrinfo *addr_info_;
-
-       std::recursive_mutex selection_mutex_;
-
-       std::string requested_hostname_;
-       std::string canonical_hostname_;
-       uint16_t port_;
-
-       // connection information
-       int32_t socket_file_descriptor_;
-
-       fd_set total_list_;
-       fd_set read_fds_;
-       std::atomic<uint16_t> socket_max_;
-       uint16_t listeners_;
+class Socket : public BaseStream {
+ public:
+  /**
+   * Constructor that accepts host name, port and listeners. With this
+   * contructor we will be creating a server socket
+   * @param hostname our host name
+   * @param port connecting port
+   * @param listeners number of listeners in the queue
+   */
+  explicit Socket(const std::string &hostname, const uint16_t port,
+                  const uint16_t listeners);
+
+  /**
+   * Constructor that creates a client socket.
+   * @param hostname hostname we are connecting to.
+   * @param port port we are connecting to.
+   */
+  explicit Socket(const std::string &hostname, const uint16_t port);
+
+  /**
+   * Move constructor.
+   */
+  explicit Socket(const Socket &&);
+
+  static std::string HOSTNAME;
+
+  /**
+   * Static function to return the current machine's host name
+   */
+  static std::string getMyHostName(std::string *str = &HOSTNAME) {
+    if (__builtin_expect(!IsNullOrEmpty(str), 0))
+      return *str;
+    else {
+      char hostname[1024];
+      gethostname(hostname, 1024);
+      Socket mySock(hostname, 0);
+      mySock.initialize();
+      return mySock.getHostname();
+    }
+  }
+
+  /**
+   * Destructor
+   */
+
+  virtual ~Socket();
+
+  virtual void closeStream();
+  /**
+   * Initializes the socket
+   * @return result of the creation operation.
+   */
+  virtual short initialize();
+
+  std::string getHostname() const;
+
+  /**
+   * Return the port for this socket
+   * @returns port
+   */
+  uint16_t getPort();
+
+  // data stream extensions
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(std::vector<uint8_t> &buf, int buflen);
+  /**
+   * Reads data and places it into buf
+   * @param buf buffer in which we extract data
+   * @param buflen
+   */
+  virtual int readData(uint8_t *buf, int buflen);
+
+  /**
+   * Write value to the stream using std::vector
+   * @param buf incoming buffer
+   * @param buflen buffer to write
+   *
+   */
+  virtual int writeData(std::vector<uint8_t> &buf, int buflen);
+
+  /**
+   * writes value to stream
+   * @param value value to write
+   * @param size size of value
+   */
+  virtual int writeData(uint8_t *value, int size);
+
+  /**
+   * Writes a system word
+   * @param value value to write
+   */
+  virtual int write(uint64_t value, bool is_little_endian =
+                        EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Writes a uint32_t
+   * @param value value to write
+   */
+  virtual int write(uint32_t value, bool is_little_endian =
+                        EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Writes a system short
+   * @param value value to write
+   */
+  virtual int write(uint16_t value, bool is_little_endian =
+                        EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Reads a system word
+   * @param value value to write
+   */
+  virtual int read(uint64_t &value, bool is_little_endian =
+                       EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Reads a uint32_t
+   * @param value value to write
+   */
+  virtual int read(uint32_t &value, bool is_little_endian =
+                       EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Reads a system short
+   * @param value value to write
+   */
+  virtual int read(uint16_t &value, bool is_little_endian =
+                       EndiannessCheck::IS_LITTLE);
+
+  /**
+   * Returns the underlying buffer
+   * @return vector's array
+   **/
+  const uint8_t *getBuffer() const {
+    return DataStream::getBuffer();
+  }
+
+  /**
+   * Retrieve size of data stream
+   * @return size of data stream
+   **/
+  const uint32_t getSize() const {
+    return DataStream::getSize();
+  }
+
+ protected:
+
+  /**
+   * Creates a vector and returns the vector using the provided
+   * type name.
+   * @param t incoming object
+   * @returns vector.
+   */
+  template<typename T>
+  std::vector<uint8_t> readBuffer(const T&);
+
+  /**
+   * Creates a connection using the address info object.
+   * @param p addrinfo structure.
+   * @returns fd.
+   */
+  virtual int8_t createConnection(const addrinfo *p, in_addr_t &addr);
+
+  /**
+   * Sets socket options depending on the instance.
+   * @param sock socket file descriptor.
+   */
+  virtual short setSocketOptions(const int sock);
+
+  /**
+   * Attempt to select the socket file descriptor
+   * @param msec timeout interval to wait
+   * @returns file descriptor
+   */
+  virtual short select_descriptor(const uint16_t msec);
+
+  std::shared_ptr<logging::Logger> logger_;
+
+  addrinfo *addr_info_;
+
+  std::recursive_mutex selection_mutex_;
+
+  std::string requested_hostname_;
+  std::string canonical_hostname_;
+  uint16_t port_;
+
+  // connection information
+  int32_t socket_file_descriptor_;
+
+  fd_set total_list_;
+  fd_set read_fds_;
+  std::atomic<uint16_t> socket_max_;
+  uint16_t listeners_;
 
 };
 
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
 #endif /* LIBMINIFI_INCLUDE_IO_CLIENTSOCKET_H_ */

Reply via email to