Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 54be9d403 -> 2821d71d6


MINIFICPP-491: Disable logging for C api

MINIFICPP-492: Resolve issues with resource claims and volatile repos in
C API

MINIFICPP-486: Begin forming async control functions

MINIFICPP-494: Resolve warning in valgrind to help debug failing test

MINIFICPP-486: Rename block directory

MINIFICPP-486: Update to async API

This closes #327.

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/2821d71d
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/2821d71d
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/2821d71d

Branch: refs/heads/master
Commit: 2821d71d6ee965c7390def385bcd3b5ae0d4c376
Parents: 54be9d4
Author: Marc Parisi <[email protected]>
Authored: Tue May 8 12:43:30 2018 -0400
Committer: Aldrin Piri <[email protected]>
Committed: Thu May 24 14:25:19 2018 -0400

----------------------------------------------------------------------
 CMakeLists.txt                                  |  1 +
 LibExample/CMakeLists.txt                       | 14 ++-
 LibExample/monitor_directory.c                  | 94 ++++++++++++++++++++
 LibExample/transmit_flow.c                      |  2 +-
 blocks/comms.h                                  | 47 ++++++++++
 blocks/file_blocks.h                            | 38 ++++++++
 extensions/rocksdb-repos/ProvenanceRepository.h |  1 -
 libminifi/include/c2/C2Agent.h                  |  2 +-
 libminifi/include/capi/C2CallbackAgent.h        | 82 +++++++++++++++++
 libminifi/include/capi/Instance.h               | 48 +++++++++-
 libminifi/include/capi/api.h                    | 63 +++++++++----
 libminifi/include/capi/processors.h             | 35 ++++++++
 libminifi/include/core/logging/Logger.h         | 33 ++++++-
 .../include/core/logging/LoggerConfiguration.h  | 13 ++-
 libminifi/src/capi/C2CallbackAgent.cpp          | 81 +++++++++++++++++
 libminifi/src/capi/Plan.cpp                     |  3 +-
 libminifi/src/capi/api.cpp                      | 85 ++++++++++++++++--
 libminifi/src/core/FlowFile.cpp                 |  1 -
 .../src/core/logging/LoggerConfiguration.cpp    |  6 +-
 .../repository/VolatileContentRepository.cpp    | 20 +++--
 libminifi/src/utils/Id.cpp                      |  2 +-
 21 files changed, 623 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8594eaf..7699d1f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -292,6 +292,7 @@ if (NOT DISABLE_CURL)
 endif()
 
 
+
 get_property(selected_extensions GLOBAL PROPERTY EXTENSION-OPTIONS)
 
 if (NOT BUILD_IDENTIFIER)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/LibExample/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/LibExample/CMakeLists.txt b/LibExample/CMakeLists.txt
index ddf1e20..0421da7 100644
--- a/LibExample/CMakeLists.txt
+++ b/LibExample/CMakeLists.txt
@@ -23,7 +23,7 @@ IF(POLICY CMP0048)
   CMAKE_POLICY(SET CMP0048 OLD)
 ENDIF(POLICY CMP0048)
 
-include_directories(../libminifi/include  ../libminifi/include/c2  
../libminifi/include/c2/protocols/  ../libminifi/include/core/state 
./libminifi/include/core/statemanagement/metrics  
../libminifi/include/core/yaml  ../libminifi/include/core  
../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue 
../thirdparty/yaml-cpp-yaml-cpp-20171024/include 
../thirdparty/civetweb-1.9.1/include ../thirdparty/)
+include_directories(../blocks/ ../libminifi/include  ../libminifi/include/c2  
../libminifi/include/c2/protocols/  ../libminifi/include/core/state 
./libminifi/include/core/statemanagement/metrics  
../libminifi/include/core/yaml  ../libminifi/include/core  
../thirdparty/spdlog-20170710/include ../thirdparty/concurrentqueue 
../thirdparty/yaml-cpp-yaml-cpp-20171024/include 
../thirdparty/civetweb-1.9.1/include ../thirdparty/)
 
 include(CheckCXXCompilerFlag)
 CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11)
@@ -38,7 +38,6 @@ endif()
 
 add_executable(transmit_flow transmit_flow.c)
 
-# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl and rocksdb
 target_link_libraries(transmit_flow capi core-minifi minifi)
 
 if (APPLE)
@@ -50,7 +49,6 @@ endif ()
 
 add_executable(generate_flow generate_flow.c)
 
-# Link against minifi, yaml-cpp, civetweb-cpp, uuid, openssl and rocksdb
 target_link_libraries(generate_flow capi core-minifi minifi)
 
 if (APPLE)
@@ -59,3 +57,13 @@ else ()
        target_link_libraries (generate_flow -Wl,--whole-archive 
minifi-http-curl -Wl,--no-whole-archive)
 endif ()
 
