Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 45acefaac -> 02f18238e


MINIFICPP-403: Update connectables so that they contain a reference to the flow
identifier. With this approach the flow identifier will be updated with C2
and automatically apply to any processors applied as a result of that update

This closes #331.

closes #313
closes #295

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/02f18238
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/02f18238
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/02f18238

Branch: refs/heads/master
Commit: 02f18238e8f579e2597cf21154fc693afb6d4fa0
Parents: 45acefa
Author: Marc Parisi <[email protected]>
Authored: Tue May 15 13:45:02 2018 -0400
Committer: Aldrin Piri <[email protected]>
Committed: Fri May 18 16:15:34 2018 -0400

----------------------------------------------------------------------
 extensions/http-curl/tests/C2UpdateTest.cpp     |  2 +-
 .../http-curl/tests/HttpGetIntegrationTest.cpp  |  1 +
 libminifi/CMakeLists.txt                        |  2 +-
 libminifi/include/FlowController.h              |  3 +-
 libminifi/include/FlowFileRecord.h              |  6 +-
 libminifi/include/core/Connectable.h            | 19 +++-
 libminifi/include/core/FlowConfiguration.h      | 14 ++-
 libminifi/include/core/ProcessorNode.h          | 14 ++-
 libminifi/include/core/state/FlowIdentifier.h   | 91 ++++++++++++++++++++
 .../include/core/state/nodes/FlowInformation.h  | 67 +++++++-------
 libminifi/src/FlowController.cpp                | 35 +-------
 libminifi/src/FlowFileRecord.cpp                | 18 +++-
 libminifi/src/core/Connectable.cpp              |  2 +
 libminifi/src/core/FlowConfiguration.cpp        | 29 +++++++
 libminifi/src/core/ProcessSession.cpp           | 55 ++++++++----
 libminifi/src/core/yaml/YamlConfiguration.cpp   |  2 +
 libminifi/test/TestBase.cpp                     |  6 +-
 libminifi/test/TestBase.h                       | 20 ++---
 libminifi/test/unit/GetFileTests.cpp            |  1 +
 19 files changed, 285 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/extensions/http-curl/tests/C2UpdateTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/C2UpdateTest.cpp 
