bakaid commented on a change in pull request #653: MINIFICPP-1033 - 
PublishKafka fixes
URL: https://github.com/apache/nifi-minifi-cpp/pull/653#discussion_r328034270
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -100,89 +107,207 @@ class PublishKafka : public core::Processor {
   static core::Relationship Failure;
   static core::Relationship Success;
 
+  // Message
+   struct MessageResult {
+    bool completed;
+    bool is_error;
+    rd_kafka_resp_err_t err_code;
+
+    MessageResult()
+        : completed(false)
+        , is_error(false) {
+    }
+  };
+  struct FlowFileResult {
+    bool flow_file_error;
+    std::vector<MessageResult> messages;
+
+    FlowFileResult()
+        : flow_file_error(false) {
+    }
+  };
+  struct Messages {
+    std::mutex mutex;
+    std::condition_variable cv;
+    std::vector<FlowFileResult> flow_files;
+    bool interrupted;
+
+    Messages()
+        : interrupted(false) {
+    }
+
+    void waitForCompletion() {
+      std::unique_lock<std::mutex> lock(mutex);
+      cv.wait(lock, [this]() -> bool {
+        if (interrupted) {
+          return true;
+        }
+        size_t index = 0U;
+        return std::all_of(this->flow_files.begin(), this->flow_files.end(), 
[&](const FlowFileResult& flow_file) {
+          index++;
+          if (flow_file.flow_file_error) {
+            return true;
+          }
+          return std::all_of(flow_file.messages.begin(), 
flow_file.messages.end(), [](const MessageResult& message) {
+            return message.completed;
+          });
+        });
+      });
+    }
+
+    void modifyResult(size_t index, const 
std::function<void(FlowFileResult&)>& fun) {
+      std::unique_lock<std::mutex> lock(mutex);
+      fun(flow_files.at(index));
+      cv.notify_all();
+    }
+
+    size_t addFlowFile() {
+      std::lock_guard<std::mutex> lock(mutex);
+      flow_files.emplace_back();
+      return flow_files.size() - 1;
+    }
+
+    void iterateFlowFiles(const std::function<void(size_t /*index*/, const 
FlowFileResult& /*flow_file_result*/)>& fun) {
+      std::lock_guard<std::mutex> lock(mutex);
+      for (size_t index = 0U; index < flow_files.size(); index++) {
+        fun(index, flow_files[index]);
+      }
+    }
+
+    void interrupt() {
+      std::unique_lock<std::mutex> lock(mutex);
+      interrupted = true;
+      cv.notify_all();
+    }
+
+    bool wasInterrupted() {
+      std::lock_guard<std::mutex> lock(mutex);
+      return interrupted;
 
 Review comment:
   I most probably could, but I need interrupted to be under a mutex for the 
condition variable in interrupt() anyway, and to be honest, I do not know the 
exact requirements between condition variables and atomics for correctly 
publishing modifications to the notified thread from the top of my head, so I 
would prefer a defensive approach here. We have enough mutex locks anyway for 
this single call per onTrigger not to matter.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to