+
+add_executable(monitor_directory monitor_directory.c)
+
+target_link_libraries(monitor_directory capi core-minifi minifi)
+
+if (APPLE)
+       target_link_libraries (monitor_directory -Wl,-all_load minifi-http-curl)
+else ()
+       target_link_libraries (monitor_directory -Wl,--whole-archive 
minifi-http-curl -Wl,--no-whole-archive)
+endif ()

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/LibExample/monitor_directory.c
----------------------------------------------------------------------
diff --git a/LibExample/monitor_directory.c b/LibExample/monitor_directory.c
new file mode 100644
index 0000000..465106d
--- /dev/null
+++ b/LibExample/monitor_directory.c
@@ -0,0 +1,94 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <dirent.h>
+#include <pthread.h>
+
+#include "file_blocks.h"
+#include "comms.h"
+#include "capi/api.h"
+#include "capi/processors.h"
+/**
+ * Reports whether a path names a directory.
+ * @param path filesystem path handed to stat().
+ * @return 0 when stat() fails, otherwise the result of S_ISDIR on the entry's mode.
+ */
+int is_dir(const char *path) {
+  struct stat path_info;
+  const int stat_failed = stat(path, &path_info) != 0;
+  return stat_failed ? 0 : S_ISDIR(path_info.st_mode);
+}
+
+/* Guards `stopped`. Statically initialised so it is valid before its first lock. */
+pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+/* Signalled by stop_callback() after a stop request has been recorded. */
+pthread_cond_t condition = PTHREAD_COND_INITIALIZER;
+
+/* Non-zero once a stop has been requested; accessed under `mutex`. */
+int stopped = 0;
+
+/**
+ * Stop hook: records the stop request under the mutex and signals `condition`.
+ * @param c not used by this example.
+ * @return always 0.
+ */
+int stop_callback(char *c) {
+  (void) c;
+
+  pthread_mutex_lock(&mutex);
+  stopped = 1;
+  pthread_cond_signal(&condition);
+  pthread_mutex_unlock(&mutex);
+
+  return 0;
+}
+
+/**
+ * Reads the stop flag under the mutex.
+ * @param ptr not used by this example.
+ * @return the value of `stopped` at the time of the call.
+ */
+int is_stopped(void *ptr) {
+  (void) ptr;
+
+  pthread_mutex_lock(&mutex);
+  const int snapshot = stopped;
+  pthread_mutex_unlock(&mutex);
+
+  return snapshot;
+}
+
+/**
+ * This is an example of the C API that builds a GetFile flow over a directory
+ * and transmits the flow files it yields to a remote instance.
+ *
+ * Arguments: <instance> <remote port> <directory to monitor> <c2 url> <c2 ack url>
+ */
+int main(int argc, char **argv) {
+  /* argv[1] through argv[5] are all read below, so six entries are required. */
+  if (argc < 6) {
+    printf("Error: must run ./monitor_directory <instance> <remote port> <directory to monitor> <c2 url> <c2 ack url>\n");
+    exit(1);
+  }
+
+  stopped = 0x00;
+
+  char *instance_str = argv[1];
+  char *portStr = argv[2];
+  char *directory = argv[3];
+
+  nifi_port port;
+
+  port.pord_id = portStr;
+
+  /* C2 server description; only consumed once the async C2 call below is enabled. */
+  C2_Server server;
+  server.url = argv[4];
+  server.ack_url = argv[5];
+  server.identifier = "monitor_directory";
+  server.topic = NULL; /* no topic is supplied on the command line */
+  server.type = REST;
+
+  nifi_instance *instance = create_instance(instance_str, &port);
+
+  // enable_async_c2(instance, &server, stop_callback, NULL, NULL);
+
+  /* No parent flow (0x00); KEEP_SOURCE is the only flag passed. */
+  flow *new_flow = monitor_directory(instance, directory, 0x00, KEEP_SOURCE);
+
+  /* Returns when the flow yields no record or is_stopped() reports a stop. */
+  transmit_to_nifi(instance, new_flow, is_stopped);
+
+  free_flow(new_flow);
+
+  free_instance(instance);
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/LibExample/transmit_flow.c
----------------------------------------------------------------------
diff --git a/LibExample/transmit_flow.c b/LibExample/transmit_flow.c
index 64e2beb..7fdb3d8 100644
--- a/LibExample/transmit_flow.c
+++ b/LibExample/transmit_flow.c
@@ -54,7 +54,7 @@ void transfer_file_or_directory(nifi_instance *instance, char 
*file_or_dir) {
   } else {
     printf("Transferring %s\n",file_or_dir);
 
-    flow_file_record *record = create_flowfile(file_or_dir);
+    flow_file_record *record = create_flowfile(file_or_dir, 
strlen(file_or_dir));
 
     add_attribute(record, "addedattribute", "1", 2);
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/blocks/comms.h
----------------------------------------------------------------------
diff --git a/blocks/comms.h b/blocks/comms.h
new file mode 100644
index 0000000..3f5fda1
--- /dev/null
+++ b/blocks/comms.h
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef BLOCKS_COMMS_H_
+#define BLOCKS_COMMS_H_
+
+#include <stdio.h>
+#include "capi/api.h"
+#include "capi/processors.h"
+
+#define SUCCESS 0x00
+#define FINISHED_EARLY 0x01
+#define FAIL 0x02
+
+typedef int transmission_stop(void *);
+
+/**
+ * Pulls flow files from the flow one at a time and transmits each through the
+ * instance, until the flow yields no record or the stop callback asks to stop.
+ *
+ * Defined `static inline` because it lives in a header: each including
+ * translation unit gets its own copy instead of a duplicate external symbol.
+ *
+ * @param instance      instance used to fetch and transmit records.
+ * @param flow          flow the records are pulled from.
+ * @param stop_callback optional; polled with a null argument after every
+ *                      transmitted record, a non-zero result ends the loop.
+ * @return FINISHED_EARLY when get_next_flow_file() returned no record,
+ *         SUCCESS when the stop callback ended the loop.
+ */
+static inline uint8_t transmit_to_nifi(nifi_instance *instance, flow *flow, transmission_stop *stop_callback) {
+  for (;;) {
+    flow_file_record *record = get_next_flow_file(instance, flow);
+
+    if (record == 0x00) {
+      return FINISHED_EARLY;
+    }
+    transmit_flowfile(record, instance);
+
+    free_flowfile(record);
+
+    /* The record pointer is dead past this point; only the callback decides. */
+    if (stop_callback != 0x00 && stop_callback(0x00)) {
+      return SUCCESS;
+    }
+  }
+}
+
+#endif /* BLOCKS_COMMS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/blocks/file_blocks.h
----------------------------------------------------------------------
diff --git a/blocks/file_blocks.h b/blocks/file_blocks.h
new file mode 100644
index 0000000..3f1d6f7
--- /dev/null
+++ b/blocks/file_blocks.h
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef BLOCKS_FILE_BLOCKS_H_
+#define BLOCKS_FILE_BLOCKS_H_
+
+#include "capi/api.h"
+#include "capi/processors.h"
+
+#define KEEP_SOURCE 0x01
+#define RECURSE 0x02
+
+/**
+ * Builds a GetFile step over a directory. The step can be combined into an
+ * existing flow (parent_flow) to create an execution plan.
+ *
+ * Defined `static inline` because it lives in a header.
+ *
+ * @param instance    instance the flow is created for.
+ * @param directory   directory stored in the GetFile configuration.
+ * @param parent_flow flow forwarded to create_getfile(); callers in this
+ *                    repository pass 0x00 when there is none.
+ * @param flags       bitwise OR of KEEP_SOURCE and RECURSE.
+ * @return the flow returned by create_getfile().
+ */
+static inline flow *monitor_directory(nifi_instance *instance, char *directory, flow *parent_flow, char flags) {
+  GetFileConfig config;
+  config.directory = directory;
+  /*
+   * The config fields are 1-bit wide, so normalise each mask test to 0/1.
+   * Storing the raw mask would truncate RECURSE (0x02) to 0.
+   */
+  config.keep_source = (flags & KEEP_SOURCE) != 0;
+  config.recurse = (flags & RECURSE) != 0;
+  return create_getfile(instance, parent_flow, &config);
+}
+
+#endif /* BLOCKS_FILE_BLOCKS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/extensions/rocksdb-repos/ProvenanceRepository.h
----------------------------------------------------------------------
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h 
b/extensions/rocksdb-repos/ProvenanceRepository.h
index d325c24..d4ff3a5 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -107,7 +107,6 @@ class ProvenanceRepository : public core::Repository, 
public std::enable_shared_
   }
   // Put
   virtual bool Put(std::string key, const uint8_t *buf, size_t bufLen) {
-
     if (repo_full_) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/include/c2/C2Agent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index b902c04..aa67035 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -137,7 +137,7 @@ class C2Agent : public state::UpdateController, public 
state::response::Response
    * Handles a C2 event requested by the server.
    * @param resp c2 server response.
    */
-  void handle_c2_server_response(const C2ContentResponse &resp);
+  virtual void handle_c2_server_response(const C2ContentResponse &resp);
 
   /**
    * Handles an update request

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/include/capi/C2CallbackAgent.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/C2CallbackAgent.h 
b/libminifi/include/capi/C2CallbackAgent.h
new file mode 100644
index 0000000..c7ee5a9
--- /dev/null
+++ b/libminifi/include/capi/C2CallbackAgent.h
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_
+#define LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_
+
+#include <utility>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include "c2/C2Agent.h"
+#include "core/state/UpdateController.h"
+#include "core/state/Value.h"
+#include "c2/C2Payload.h"
+#include "c2/C2Protocol.h"
+#include "io/validation.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+typedef int c2_ag_update_callback(char *);
+
+typedef int c2_ag_stop_callback(char *);
+
+typedef int c2_ag_start_callback(char *);
+
+/**
+ * C2 agent that forwards server requests to C function pointers instead of
+ * handling them itself. Only a stop callback can be registered here.
+ */
+class C2CallbackAgent : public c2::C2Agent {
+
+ public:
+
+  /**
+   * All three arguments are forwarded unchanged to the C2Agent constructor.
+   */
+  explicit C2CallbackAgent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller,
+                           const std::shared_ptr<state::StateMonitor> &updateSink,
+                           const std::shared_ptr<Configure> &configure);
+
+  virtual ~C2CallbackAgent() {
+  }
+
+  /**
+   * Registers the function that handle_c2_server_response() invokes.
+   * @param st callback pointer; stored as-is, not owned by the agent.
+   */
+  void setStopCallback(c2_ag_stop_callback *st) {
+    stop = st;
+  }
+
+ protected:
+  /**
+   * Handles a C2 event requested by the server.
+   * @param resp c2 server response.
+   */
+  virtual void handle_c2_server_response(const C2ContentResponse &resp) override;
+
+  // Registered stop callback; checked against nullptr before it is called.
+  c2_ag_stop_callback *stop;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+
+};
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+
+#endif /* LIBMINIFI_INCLUDE_C2_C2CALLBACKAGENT_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/include/capi/Instance.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/Instance.h 
b/libminifi/include/capi/Instance.h
index 5d1c4c1..bbe0f4c 100644
--- a/libminifi/include/capi/Instance.h
+++ b/libminifi/include/capi/Instance.h
@@ -29,6 +29,7 @@
 #include "core/repository/VolatileContentRepository.h"
 #include "core/Repository.h"
 
+#include "C2CallbackAgent.h"
 #include "core/Connectable.h"
 #include "core/ProcessorNode.h"
 #include "core/ProcessContext.h"
@@ -37,6 +38,8 @@
 #include "core/controller/ControllerServiceProvider.h"
 #include "core/FlowConfiguration.h"
 #include "ReflexiveSession.h"
+#include "utils/ThreadPool.h"
+#include "core/state/UpdateController.h"
 namespace org {
 namespace apache {
 namespace nifi {
@@ -63,10 +66,13 @@ class Instance {
   explicit Instance(const std::string &url, const std::string &port)
       : configure_(std::make_shared<Configure>()),
         url_(url),
+        agent_(nullptr),
         rpgInitialized_(false),
-        
content_repo_(std::make_shared<minifi::core::repository::FileSystemRepository>()),
+        listener_thread_pool_(1),
+        
content_repo_(std::make_shared<minifi::core::repository::VolatileContentRepository>()),
         no_op_repo_(std::make_shared<minifi::core::Repository>()) {
     stream_factory_ = std::make_shared<minifi::io::StreamFactory>(configure_);
+    running_ = false;
     uuid_t uuid;
     uuid_parse(port.c_str(), uuid);
     rpg_ = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, 
url, url, configure_, uuid);
@@ -75,10 +81,28 @@ class Instance {
     content_repo_->initialize(configure_);
   }
 
+  // Clears the running flag, then shuts the listener thread pool down.
+  // NOTE(review): running_ is the flag handed to state::UpdateRunner in
+  // registerUpdateListener(); presumably clearing it first lets scheduled
+  // listeners stop rescheduling before shutdown - confirm against UpdateRunner.
+  ~Instance() {
+    running_ = false;
+    listener_thread_pool_.shutdown();
+  }
+
   bool isRPGConfigured() {
     return rpgInitialized_;
   }
 
+  /**
+   * Creates the callback-driven C2 agent and schedules its update functions
+   * on the listener thread pool.
+   * @param server C2 endpoint description; unless its type is MQTT, url and
+   *               ack_url are copied into the c2.rest.url / c2.rest.url.ack
+   *               properties. A null server leaves the configuration as is.
+   * @param c1 stop callback handed to the agent.
+   * @param c2 start callback; currently not used.
+   * @param c3 update callback; currently not used.
+   */
+  void enableAsyncC2(C2_Server *server, c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_service_provider = nullptr;
+    running_ = true;
+    if (server != nullptr && server->type != C2_Server_Type::MQTT) {
+      configure_->set("c2.rest.url", server->url);
+      configure_->set("c2.rest.url.ack", server->ack_url);
+    }
+    agent_ = std::make_shared<c2::C2CallbackAgent>(controller_service_provider, nullptr, configure_);
+    // Install the callback before any listener can run, so that a request
+    // handled right after scheduling already sees it.
+    agent_->setStopCallback(c1);
+    listener_thread_pool_.start();
+    registerUpdateListener(agent_, 1000);
+  }
+
   void setRemotePort(std::string remote_port) {
     rpg_->setProperty(minifi::RemoteProcessorGroupPort::portUUID, remote_port);
     rpg_->initialize();
@@ -114,6 +138,26 @@ class Instance {
 
  protected:
 
+  /**
+   * Submits every function exposed by the update controller to the listener
+   * thread pool as its own worker named "listeners".
+   * @param updateController source of the functions to schedule.
+   * @param delay value handed to each worker's state::UpdateRunner.
+   * @return false as soon as the pool refuses a worker, true otherwise.
+   */
+  bool registerUpdateListener(const std::shared_ptr<state::UpdateController> &updateController, const int64_t &delay) {
+    auto update_functions = updateController->getFunctions();
+
+    for (auto update_function : update_functions) {
+      // Each worker gets its own after-execute hook bound to the running flag.
+      std::unique_ptr<utils::AfterExecute<state::Update>> after_run(new state::UpdateRunner(running_, delay));
+      utils::Worker<state::Update> worker(update_function, "listeners", std::move(after_run));
+      std::future<state::Update> pending;
+      const bool accepted = listener_thread_pool_.execute(std::move(worker), pending);
+      if (!accepted) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  std::shared_ptr<c2::C2CallbackAgent> agent_;
+
+  std::atomic<bool> running_;
+
   bool rpgInitialized_;
 
   std::shared_ptr<minifi::core::Repository> no_op_repo_;
@@ -125,6 +169,8 @@ class Instance {
   std::shared_ptr<io::StreamFactory> stream_factory_;
   std::string url_;
   std::shared_ptr<Configure> configure_;
+
+  utils::ThreadPool<state::Update> listener_thread_pool_;
 };
 
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/include/capi/api.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/api.h b/libminifi/include/capi/api.h
index 440329e..d0b06b1 100644
--- a/libminifi/include/capi/api.h
+++ b/libminifi/include/capi/api.h
@@ -18,27 +18,32 @@
 #ifndef LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
 #define LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
 
+#include <stddef.h>
 #include <stdint.h>
+#include "processors.h"
+
+int initialize_api();
 
 #ifdef __cplusplus
 extern "C" {
 #endif
 
 #define API_VERSION "0.01"
+
+void enable_logging();
+
 /****
  * ##################################################################
  *  BASE NIFI OPERATIONS
  * ##################################################################
  */
 
-
 /**
  * NiFi Port struct
  */
 typedef struct {
-    char *pord_id;
-}nifi_port;
-
+  /* Remote port identifier. Spelling kept: callers read `pord_id`. */
+  char *pord_id;
+} nifi_port;
 
 /**
  * Nifi instance struct
@@ -51,7 +56,6 @@ typedef struct {
 
 } nifi_instance;
 
-
 nifi_instance *create_instance(char *url, nifi_port *port);
 
 void set_property(nifi_instance *, char *, char *);
@@ -60,6 +64,32 @@ void initialize_instance(nifi_instance *);
 
 void free_instance(nifi_instance*);
 
+/****
+ * ##################################################################
+ *  C2 OPERATIONS
+ * ##################################################################
+ */
+
+/* Transport used to reach the C2 server. */
+enum C2_Server_Type{
+  REST,
+  MQTT
+};
+
+/*
+ * Description of a C2 server, passed to enable_async_c2().
+ * NOTE(review): field meanings below are inferred from their names and from
+ * the REST example; confirm against the C2 implementation.
+ */
+typedef struct {
+  char *url;        /* server URL */
+  char *ack_url;    /* acknowledgement URL */
+  char *identifier; /* presumably the name this agent reports - confirm */
+  char *topic;      /* presumably only meaningful for MQTT - confirm */
+  enum C2_Server_Type type;
+} C2_Server;
+
+typedef int c2_update_callback(char *);
+
+typedef int c2_stop_callback(char *);
+
+typedef int c2_start_callback(char *);
+
+void enable_async_c2(nifi_instance *, C2_Server *, c2_stop_callback *, 
c2_start_callback *, c2_update_callback *);
 
 /****
  * ##################################################################
@@ -73,14 +103,13 @@ typedef struct {
 
 uint8_t run_processor(const processor *processor);
 
-
 /****
  * ##################################################################
  *  FLOWFILE OPERATIONS
  * ##################################################################
  */
 
-typedef struct{
+typedef struct {
   char *key;
   void *value;
   size_t value_size;
@@ -93,34 +122,35 @@ typedef struct{
 typedef struct {
   uint64_t size; /**< Size in bytes of the data corresponding to this flow 
file */
 
+  void * in;
+
   char * contentLocation; /**< Filesystem location of this object */
 
   void *attributes; /**< Hash map of attributes */
 
-
 } flow_file_record;
 
-
-
-typedef struct  {
-    void *plan;
+typedef struct {
+  void *plan;
 } flow;
 
-
 flow *create_flow(nifi_instance *, const char *);
 
+flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c);
+
 void free_flow(flow *);
 
 flow_file_record *get_next_flow_file(nifi_instance *, flow *);
 
-size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t );
-
+size_t get_flow_files(nifi_instance *, flow *, flow_file_record **, size_t);
 
 /**
  * Creates a flow file object.
  * Will obtain the size of file
  */
-flow_file_record* create_flowfile(const char *file);
+flow_file_record* create_flowfile(const char *file, const size_t len);
+
+flow_file_record* create_ff_object(const char *file, const size_t len, const 
uint64_t size);
 
 void free_flowfile(flow_file_record*);
 
@@ -138,7 +168,6 @@ uint8_t remove_attribute(flow_file_record*, char *key);
 
 void transmit_flowfile(flow_file_record *, nifi_instance *);
 
-
 /****
  * ##################################################################
  *  Persistence Operations

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/include/capi/processors.h
----------------------------------------------------------------------
diff --git a/libminifi/include/capi/processors.h 
b/libminifi/include/capi/processors.h
new file mode 100644
index 0000000..0d395e5
--- /dev/null
+++ b/libminifi/include/capi/processors.h
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_
+#define LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*
+ * Settings handed to create_getfile().
+ * keep_source and recurse are single-bit flags: store only 0 or 1 in them.
+ */
+typedef struct {
+  char *directory;         /* directory to read from */
+  unsigned keep_source :1; /* presumably: leave source files in place - confirm */
+  unsigned recurse :1;     /* presumably: descend into subdirectories - confirm */
+} GetFileConfig;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* LIBMINIFI_INCLUDE_CAPI_PROCESSORS_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/include/core/logging/Logger.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/logging/Logger.h 
b/libminifi/include/core/logging/Logger.h
index a81e604..37ef390 100644
--- a/libminifi/include/core/logging/Logger.h
+++ b/libminifi/include/core/logging/Logger.h
@@ -21,6 +21,7 @@
 #include <mutex>
 #include <memory>
 #include <sstream>
+#include <iostream>
 
 #include "spdlog/spdlog.h"
 
@@ -33,6 +34,24 @@ namespace logging {
 
 #define LOG_BUFFER_SIZE 1024
 
+/**
+ * Shared on/off switch consulted by Logger before it emits anything.
+ * Starts enabled; the flag is an atomic so it can be flipped while loggers
+ * are reading it.
+ */
+class LoggerControl {
+ public:
+  LoggerControl()
+      : is_enabled_(true) {
+  }
+
+  /** @return true while logging is switched on. */
+  bool is_enabled() const {
+    return is_enabled_;
+  }
+
+  /** @param status true switches logging on, false switches it off. */
+  void setEnabled(bool status) {
+    is_enabled_ = status;
+  }
+ protected:
+  std::atomic<bool> is_enabled_;
+};
+
 template<typename ... Args>
 inline std::string format_string(char const* format_str, Args&&... args) {
   char buf[LOG_BUFFER_SIZE];
@@ -171,6 +190,8 @@ class Logger : public BaseLogger {
   }
 
   bool should_log(const LOG_LEVEL &level) {
+    if (controller_ && !controller_->is_enabled())
+      return false;
     spdlog::level::level_enum logger_level = spdlog::level::level_enum::info;
     switch (level) {
       case critical:
@@ -228,16 +249,24 @@ class Logger : public BaseLogger {
         break;
     }
   }
-  Logger(std::shared_ptr<spdlog::logger> delegate)
-      : delegate_(delegate) {
+  // Builds a logger that writes through `delegate` and consults `controller`
+  // (when non-null) before logging.
+  Logger(std::shared_ptr<spdlog::logger> delegate, 
std::shared_ptr<LoggerControl> controller)
+      : delegate_(delegate), controller_(controller) {
   }
 
+  // Builds a logger without a controller; the enabled check is then skipped.
+  Logger(std::shared_ptr<spdlog::logger> delegate)
+        : delegate_(delegate), controller_(nullptr) {
+    }
+
+
   std::shared_ptr<spdlog::logger> delegate_;
+  std::shared_ptr<LoggerControl> controller_;
 
   std::mutex mutex_;
  private:
   template<typename ... Args>
   inline void log(spdlog::level::level_enum level, const char * const format, 
const Args& ... args) {
+    if (controller_ && !controller_->is_enabled())
+         return;
     std::lock_guard<std::mutex> lock(mutex_);
     if (!delegate_->should_log(level)) {
       return;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/include/core/logging/LoggerConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/logging/LoggerConfiguration.h 
b/libminifi/include/core/logging/LoggerConfiguration.h
index fbcdaad..0236410 100644
--- a/libminifi/include/core/logging/LoggerConfiguration.h
+++ b/libminifi/include/core/logging/LoggerConfiguration.h
@@ -91,6 +91,13 @@ class LoggerConfiguration {
     return logger_configuration;
   }
 
+  /** Switches this configuration's logger controller off. */
+  void disableLogging() {
+    controller_->setEnabled(false);
+  }
+
+  /** Switches this configuration's logger controller back on. */
+  void enableLogging() {
+    controller_->setEnabled(true);
+  }
   /**
    * (Re)initializes the logging configuration with the given logger properties.
    */
@@ -110,8 +117,8 @@ class LoggerConfiguration {
 
   class LoggerImpl : public Logger {
    public:
-    LoggerImpl(std::string name, std::shared_ptr<spdlog::logger> delegate)
-        : Logger(delegate),
+    LoggerImpl(std::string name, std::shared_ptr<LoggerControl> controller, 
std::shared_ptr<spdlog::logger> delegate)
+        : Logger(delegate,controller),
           name(name) {
     }
     void set_delegate(std::shared_ptr<spdlog::logger> delegate) {
@@ -119,6 +126,7 @@ class LoggerConfiguration {
       delegate_ = delegate;
     }
     const std::string name;
+
   };
 
   LoggerConfiguration();
@@ -127,6 +135,7 @@ class LoggerConfiguration {
   std::shared_ptr<spdlog::formatter> formatter_;
   std::mutex mutex;
   std::shared_ptr<LoggerImpl> logger_ = nullptr;
+  std::shared_ptr<LoggerControl> controller_;
 };
 
 template<typename T>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/src/capi/C2CallbackAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/C2CallbackAgent.cpp 
b/libminifi/src/capi/C2CallbackAgent.cpp
new file mode 100644
index 0000000..d8da5d6
--- /dev/null
+++ b/libminifi/src/capi/C2CallbackAgent.cpp
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "capi/C2CallbackAgent.h"
+#include <unistd.h>
+#include <csignal>
+#include <utility>
+#include <vector>
+#include <map>
+#include <string>
+#include <memory>
+#include "c2/ControllerSocketProtocol.h"
+#include "core/state/UpdateController.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "utils/file/FileUtils.h"
+#include "utils/file/FileManager.h"
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+// Forwards all three arguments to C2Agent and starts with no stop callback
+// registered; the logger is obtained from the LoggerFactory for this class.
+C2CallbackAgent::C2CallbackAgent(const 
std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const 
std::shared_ptr<state::StateMonitor> &updateSink,
+                                 const std::shared_ptr<Configure> 
&configuration)
+    : C2Agent(controller, updateSink, configuration),
+      stop(nullptr),
+      logger_(logging::LoggerFactory<C2CallbackAgent>::getLogger()) {
+}
+
+/**
+ * Dispatches a C2 server response to the registered callbacks.
+ *
+ * START and STOP addressed to "C2"/"c2" raise SIGTERM. The stop callback is
+ * invoked for STOP only, receiving the name carried by the response. All
+ * other operations are ignored.
+ *
+ * @param resp c2 server response.
+ */
+void C2CallbackAgent::handle_c2_server_response(const C2ContentResponse &resp) {
+  switch (resp.op) {
+    case Operation::START:
+    case Operation::STOP: {
+      if (resp.name == "C2" || resp.name == "c2") {
+        raise(SIGTERM);
+      }
+
+      // A START request must not fire the stop callback.
+      if (resp.op == Operation::STOP && nullptr != stop) {
+        // The C callback signature takes a non-const char*.
+        stop(const_cast<char*>(resp.name.c_str()));
+      }
+
+      break;
+    }
+    case Operation::CLEAR:
+    case Operation::UPDATE:
+    case Operation::DESCRIBE:
+    case Operation::RESTART:
+    default:
+      // do nothing
+      break;
+  }
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/src/capi/Plan.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp
index 9775c80..6e5c7f6 100644
--- a/libminifi/src/capi/Plan.cpp
+++ b/libminifi/src/capi/Plan.cpp
@@ -122,10 +122,11 @@ bool ExecutionPlan::setProperty(const 
std::shared_ptr<core::Processor> proc, con
     }
   }
 
-  if (i >= processor_queue_.size() || i == 0 || i >= 
processor_contexts_.size()) {
+  if (i >= processor_queue_.size() || i >= processor_contexts_.size()) {
     return false;
   }
 
+
   return processor_contexts_.at(i)->setProperty(prop, value);
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/src/capi/api.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp
index 5e8f3d8..cb0d726 100644
--- a/libminifi/src/capi/api.cpp
+++ b/libminifi/src/capi/api.cpp
@@ -25,6 +25,23 @@
 #include "capi/Instance.h"
 #include "capi/Plan.h"
 #include "ResourceClaim.h"
+#include "processors/GetFile.h"
+#include "core/logging/LoggerConfiguration.h"
+
+/**
+ * Carrier for a static member whose initializer runs initialize_api().
+ * The class has no other members; it exists only so that the definition
+ * below triggers the call.
+ */
+class API_INITIALIZER {
+  static int initialized;
+};
+
+// Defining the static member invokes initialize_api() as part of this
+// translation unit's static initialization and stores its return value.
+// NOTE(review): initialize_api() is defined further down, so a declaration is
+// presumably provided by one of the included headers -- confirm.
+int API_INITIALIZER::initialized = initialize_api();
+
+/**
+ * API setup hook: switches logging off in the logger configuration.
+ *
+ * @return always 1.
+ */
+int initialize_api() {
+  auto &&logger_config = logging::LoggerConfiguration::getConfiguration();
+  logger_config.disableLogging();
+  return 1;
+}
+
+/**
+ * Switches logging back on in the logger configuration.
+ */
+void enable_logging() {
+  auto &&logger_config = logging::LoggerConfiguration::getConfiguration();
+  logger_config.enableLogging();
+}
 
 class DirectoryConfiguration {
  protected:
@@ -61,6 +78,18 @@ void initialize_instance(nifi_instance *instance) {
   auto minifi_instance_ref = 
static_cast<minifi::Instance*>(instance->instance_ptr);
   minifi_instance_ref->setRemotePort(instance->port.pord_id);
 }
+/**
+ * Enables asynchronous C2 handling on a nifi instance by forwarding the
+ * server description and the three callbacks to the underlying
+ * minifi::Instance.
+ *
+ * Callback shapes, as noted alongside this function:
+ *   typedef int c2_update_callback(char *);
+ *   typedef int c2_stop_callback(char *);
+ *   typedef int c2_start_callback(char *);
+ *
+ * @param instance nifi instance whose instance_ptr refers to a minifi::Instance.
+ * @param server C2 server description passed through unchanged.
+ * @param c1 stop callback.
+ * @param c2 start callback.
+ * @param c3 update callback.
+ */
+void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
+  minifi::Instance *target = static_cast<minifi::Instance*>(instance->instance_ptr);
+  target->enableAsyncC2(server, c1, c2, c3);
+}
 
 /**
  * Sets a property within the nifi instance
@@ -88,24 +117,44 @@ void free_instance(nifi_instance* instance) {
  * Creates a flow file record
  * @param file file to place into the flow file.
  */
-flow_file_record* create_flowfile(const char *file) {
+flow_file_record* create_flowfile(const char *file, const size_t len) {
   flow_file_record *new_ff = new flow_file_record;
+  // No owning instance is attached here. free_flowfile() inspects this field,
+  // so it must not be left uninitialized.
+  new_ff->in = nullptr;
   new_ff->attributes = new std::map<std::string, std::string>();
-  new_ff->contentLocation = new char[strlen(file)];
-  snprintf(new_ff->contentLocation, strlen(file), "%s", file);
+  // len excludes the terminator, hence the extra byte.
+  new_ff->contentLocation = new char[len + 1];
+  snprintf(new_ff->contentLocation, len + 1, "%s", file);
   std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
-  // set the size of the flow file.
-  new_ff->size = in.tellg();
-
+  // The stream is opened positioned at the end, so tellg() is the file size.
+  // tellg() returns -1 on a failed stream; record 0 in that case instead.
+  if (in) {
+    new_ff->size = in.tellg();
+  } else {
+    new_ff->size = 0;
+  }
   return new_ff;
 }
 
 /**
+ * Creates a flow file record whose size is supplied by the caller.
+ * @param file path stored as the record's content location.
+ * @param len number of characters of file to copy, terminator excluded.
+ * @param size value recorded as the flow file size.
+ * @return newly allocated record with an empty attribute map.
+ */
+flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) {
+  flow_file_record *new_ff = new flow_file_record;
+  // free_flowfile() inspects this field, so give it a defined value; callers
+  // that have an owning instance assign it after this call.
+  new_ff->in = nullptr;
+  new_ff->attributes = new std::map<std::string, std::string>();
+  new_ff->contentLocation = new char[len + 1];
+  snprintf(new_ff->contentLocation, len + 1, "%s", file);
+  // The size comes from the caller, so the file is not opened here.
+  new_ff->size = size;
+  return new_ff;
+}
+/**
  * Reclaims memory associated with a flow file object
  * @param ff flow file record.
  */
 void free_flowfile(flow_file_record *ff) {
   if (ff != nullptr) {
+    if (ff->in != nullptr) {
+      auto instance = static_cast<nifi_instance*>(ff->in);
+      auto minifi_instance_ref = 
static_cast<minifi::Instance*>(instance->instance_ptr);
+      auto content_repo = minifi_instance_ref->getContentRepository();
+      std::shared_ptr<minifi::ResourceClaim> claim = 
std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
+      content_repo->remove(claim);
+    }
     auto map = static_cast<std::map<std::string, 
std::string>*>(ff->attributes);
     delete[] ff->contentLocation;
     delete map;
@@ -207,6 +256,28 @@ flow *create_flow(nifi_instance *instance, const char 
*first_processor) {
   return new_flow;
 }
 
+/**
+ * Adds a GetFile processor to a flow, creating the flow (and its execution
+ * plan) first when no parent flow is supplied.
+ *
+ * The processor's Directory, KeepSourceFile and Recurse properties are set
+ * from the given configuration.
+ *
+ * @param instance nifi instance whose instance_ptr refers to a minifi::Instance.
+ * @param parent_flow existing flow to extend, or null to create a new one.
+ * @param c GetFile settings (directory, keep_source, recurse).
+ * @return parent_flow when it was supplied, otherwise the newly created flow.
+ */
+flow *create_getfile(nifi_instance *instance, flow *parent_flow, GetFileConfig *c) {
+  std::string processor_name = "GetFile";
+  auto instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
+
+  flow *target_flow = parent_flow;
+  if (target_flow == nullptr) {
+    // no flow supplied: build one with its own execution plan
+    target_flow = new flow;
+    target_flow->plan = new ExecutionPlan(instance_ref->getContentRepository(), instance_ref->getNoOpRepository(), instance_ref->getNoOpRepository());
+  }
+
+  ExecutionPlan *plan = static_cast<ExecutionPlan*>(target_flow->plan);
+  // automatically adds it with success
+  auto get_file = plan->addProcessor(processor_name, processor_name);
+
+  const char *keep_source_value = c->keep_source ? "true" : "false";
+  const char *recurse_value = c->recurse ? "true" : "false";
+
+  plan->setProperty(get_file, processors::GetFile::Directory.getName(), c->directory);
+  plan->setProperty(get_file, processors::GetFile::KeepSourceFile.getName(), keep_source_value);
+  plan->setProperty(get_file, processors::GetFile::Recurse.getName(), recurse_value);
+
+  return target_flow;
+}
+
 void free_flow(flow *flow) {
   if (flow == nullptr)
     return;
@@ -230,7 +301,8 @@ flow_file_record *get_next_flow_file(nifi_instance 
*instance, flow *flow) {
     // create a flow file.
     claim->increaseFlowFileRecordOwnedCount();
     auto path = claim->getContentFullPath();
-    auto ffr = create_flowfile(path.c_str());
+    auto ffr = create_ff_object(path.c_str(), path.length(), ff->getSize());
+    ffr->in = instance;
     return ffr;
   } else {
     return nullptr;
@@ -254,7 +326,8 @@ size_t get_flow_files(nifi_instance *instance, flow *flow, 
flow_file_record **ff
 
       auto path = claim->getContentFullPath();
       // create a flow file.
-      ff_r[i] = create_flowfile(path.c_str());
+      ff_r[i] = create_ff_object(path.c_str(), path.length(), ff->getSize());
+      ff_r[i]->in = instance;
     } else {
       break;
     }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/src/core/FlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index c9d0d83..3427559 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -51,7 +51,6 @@ FlowFile::FlowFile()
 }
 
 FlowFile::~FlowFile() {
-  logger_->log_debug("Deleteting %s", getUUIDStr());
 }
 
 FlowFile& FlowFile::operator=(const FlowFile& other) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/src/core/logging/LoggerConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp 
b/libminifi/src/core/logging/LoggerConfiguration.cpp
index dac441b..a0a207a 100644
--- a/libminifi/src/core/logging/LoggerConfiguration.cpp
+++ b/libminifi/src/core/logging/LoggerConfiguration.cpp
@@ -59,7 +59,9 @@ LoggerConfiguration::LoggerConfiguration()
     : root_namespace_(create_default_root()),
       loggers(std::vector<std::shared_ptr<LoggerImpl>>()),
       
formatter_(std::make_shared<spdlog::pattern_formatter>(spdlog_default_pattern)) 
{
-  logger_ = std::shared_ptr<LoggerImpl>(new 
LoggerImpl(core::getClassName<LoggerConfiguration>(), get_logger(nullptr, 
root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_)));
+  controller_ = std::make_shared<LoggerControl>();
+  logger_ = std::shared_ptr<LoggerImpl>(
+      new LoggerImpl(core::getClassName<LoggerConfiguration>(), controller_, 
get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), 
formatter_)));
   loggers.push_back(logger_);
 }
 
@@ -88,7 +90,7 @@ void LoggerConfiguration::initialize(const 
std::shared_ptr<LoggerProperties> &lo
 
 std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string 
&name) {
   std::lock_guard<std::mutex> lock(mutex);
-  std::shared_ptr<LoggerImpl> result = std::make_shared<LoggerImpl>(name, 
get_logger(logger_, root_namespace_, name, formatter_));
+  std::shared_ptr<LoggerImpl> result = std::make_shared<LoggerImpl>(name, 
controller_, get_logger(logger_, root_namespace_, name, formatter_));
   loggers.push_back(result);
   return result;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/src/core/repository/VolatileContentRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp 
b/libminifi/src/core/repository/VolatileContentRepository.cpp
index 60f538d..0e99311 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -83,16 +83,16 @@ void VolatileContentRepository::start() {
   thread_ = std::thread(&VolatileContentRepository::run, 
shared_from_parent<VolatileContentRepository>());
   thread_.detach();
   running_ = true;
-  logger_->log_debug("%s Repository Monitor Thread Start", getName());
+  logger_->log_info("%s Repository Monitor Thread Start", getName());
 }
 
 std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const 
std::shared_ptr<minifi::ResourceClaim> &claim) {
-  logger_->log_debug("enter write for %s", claim->getContentFullPath());
+  logger_->log_info("enter write for %s", claim->getContentFullPath());
   {
     std::lock_guard<std::mutex> lock(map_mutex_);
     auto claim_check = master_list_.find(claim->getContentFullPath());
     if (claim_check != master_list_.end()) {
-      logger_->log_debug("Creating copy of atomic entry");
+      logger_->log_info("Creating copy of atomic entry");
       auto ent = claim_check->second->takeOwnership();
       if (ent == nullptr) {
         return nullptr;
@@ -107,7 +107,7 @@ std::shared_ptr<io::BaseStream> 
VolatileContentRepository::write(const std::shar
       if (ent->testAndSetKey(claim, nullptr, nullptr, 
resource_claim_comparator_)) {
         std::lock_guard<std::mutex> lock(map_mutex_);
         master_list_[claim->getContentFullPath()] = ent;
-        logger_->log_debug("Minimize locking, return stream for %s", 
claim->getContentFullPath());
+        logger_->log_info("Minimize locking, return stream for %s", 
claim->getContentFullPath());
         return 
std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim,
 ent);
       }
       size++;
@@ -125,7 +125,7 @@ std::shared_ptr<io::BaseStream> 
VolatileContentRepository::write(const std::shar
       }
     }
   }
-  logger_->log_debug("Cannot write %s %d, returning nullptr to roll back 
session. Repo is either full or locked", claim->getContentFullPath(), size);
+  logger_->log_info("Cannot write %s %d, returning nullptr to roll back 
session. Repo is either full or locked", claim->getContentFullPath(), size);
   return nullptr;
 }
 
@@ -166,14 +166,16 @@ bool VolatileContentRepository::remove(const 
std::shared_ptr<minifi::ResourceCla
       // if we cannot remove the entry we will let the owner's destructor
       // decrement the reference count and free it
       master_list_.erase(claim->getContentFullPath());
+      // because of the test and set we need to decrement ownership
+      ptr->decrementOwnership();
       if (ptr->freeValue(claim)) {
-        logger_->log_debug("Removed %s", claim->getContentFullPath());
+        logger_->log_info("Removed %s", claim->getContentFullPath());
         return true;
       } else {
-        logger_->log_debug("free failed for %s", claim->getContentFullPath());
+        logger_->log_info("free failed for %s", claim->getContentFullPath());
       }
     } else {
-      logger_->log_debug("Could not remove %s", claim->getContentFullPath());
+      logger_->log_info("Could not remove %s", claim->getContentFullPath());
     }
   } else {
     std::lock_guard<std::mutex> lock(map_mutex_);
@@ -187,7 +189,7 @@ bool VolatileContentRepository::remove(const 
std::shared_ptr<minifi::ResourceCla
     return true;
   }
 
-  logger_->log_debug("Could not remove %s, may not exist", 
claim->getContentFullPath());
+  logger_->log_info("Could not remove %s, may not exist", 
claim->getContentFullPath());
   return false;
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2821d71d/libminifi/src/utils/Id.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/utils/Id.cpp b/libminifi/src/utils/Id.cpp
index 05e55a2..da0fea9 100644
--- a/libminifi/src/utils/Id.cpp
+++ b/libminifi/src/utils/Id.cpp
@@ -38,7 +38,7 @@ namespace utils {
 uint64_t timestamp = 
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
 
 NonRepeatingStringGenerator::NonRepeatingStringGenerator()
-    : prefix_((std::to_string(timestamp) + "-")) {
+    : prefix_((std::to_string(timestamp) + "-")), incrementor_(0) {
 }
 
 IdGenerator::IdGenerator()

Reply via email to