b/extensions/http-curl/tests/C2UpdateTest.cpp
index 0799ae5..52e60f8 100644
--- a/extensions/http-curl/tests/C2UpdateTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateTest.cpp
@@ -174,7 +174,7 @@ int main(int argc, char **argv) {
 
   auto milliseconds = 
std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
   std::string logs = LogTestController::getInstance().log_output.str();
-  assert(logs.find("Starting to reload Flow Controller with flow control name 
MiNiFi Flow, version 0") != std::string::npos);
+  assert(logs.find("Starting to reload Flow Controller with flow control name 
MiNiFi Flow, version") != std::string::npos);
   LogTestController::getInstance().reset();
   rmdir("./content_repository");
   assert(h_ex.calls_ <= (milliseconds / 1000) + 1);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
----------------------------------------------------------------------
diff --git a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp 
b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
index df40497..9e6e99f 100644
--- a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
@@ -154,6 +154,7 @@ int main(int argc, char **argv) {
   assert(logs.find("key:filename value:") != std::string::npos);
   assert(logs.find("key:invokehttp.request.url value:" + url) != 
std::string::npos);
   assert(logs.find("key:invokehttp.status.code value:200") != 
std::string::npos);
+  assert(logs.find("key:flow.id") != std::string::npos);
 
   LogTestController::getInstance().reset();
   rmdir("./content_repository");

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 3e6390f..302b7f8 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -59,7 +59,7 @@ include_directories(../thirdparty/rapidjson-1.1.0/include)
 include_directories(../thirdparty/concurrentqueue/)
 include_directories(include)
 
-file(GLOB SOURCES  "src/sitetosite/*.cpp"  "src/core/logging/*.cpp"  
"src/core/state/*.cpp" "src/c2/protocols/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" 
"src/io/tls/*.cpp" "src/core/controller/*.cpp" "src/controllers/*.cpp" 
"src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" 
"src/core/reporting/*.cpp"  "src/provenance/*.cpp" "src/utils/*.cpp" 
"src/*.cpp")
+file(GLOB SOURCES  "src/sitetosite/*.cpp"  "src/core/logging/*.cpp"  
"src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" 
"src/c2/*.cpp" "src/io/*.cpp" "src/io/tls/*.cpp" "src/core/controller/*.cpp" 
"src/controllers/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" 
"src/core/yaml/*.cpp" "src/core/reporting/*.cpp"  "src/provenance/*.cpp" 
"src/utils/*.cpp" "src/*.cpp")
 
 file(GLOB PROCESSOR_SOURCES  "src/processors/*.cpp" )
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/include/FlowController.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowController.h 
b/libminifi/include/FlowController.h
index 71d22cb..0466546 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -172,7 +172,7 @@ class FlowController : public 
core::controller::ControllerServiceProvider, publi
   // first it will validate the payload with the current root node config for 
flowController
   // like FlowController id/name is the same and new version is greater than 
the current version
   // after that, it will apply the configuration
-  bool applyConfiguration(const std::string &configurePayload);
+  bool applyConfiguration(const std::string &source, const std::string 
&configurePayload);
 
   // get name
   std::string getName() const{
@@ -408,7 +408,6 @@ class FlowController : public 
core::controller::ControllerServiceProvider, publi
   std::chrono::steady_clock::time_point last_metrics_capture_;
 
 private:
-  std::shared_ptr<state::response::FlowVersion> flow_version_;
   std::shared_ptr<logging::Logger> logger_;
   std::string serial_number_;
   static std::shared_ptr<utils::IdGenerator> id_generator_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/include/FlowFileRecord.h
----------------------------------------------------------------------
diff --git a/libminifi/include/FlowFileRecord.h 
b/libminifi/include/FlowFileRecord.h
index 93ea74b..795003f 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -64,11 +64,13 @@ enum FlowAttribute {
   DISCARD_REASON,
   // Indicates an identifier other than the FlowFile's UUID that is known to 
refer to this FlowFile.
   ALTERNATE_IDENTIFIER,
+  // Flow identifier
+  FLOW_ID,
   MAX_FLOW_ATTRIBUTES
 };
 
 // FlowFile Attribute Key
-static const char *FlowAttributeKeyArray[MAX_FLOW_ATTRIBUTES] = { "path", 
"absolute.path", "filename", "uuid", "priority", "mime.type", "discard.reason", 
"alternate.identifier" };
+static const char *FlowAttributeKeyArray[MAX_FLOW_ATTRIBUTES] = { "path", 
"absolute.path", "filename", "uuid", "priority", "mime.type", "discard.reason", 
"alternate.identifier", "flow.id" };
 
 // FlowFile Attribute Enum to Key
 inline const char *FlowAttributeKey(FlowAttribute attribute) {
@@ -122,7 +124,7 @@ class FlowFileRecord : public core::FlowFile, public 
io::Serializable {
   // Destructor
   virtual ~FlowFileRecord();
   // addAttribute key is enum
-  bool addKeyedAttribute(FlowAttribute key, std::string value);
+  bool addKeyedAttribute(FlowAttribute key, const std::string &value);
   // removeAttribute key is enum
   bool removeKeyedAttribute(FlowAttribute key);
   // updateAttribute key is enum

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/include/core/Connectable.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/Connectable.h 
b/libminifi/include/core/Connectable.h
index 588b67a..ec2fd3a 100644
--- a/libminifi/include/core/Connectable.h
+++ b/libminifi/include/core/Connectable.h
@@ -25,7 +25,7 @@
 #include "core/logging/Logger.h"
 #include "Relationship.h"
 #include "Scheduling.h"
-
+#include "core/state/FlowIdentifier.h"
 namespace org {
 namespace apache {
 namespace nifi {
@@ -133,6 +133,21 @@ class __attribute__((visibility("default"))) Connectable : 
public CoreComponent
     return false;
   }
 
+  /**
+   * Sets the flow version for this connectable.
+   */
+  void setFlowIdentifier(const std::shared_ptr<state::FlowIdentifier> 
&version){
+    connectable_version_ = version;
+  }
+
+  /**
+   * Returns theflow version
+   * @returns flow version. can be null if a flow version is not tracked.
+   */
+  virtual std::shared_ptr<state::FlowIdentifier> getFlowIdentifier(){
+    return connectable_version_;
+  }
+
  protected:
 
   // Penalization Period in MilliSecond
@@ -165,6 +180,8 @@ class __attribute__((visibility("default"))) Connectable : 
public CoreComponent
   std::atomic<SchedulingStrategy> strategy_;
   // Concurrent condition variable for whether there is incoming work to do
   std::condition_variable work_condition_;
+  // version under which this connectable was created.
+  std::shared_ptr<state::FlowIdentifier> connectable_version_;
 
 private:
   std::shared_ptr<logging::Logger> logger_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/include/core/FlowConfiguration.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/FlowConfiguration.h 
b/libminifi/include/core/FlowConfiguration.h
index eab7169..cf4c7f6 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -33,6 +33,7 @@
 #include "core/ProcessSession.h"
 #include "core/ProcessGroup.h"
 #include "io/StreamFactory.h"
+#include "core/state/nodes/FlowInformation.h"
 
 namespace org {
 namespace apache {
@@ -61,6 +62,7 @@ class FlowConfiguration : public CoreComponent {
         logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) {
     controller_services_ = 
std::make_shared<core::controller::ControllerServiceMap>();
     service_provider_ = 
std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_,
 nullptr, configuration);
+    flow_version_ = std::make_shared<state::response::FlowVersion>("", 
"default", "");
     // it is okay if this has already been called
     initialize_static_functions();
   }
@@ -82,6 +84,14 @@ class FlowConfiguration : public CoreComponent {
   // Create Provenance Report Task
   std::shared_ptr<core::Processor> createProvenanceReportTask(void);
 
+  std::shared_ptr<state::response::FlowVersion> getFlowVersion() const{
+    return flow_version_;
+  }
+
+  std::shared_ptr<Configure> getConfiguration() { // cannot be const as 
getters mutate the underlying map
+    return configuration_;
+  }
+
   /**
    * Returns the configuration path string
    * @return config_path_
@@ -94,6 +104,8 @@ class FlowConfiguration : public CoreComponent {
     return getRoot(config_path_);
   }
 
+  std::unique_ptr<core::ProcessGroup> updateFromPayload(const std::string 
&source, const std::string &yamlConfigPayload);
+
   virtual std::unique_ptr<core::ProcessGroup> getRootFromPayload(const 
std::string &yamlConfigPayload) {
     return nullptr;
   }
@@ -147,7 +159,7 @@ class FlowConfiguration : public CoreComponent {
   // stream factory
   std::shared_ptr<io::StreamFactory> stream_factory_;
   std::shared_ptr<Configure> configuration_;
-
+  std::shared_ptr<state::response::FlowVersion> flow_version_;
  private:
   std::shared_ptr<logging::Logger> logger_;
   static std::mutex atomic_initialization_;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/include/core/ProcessorNode.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/ProcessorNode.h 
b/libminifi/include/core/ProcessorNode.h
index ed44d6a..c64dbb0 100644
--- a/libminifi/include/core/ProcessorNode.h
+++ b/libminifi/include/core/ProcessorNode.h
@@ -30,8 +30,6 @@ namespace core {
 
 /**
  * Processor node functions as a pass through to the implementing Connectables
- * ProcessorNode can be used by itself or with a pass through object, in which 
case
- * we need to function as a passthrough or not.
  */
 class ProcessorNode : public ConfigurableComponent, public Connectable {
  public:
@@ -101,6 +99,18 @@ class ProcessorNode : public ConfigurableComponent, public 
Connectable {
   }
 
   /**
+   * Returns theflow version
+   * @returns flow version. can be null if a flow version is not tracked.
+   */
+  virtual std::shared_ptr<state::FlowIdentifier> getFlowIdentifier() {
+    if (processor_ != nullptr) {
+      return processor_->getFlowIdentifier();
+    } else {
+      return connectable_version_;
+    }
+  }
+
+  /**
    * Sets the dynamic property using the provided name
    * @param property name
    * @param value property value.

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/include/core/state/FlowIdentifier.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/FlowIdentifier.h 
b/libminifi/include/core/state/FlowIdentifier.h
new file mode 100644
index 0000000..72dc13f
--- /dev/null
+++ b/libminifi/include/core/state/FlowIdentifier.h
@@ -0,0 +1,91 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CORE_STATE_FLOWIDENTIFIER_H_
+#define LIBMINIFI_INCLUDE_CORE_STATE_FLOWIDENTIFIER_H_
+
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace state {
+
+/**
+ * Purpose: Represents a flow identifier for a given flow update or instance.
+ *
+ * Design: Immutable collection of strings for the component parts.
+ */
+class FlowIdentifier {
+ public:
+
+  FlowIdentifier() = delete;
+
+  /**
+   * Constructor accepts the url, bucket id, and flow id.
+   */
+  explicit FlowIdentifier(const std::string &url, const std::string 
&bucket_id, const std::string &flow_id) {
+    registry_url_ = url;
+    bucket_id_ = bucket_id;
+    flow_id_ = flow_id;
+  }
+
+  /**
+   * In most cases the lock guard isn't necessary for these getters; however,
+   * we don't want to cause issues if the FlowVersion object is ever used in a 
way
+   * that breaks the current paradigm.
+   */
+  std::string getRegistryUrl() const {
+    return registry_url_;
+  }
+
+  std::string getBucketId() const {
+    return bucket_id_;
+  }
+
+  std::string getFlowId() const {
+    return flow_id_;
+  }
+ protected:
+
+  explicit FlowIdentifier(const FlowIdentifier &other) {
+    registry_url_ = other.registry_url_;
+    bucket_id_ = other.bucket_id_;
+    flow_id_ = other.flow_id_;
+  }
+  FlowIdentifier &operator=(const FlowIdentifier &other) {
+    registry_url_ = other.registry_url_;
+    bucket_id_ = other.bucket_id_;
+    flow_id_ = other.flow_id_;
+    return *this;
+  }
+
+ private:
+  std::string registry_url_;
+  std::string bucket_id_;
+  std::string flow_id_;
+  friend class FlowVersion;
+};
+
+
+} /* namespace state */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_STATE_FLOWIDENTIFIER_H_ */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/include/core/state/nodes/FlowInformation.h
----------------------------------------------------------------------
diff --git a/libminifi/include/core/state/nodes/FlowInformation.h 
b/libminifi/include/core/state/nodes/FlowInformation.h
index 9c9874a..022b9ce 100644
--- a/libminifi/include/core/state/nodes/FlowInformation.h
+++ b/libminifi/include/core/state/nodes/FlowInformation.h
@@ -45,6 +45,7 @@
 #include "Connection.h"
 #include "io/ClientSocket.h"
 #include "../nodes/StateMonitor.h"
+#include "../FlowIdentifier.h"
 
 namespace org {
 namespace apache {
@@ -58,61 +59,66 @@ class FlowVersion : public DeviceInformation {
 
   explicit FlowVersion()
       : DeviceInformation("FlowVersion", nullptr) {
+    setFlowVersion("", "", getUUIDStr());
   }
 
   explicit FlowVersion(const std::string &registry_url, const std::string 
&bucket_id, const std::string &flow_id)
-      : DeviceInformation("FlowVersion", nullptr),
-        registry_url_(registry_url),
-        bucket_id_(bucket_id),
-        flow_id_(flow_id) {
-    setFlowVersion(registry_url_, bucket_id_, flow_id_);
+      : DeviceInformation("FlowVersion", nullptr) {
+    setFlowVersion(registry_url, bucket_id, flow_id.empty() ? getUUIDStr() : 
flow_id);
   }
 
   explicit FlowVersion(FlowVersion &&fv)
       : DeviceInformation("FlowVersion", nullptr),
-        registry_url_(std::move(fv.registry_url_)),
-        bucket_id_(std::move(fv.bucket_id_)),
-        flow_id_(std::move(fv.flow_id_)) {
-    setFlowVersion(registry_url_, bucket_id_, flow_id_);
+        identifier(std::move(fv.identifier)) {
   }
 
   std::string getName() const {
     return "FlowVersion";
   }
 
-  std::string getRegistryUrl() const {
-    return registry_url_;
+  virtual std::shared_ptr<state::FlowIdentifier> getFlowIdentifier() {
+    std::lock_guard<std::mutex> lock(guard);
+    return identifier;
+  }
+  /**
+   * In most cases the lock guard isn't necessary for these getters; however,
+   * we don't want to cause issues if the FlowVersion object is ever used in a 
way
+   * that breaks the current paradigm.
+   */
+  std::string getRegistryUrl() {
+    std::lock_guard<std::mutex> lock(guard);
+    return identifier->getRegistryUrl();
   }
 
-  std::string getBucketId() const {
-    return bucket_id_;
+  std::string getBucketId() {
+    std::lock_guard<std::mutex> lock(guard);
+    return identifier->getBucketId();
   }
 
-  std::string getFlowId() const {
-    return flow_id_.empty() ? getUUIDStr() : flow_id_;
+  std::string getFlowId() {
+    std::lock_guard<std::mutex> lock(guard);
+    return identifier->getFlowId();
   }
 
   void setFlowVersion(const std::string &url, const std::string &bucket_id, 
const std::string &flow_id) {
-    registry_url_ = url;
-    bucket_id_ = bucket_id;
-    flow_id_ = flow_id;
+    std::lock_guard<std::mutex> lock(guard);
+    identifier = std::make_shared<FlowIdentifier>(url, bucket_id, flow_id);
   }
 
   std::vector<SerializedResponseNode> serialize() {
-    std::lock_guard<std::mutex> lock_guard(guard);
-
+    std::lock_guard<std::mutex> lock(guard);
     std::vector<SerializedResponseNode> serialized;
     SerializedResponseNode ru;
     ru.name = "registryUrl";
-    ru.value = registry_url_;
+    ru.value = identifier->getRegistryUrl();
 
     SerializedResponseNode bucketid;
     bucketid.name = "bucketId";
-    bucketid.value = bucket_id_;
+    bucketid.value = identifier->getBucketId();
 
     SerializedResponseNode flowId;
     flowId.name = "flowId";
-    flowId.value = getFlowId();
+    flowId.value = identifier->getFlowId();
 
     serialized.push_back(ru);
     serialized.push_back(bucketid);
@@ -121,19 +127,18 @@ class FlowVersion : public DeviceInformation {
   }
 
   FlowVersion &operator=(const FlowVersion &&fv) {
-    registry_url_ = (std::move(fv.registry_url_));
-    bucket_id_ = (std::move(fv.bucket_id_));
-    flow_id_ = (std::move(fv.flow_id_));
-    setFlowVersion(registry_url_, bucket_id_, flow_id_);
+    identifier = std::move(fv.identifier);
     return *this;
   }
  protected:
 
   std::mutex guard;
 
-  std::string registry_url_;
-  std::string bucket_id_;
-  std::string flow_id_;
+  /*std::string registry_url_;
+   std::string bucket_id_;
+   std::string flow_id_;
+   */
+  std::shared_ptr<FlowIdentifier> identifier;
 };
 
 class FlowMonitor : public StateMonitorNode {
@@ -263,7 +268,7 @@ class FlowInformation : public FlowMonitor {
 
 REGISTER_RESOURCE(FlowInformation);
 
-} /* namespace metrics */
+} /* namespace response */
 } /* namespace state */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 8d59065..12ce99f 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -73,7 +73,6 @@ 
FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo
       max_event_driven_threads_(0),
       running_(false),
       updating_(false),
-      flow_version_(nullptr),
       c2_enabled_(true),
       initialized_(false),
       provenance_repo_(provenance_repo),
@@ -159,10 +158,10 @@ FlowController::~FlowController() {
   provenance_repo_ = nullptr;
 }
 
-bool FlowController::applyConfiguration(const std::string &configurePayload) {
+bool FlowController::applyConfiguration(const std::string &source, const 
std::string &configurePayload) {
   std::unique_ptr<core::ProcessGroup> newRoot;
   try {
-    newRoot = flow_configuration_->getRootFromPayload(configurePayload);
+    newRoot = flow_configuration_->updateFromPayload(source, configurePayload);
   } catch (...) {
     logger_->log_error("Invalid configuration payload");
     return false;
@@ -376,7 +375,6 @@ void FlowController::initializeC2() {
   state::StateManager::startMetrics(agent->getHeartBeatDelay());
 
   c2_initialized_ = true;
-  flow_version_ = std::make_shared<state::response::FlowVersion>("", 
"default", "");
   device_information_.clear();
   component_metrics_.clear();
   component_metrics_by_id_.clear();
@@ -443,7 +441,7 @@ void FlowController::initializeC2() {
         }
         flowMonitor->setStateMonitor(shared_from_this());
 
-        flowMonitor->setFlowVersion(flow_version_);
+        flowMonitor->setFlowVersion(flow_configuration_->getFlowVersion());
       }
 
       std::lock_guard<std::mutex> lock(metrics_mutex_);
@@ -806,32 +804,7 @@ void FlowController::enableAllControllerServices() {
 }
 
 int16_t FlowController::applyUpdate(const std::string &source, const 
std::string &configuration) {
-  if (!source.empty()) {
-    std::string host, protocol, path, query, url = source;
-    int port;
-    utils::parse_url(&url, &host, &port, &protocol, &path, &query);
-
-    std::string flow_id, bucket_id;
-    auto path_split = utils::StringUtils::split(path, "/");
-    for (size_t i = 0; i < path_split.size(); i++) {
-      const std::string &str = path_split.at(i);
-      if (str == "flows") {
-        if (i + 1 < path_split.size()) {
-          flow_id = path_split.at(i + 1);
-          i++;
-        }
-      }
-
-      if (str == "bucket") {
-        if (i + 1 < path_split.size()) {
-          bucket_id = path_split.at(i + 1);
-          i++;
-        }
-      }
-    }
-    flow_version_->setFlowVersion(url, bucket_id, flow_id);
-  }
-  if (applyConfiguration(configuration)) {
+  if (applyConfiguration(source, configuration)) {
     return 1;
   } else {
     return 0;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 8775de7..7815e70 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -87,6 +87,14 @@ 
FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository
     event->getResourceClaim()->increaseFlowFileRecordOwnedCount();
     content_full_fath_ = event->getResourceClaim()->getContentFullPath();
   }
+  if (event->getFlowIdentifier()) {
+    std::string attr;
+    event->getAttribute(FlowAttributeKey(FlowAttribute::FLOW_ID), attr);
+    setFlowIdentifier(event->getFlowIdentifier());
+    if (!attr.empty()) {
+      addKeyedAttribute(FlowAttribute::FLOW_ID, attr);
+    }
+  }
 }
 
 FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> 
flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, 
std::shared_ptr<core::FlowFile> &event)
@@ -95,6 +103,14 @@ 
FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository
       snapshot_(""),
       content_repo_(content_repo),
       flow_repository_(flow_repository) {
+  if (event->getFlowIdentifier()) {
+    std::string attr;
+    event->getAttribute(FlowAttributeKey(FlowAttribute::FLOW_ID), attr);
+    setFlowIdentifier(event->getFlowIdentifier());
+    if (!attr.empty()) {
+      addKeyedAttribute(FlowAttribute::FLOW_ID, attr);
+    }
+  }
 }
 
 FlowFileRecord::~FlowFileRecord() {
@@ -129,7 +145,7 @@ void 
FlowFileRecord::releaseClaim(std::shared_ptr<ResourceClaim> claim) {
   }
 }
 
-bool FlowFileRecord::addKeyedAttribute(FlowAttribute key, std::string value) {
+bool FlowFileRecord::addKeyedAttribute(FlowAttribute key, const std::string 
&value) {
   const char *keyStr = FlowAttributeKey(key);
   if (keyStr) {
     const std::string keyString = keyStr;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/src/core/Connectable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Connectable.cpp 
b/libminifi/src/core/Connectable.cpp
index 29ee411..746f5ec 100644
--- a/libminifi/src/core/Connectable.cpp
+++ b/libminifi/src/core/Connectable.cpp
@@ -33,12 +33,14 @@ namespace core {
 Connectable::Connectable(std::string name, uuid_t uuid)
     : CoreComponent(name, uuid),
       max_concurrent_tasks_(1),
+      connectable_version_(nullptr),
       logger_(logging::LoggerFactory<Connectable>::getLogger()) {
 }
 
 Connectable::Connectable(const Connectable &&other)
     : CoreComponent(std::move(other)),
       max_concurrent_tasks_(std::move(other.max_concurrent_tasks_)),
+      connectable_version_(std::move(other.connectable_version_)),
       logger_(std::move(other.logger_)) {
   has_work_ = other.has_work_.load();
   strategy_ = other.strategy_.load();

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp 
b/libminifi/src/core/FlowConfiguration.cpp
index b082dce..320797b 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -59,6 +59,35 @@ std::shared_ptr<core::Processor> 
FlowConfiguration::createProvenanceReportTask()
   return processor;
 }
 
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const 
std::string &source, const std::string &yamlConfigPayload) {
+    if (!source.empty()) {
+      std::string host, protocol, path, query, url = source;
+      int port;
+      utils::parse_url(&url, &host, &port, &protocol, &path, &query);
+
+      std::string flow_id, bucket_id;
+      auto path_split = utils::StringUtils::split(path, "/");
+      for (size_t i = 0; i < path_split.size(); i++) {
+        const std::string &str = path_split.at(i);
+        if (str == "flows") {
+          if (i + 1 < path_split.size()) {
+            flow_id = path_split.at(i + 1);
+            i++;
+          }
+        }
+
+        if (str == "bucket") {
+          if (i + 1 < path_split.size()) {
+            bucket_id = path_split.at(i + 1);
+            i++;
+          }
+        }
+      }
+      flow_version_->setFlowVersion(url, bucket_id, flow_id);
+    }
+    return getRootFromPayload(yamlConfigPayload);
+  }
+
 std::unique_ptr<core::ProcessGroup> 
FlowConfiguration::createRootProcessGroup(std::string name, uuid_t uuid, int 
version) {
   return std::unique_ptr<core::ProcessGroup>(new 
core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version));
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index 9b732dc..1bfa9f8 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -43,7 +43,15 @@ std::shared_ptr<utils::IdGenerator> 
ProcessSession::id_generator_ = utils::IdGen
 std::shared_ptr<core::FlowFile> ProcessSession::create() {
   std::map<std::string, std::string> empty;
 
-  std::shared_ptr<core::FlowFile> record = 
std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), 
process_context_->getContentRepository(), empty);
+  auto flow_version = 
process_context_->getProcessorNode()->getFlowIdentifier();
+
+  std::shared_ptr<FlowFileRecord> record = 
std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), 
process_context_->getContentRepository(), empty);
+
+  if (flow_version != nullptr) {
+    auto flow_id = flow_version->getFlowId();
+    std::string attr = FlowAttributeKey(FLOW_ID);
+    record->setAttribute(attr, flow_version->getFlowId());
+  }
 
   _addedFlowFiles[record->getUUIDStr()] = record;
   logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr());
@@ -60,9 +68,15 @@ void ProcessSession::add(const 
std::shared_ptr<core::FlowFile> &record) {
 
 std::shared_ptr<core::FlowFile> ProcessSession::create(const 
std::shared_ptr<core::FlowFile> &parent) {
   std::map<std::string, std::string> empty;
-  std::shared_ptr<core::FlowFile> record = 
std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), 
process_context_->getContentRepository(), empty);
+  std::shared_ptr<FlowFileRecord> record = 
std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), 
process_context_->getContentRepository(), empty);
 
   if (record) {
+    auto flow_version = 
process_context_->getProcessorNode()->getFlowIdentifier();
+    if (flow_version != nullptr) {
+      auto flow_id = flow_version->getFlowId();
+      std::string attr = FlowAttributeKey(FLOW_ID);
+      record->setAttribute(attr, flow_version->getFlowId());
+    }
     _addedFlowFiles[record->getUUIDStr()] = record;
     logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr());
   }
@@ -105,6 +119,12 @@ std::shared_ptr<core::FlowFile> 
ProcessSession::cloneDuringTransfer(std::shared_
   std::shared_ptr<core::FlowFile> record = 
std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), 
process_context_->getContentRepository(), empty);
 
   if (record) {
+    auto flow_version = 
process_context_->getProcessorNode()->getFlowIdentifier();
+    if (flow_version != nullptr) {
+      auto flow_id = flow_version->getFlowId();
+      std::string attr = FlowAttributeKey(FLOW_ID);
+      record->setAttribute(attr, flow_version->getFlowId());
+    }
     this->_clonedFlowFiles[record->getUUIDStr()] = record;
     logger_->log_debug("Clone FlowFile with UUID %s during transfer", 
record->getUUIDStr());
     // Copy attributes
@@ -374,8 +394,7 @@ void ProcessSession::importFrom(io::DataStream &stream, 
const std::shared_ptr<co
     }
     flow->setResourceClaim(claim);
 
-    logger_->log_debug("Import offset %llu length %llu into content %s for 
FlowFile UUID %s", flow->getOffset(), flow->getSize(), 
flow->getResourceClaim()->getContentFullPath(),
-                       flow->getUUIDStr());
+    logger_->log_debug("Import offset %llu length %llu into content %s for 
FlowFile UUID %s", flow->getOffset(), flow->getSize(), 
flow->getResourceClaim()->getContentFullPath(), flow->getUUIDStr());
 
     content_stream->closeStream();
     std::stringstream details;
@@ -423,9 +442,7 @@ void ProcessSession::import(std::string source, const 
std::shared_ptr<core::Flow
       if (offset != 0) {
         input.seekg(offset);
         if (!input.good()) {
-          logger_->log_error("Seeking to %d failed for file %s (does 
file/filesystem support seeking?)",
-                             offset,
-                             source);
+          logger_->log_error("Seeking to %d failed for file %s (does 
file/filesystem support seeking?)", offset, source);
           invalidWrite = true;
         }
       }
@@ -506,9 +523,7 @@ void ProcessSession::import(std::string source, 
std::vector<std::shared_ptr<Flow
       if (offset != 0) {
         input.seekg(offset, input.beg);
         if (!input.good()) {
-          logger_->log_error("Seeking to %d failed for file %s (does 
file/filesystem support seeking?)",
-                             offset,
-                             source);
+          logger_->log_error("Seeking to %d failed for file %s (does 
file/filesystem support seeking?)", offset, source);
           throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
         }
       }
@@ -549,8 +564,8 @@ void ProcessSession::import(std::string source, 
std::vector<std::shared_ptr<Flow
           }
           flowFile->setResourceClaim(claim);
           claim->increaseFlowFileRecordOwnedCount();
-          logger_->log_debug("Import offset %llu length %llu into content %s 
for FlowFile UUID %s", flowFile->getOffset(), flowFile->getSize(),
-                             
flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr());
+          logger_->log_debug("Import offset %llu length %llu into content %s 
for FlowFile UUID %s", flowFile->getOffset(), flowFile->getSize(), 
flowFile->getResourceClaim()->getContentFullPath(),
+                             flowFile->getUUIDStr());
           stream->closeStream();
           std::string details = 
process_context_->getProcessorNode()->getName() + " modify flow record content 
" + flowFile->getUUIDStr();
           uint64_t endTime = getTimeMillis();
@@ -625,24 +640,24 @@ void ProcessSession::stash(const std::string &key, const 
std::shared_ptr<core::F
     return;
   }
 
-  // Stash the claim
+// Stash the claim
   auto claim = flow->getResourceClaim();
   flow->setStashClaim(key, claim);
 
-  // Clear current claim
+// Clear current claim
   flow->clearResourceClaim();
 }
 
 void ProcessSession::restore(const std::string &key, const 
std::shared_ptr<core::FlowFile> &flow) {
   logger_->log_info("Restoring content to %s from key %s", flow->getUUIDStr(), 
key);
 
-  // Restore the claim
+// Restore the claim
   if (!flow->hasStashClaim(key)) {
     logger_->log_warn("Requested restore to record %s from unknown key %s", 
flow->getUUIDStr(), key);
     return;
   }
 
-  // Disown current claim if existing
+// Disown current claim if existing
   if (flow->getResourceClaim()) {
     logger_->log_warn("Restoring stashed content of record %s from key %s when 
there is "
                       "existing content; existing content will be overwritten",
@@ -650,7 +665,7 @@ void ProcessSession::restore(const std::string &key, const 
std::shared_ptr<core:
     flow->releaseClaim(flow->getResourceClaim());
   }
 
-  // Restore the claim
+// Restore the claim
   auto stashClaim = flow->getStashClaim(key);
   flow->setResourceClaim(stashClaim);
   flow->clearStashClaim(key);
@@ -854,6 +869,12 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() {
       _updatedFlowFiles[ret->getUUIDStr()] = ret;
       std::map<std::string, std::string> empty;
       std::shared_ptr<core::FlowFile> snapshot = 
std::make_shared<FlowFileRecord>(process_context_->getFlowFileRepository(), 
process_context_->getContentRepository(), empty);
+      auto flow_version = 
process_context_->getProcessorNode()->getFlowIdentifier();
+      if (flow_version != nullptr) {
+        auto flow_id = flow_version->getFlowId();
+        std::string attr = FlowAttributeKey(FLOW_ID);
+        snapshot->setAttribute(attr, flow_version->getFlowId());
+      }
       logger_->log_debug("Create Snapshot FlowFile with UUID %s", 
snapshot->getUUIDStr());
       snapshot = ret;
       // save a snapshot

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp 
b/libminifi/src/core/yaml/YamlConfiguration.cpp
index 01cc04a..fbe84ed 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -118,6 +118,8 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node 
processorsNode, core::
         }
         processor->setName(procCfg.name);
 
+        processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
+
         auto strategyNode = getOptionalField(&procNode,
                                              "scheduling strategy",
                                              YAML::Node("EVENT_DRIVEN"),

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/test/TestBase.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 950d8bb..53a1bf6 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -18,7 +18,7 @@
 
 #include "./TestBase.h"
 
-TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> 
prov_repo)
+TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> 
prov_repo, const std::shared_ptr<minifi::state::response::FlowVersion> 
&flow_version)
     :
       content_repo_(content_repo),
       flow_repo_(flow_repo),
@@ -26,6 +26,7 @@ TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> 
content_repo, std::s
       finalized(false),
       location(-1),
       current_flowfile_(nullptr),
+      flow_version_(flow_version),
       logger_(logging::LoggerFactory<TestPlan>::getLogger()) {
   stream_factory = 
std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<minifi::Configure>());
 }
@@ -43,6 +44,7 @@ bool linkToPrevious) {
   processor->setStreamFactory(stream_factory);
   // initialize the processor
   processor->initialize();
+  processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
 
   processor_mapping_[processor->getUUIDStr()] = processor;
 
@@ -116,7 +118,7 @@ bool TestPlan::setProperty(const 
std::shared_ptr<core::Processor> proc,
                            const std::string &value,
                            bool dynamic) {
   std::lock_guard<std::recursive_mutex> guard(mutex);
-  uint32_t i = 0;
+  int32_t i = 0;
   logger_->log_info("Attempting to set property %s %s for %s", prop, value, 
proc->getName());
   for (i = 0; i < processor_queue_.size(); i++) {
     if (processor_queue_.at(i) == proc) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/test/TestBase.h
----------------------------------------------------------------------
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 77449cb..4792011 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -43,6 +43,7 @@
 #include "core/ProcessSession.h"
 #include "core/ProcessorNode.h"
 #include "core/reporting/SiteToSiteProvenanceReportingTask.h"
+#include "core/state/nodes/FlowInformation.h"
 
 class LogTestController {
  public:
@@ -151,19 +152,15 @@ class LogTestController {
 class TestPlan {
  public:
 
-  explicit TestPlan(std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> 
prov_repo);
+  explicit TestPlan(std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> 
prov_repo, const std::shared_ptr<minifi::state::response::FlowVersion> 
&flow_version);
 
   std::shared_ptr<core::Processor> addProcessor(const 
std::shared_ptr<core::Processor> &processor, const std::string &name,
-                                                core::Relationship 
relationship = core::Relationship("success", "description"),
-                                                bool linkToPrevious = false);
+                                                core::Relationship 
relationship = core::Relationship("success", "description"), bool 
linkToPrevious = false);
 
   std::shared_ptr<core::Processor> addProcessor(const std::string 
&processor_name, const std::string &name, core::Relationship relationship = 
core::Relationship("success", "description"),
-  bool linkToPrevious = false);
+                                                bool linkToPrevious = false);
 
-  bool setProperty(const std::shared_ptr<core::Processor> proc,
-                   const std::string &prop,
-                   const std::string &value,
-                   bool dynamic = false);
+  bool setProperty(const std::shared_ptr<core::Processor> proc, const 
std::string &prop, const std::string &value, bool dynamic = false);
 
   void reset();
 
@@ -209,6 +206,7 @@ class TestPlan {
   std::shared_ptr<core::ProcessSession> current_session_;
   std::shared_ptr<core::FlowFile> current_flowfile_;
 
+  std::shared_ptr<minifi::state::response::FlowVersion> flow_version_;
   std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
   std::vector<std::shared_ptr<core::Processor>> processor_queue_;
   std::vector<std::shared_ptr<core::Processor>> configured_processors_;
@@ -232,6 +230,7 @@ class TestController {
     minifi::setDefaultDirectory("./");
     log.reset();
     
utils::IdGenerator::getIdGenerator()->initialize(std::make_shared<minifi::Properties>());
+    flow_version_ = 
std::make_shared<minifi::state::response::FlowVersion>("test", "test", "test");
   }
 
   std::shared_ptr<TestPlan> createPlan() {
@@ -242,10 +241,9 @@ class TestController {
 
     std::shared_ptr<core::Repository> flow_repo = 
std::make_shared<TestRepository>();
     std::shared_ptr<core::Repository> repo = 
std::make_shared<TestRepository>();
-    return std::make_shared<TestPlan>(content_repo, flow_repo, repo);
+    return std::make_shared<TestPlan>(content_repo, flow_repo, repo, 
flow_version_);
   }
 
-
   void runSession(std::shared_ptr<TestPlan> &plan, bool runToCompletion = 
true, std::function<void(const std::shared_ptr<core::ProcessContext>&, const 
std::shared_ptr<core::ProcessSession>&)> verify =
                       nullptr) {
 
@@ -284,6 +282,8 @@ class TestController {
 
  protected:
 
+  std::shared_ptr<minifi::state::response::FlowVersion> flow_version_;
+
   std::mutex test_mutex;
   //std::map<std::string,>
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/02f18238/libminifi/test/unit/GetFileTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/GetFileTests.cpp 
b/libminifi/test/unit/GetFileTests.cpp
index 6e8aa25..1960c70 100644
--- a/libminifi/test/unit/GetFileTests.cpp
+++ b/libminifi/test/unit/GetFileTests.cpp
@@ -77,6 +77,7 @@ TEST_CASE("GetFile: FIFO", "[getFileFifo]") { // NOLINT
   write_thread.join();
 
   // Check log output
+  REQUIRE(LogTestController::getInstance().contains("key:flow.id"));
   REQUIRE(LogTestController::getInstance().contains("Size:44 Offset:0"));
 }
 

Reply via email to