arpadboda commented on a change in pull request #653: MINIFICPP-1033 - 
PublishKafka fixes
URL: https://github.com/apache/nifi-minifi-cpp/pull/653#discussion_r327979791
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -100,89 +107,207 @@ class PublishKafka : public core::Processor {
   static core::Relationship Failure;
   static core::Relationship Success;
 
+  // Message
+   struct MessageResult {
+    bool completed;
+    bool is_error;
+    rd_kafka_resp_err_t err_code;
+
+    MessageResult()
+        : completed(false)
+        , is_error(false) {
+    }
+  };
+  struct FlowFileResult {
+    bool flow_file_error;
+    std::vector<MessageResult> messages;
+
+    FlowFileResult()
+        : flow_file_error(false) {
+    }
+  };
+  struct Messages {
+    std::mutex mutex;
+    std::condition_variable cv;
+    std::vector<FlowFileResult> flow_files;
+    bool interrupted;
+
+    Messages()
+        : interrupted(false) {
+    }
+
+    void waitForCompletion() {
+      std::unique_lock<std::mutex> lock(mutex);
+      cv.wait(lock, [this]() -> bool {
+        if (interrupted) {
+          return true;
+        }
+        size_t index = 0U;
+        return std::all_of(this->flow_files.begin(), this->flow_files.end(), 
[&](const FlowFileResult& flow_file) {
+          index++;
+          if (flow_file.flow_file_error) {
+            return true;
+          }
+          return std::all_of(flow_file.messages.begin(), 
flow_file.messages.end(), [](const MessageResult& message) {
+            return message.completed;
+          });
+        });
+      });
+    }
+
+    void modifyResult(size_t index, const 
std::function<void(FlowFileResult&)>& fun) {
+      std::unique_lock<std::mutex> lock(mutex);
+      fun(flow_files.at(index));
+      cv.notify_all();
+    }
+
+    size_t addFlowFile() {
+      std::lock_guard<std::mutex> lock(mutex);
+      flow_files.emplace_back();
+      return flow_files.size() - 1;
+    }
+
+    void iterateFlowFiles(const std::function<void(size_t /*index*/, const 
FlowFileResult& /*flow_file_result*/)>& fun) {
+      std::lock_guard<std::mutex> lock(mutex);
+      for (size_t index = 0U; index < flow_files.size(); index++) {
+        fun(index, flow_files[index]);
+      }
+    }
+
+    void interrupt() {
+      std::unique_lock<std::mutex> lock(mutex);
+      interrupted = true;
+      cv.notify_all();
+    }
+
+    bool wasInterrupted() {
+      std::lock_guard<std::mutex> lock(mutex);
+      return interrupted;
 
 Review comment:
   If you make this bool atomic, you can save the cost of the mutex when 
calling this. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to