arpadboda commented on a change in pull request #653: MINIFICPP-1033 - 
PublishKafka fixes
URL: https://github.com/apache/nifi-minifi-cpp/pull/653#discussion_r328035606
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -100,89 +107,207 @@ class PublishKafka : public core::Processor {
   static core::Relationship Failure;
   static core::Relationship Success;
 
+  // Message
+   // Result record for a single message; FlowFileResult holds a vector of these.
+   struct MessageResult {
+    // Starts out false. Messages::waitForCompletion() keeps waiting until this
+    // is true for every message of every flow file that has not failed.
+    bool completed;
+    // Starts out false. Presumably set when this message failed -- confirm
+    // against the code that writes it.
+    bool is_error;
+    // Starts out as RD_KAFKA_RESP_ERR_NO_ERROR so that reading it before any
+    // result has been recorded is well-defined.
+    rd_kafka_resp_err_t err_code;
+
+    MessageResult()
+        : completed(false)
+        , is_error(false)
+        , err_code(RD_KAFKA_RESP_ERR_NO_ERROR) {
+    }
+  };
+  // Result record for one flow file: a failure flag plus one MessageResult
+  // per message.
+  struct FlowFileResult {
+    FlowFileResult() : flow_file_error{false} {}
+
+    bool flow_file_error;                  // starts out false
+    std::vector<MessageResult> messages;   // starts out empty
+  };
+  struct Messages {
+    std::mutex mutex;
+    std::condition_variable cv;
+    std::vector<FlowFileResult> flow_files;
+    bool interrupted;
+
+    Messages()
+        : interrupted(false) {
+    }
+
+    // Blocks the caller until either interrupt() has been called or every flow
+    // file is finished. A flow file counts as finished when its
+    // flow_file_error flag is set or when all of its messages are completed.
+    // With no flow files registered the condition is already satisfied.
+    void waitForCompletion() {
+      std::unique_lock<std::mutex> lock(mutex);
+      // The predicate is evaluated with the mutex held, so the shared state
+      // can be read directly.
+      cv.wait(lock, [this]() -> bool {
+        if (interrupted) {
+          return true;
+        }
+        return std::all_of(this->flow_files.begin(), this->flow_files.end(), [](const FlowFileResult& flow_file) {
+          return flow_file.flow_file_error ||
+              std::all_of(flow_file.messages.begin(), flow_file.messages.end(), [](const MessageResult& message) {
+                return message.completed;
+              });
+        });
+      });
+    }
+
+    // Applies fun to the result entry at position index while holding the
+    // mutex, then wakes every thread waiting on cv. flow_files.at() throws
+    // std::out_of_range when index is not a valid position.
+    void modifyResult(size_t index, const std::function<void(FlowFileResult&)>& fun) {
+      std::lock_guard<std::mutex> guard(mutex);
+      FlowFileResult& target = flow_files.at(index);
+      fun(target);
+      cv.notify_all();
+    }
+
+    // Appends a default-constructed FlowFileResult under the mutex and returns
+    // the index at which it was stored.
+    size_t addFlowFile() {
+      std::lock_guard<std::mutex> guard(mutex);
+      const size_t new_index = flow_files.size();
+      flow_files.emplace_back();
+      return new_index;
+    }
+
+    // Calls fun(index, result) for every flow file in index order. The mutex
+    // is held for the whole traversal.
+    void iterateFlowFiles(const std::function<void(size_t /*index*/, const FlowFileResult& /*flow_file_result*/)>& fun) {
+      std::lock_guard<std::mutex> guard(mutex);
+      size_t position = 0U;
+      while (position < flow_files.size()) {
+        fun(position, flow_files[position]);
+        ++position;
+      }
+    }
+
+    // Sets the interrupted flag under the mutex and wakes all waiters, which
+    // lets a pending waitForCompletion() return.
+    void interrupt() {
+      std::lock_guard<std::mutex> guard(mutex);
+      interrupted = true;
+      cv.notify_all();
+    }
+
+    bool wasInterrupted() {
+      std::lock_guard<std::mutex> lock(mutex);
+      return interrupted;
 
 Review comment:
   Okay, I can live with that; I don't think the cost of the mutex would matter 
here anyway. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to