szaszm commented on code in PR #1568:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1568#discussion_r1193765515


##########
libminifi/include/c2/ControllerSocketProtocol.h:
##########
@@ -61,6 +66,46 @@ class ControllerSocketProtocol {
   std::weak_ptr<ControllerSocketReporter> controller_socket_reporter_;
   std::shared_ptr<Configure> configuration_;
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ControllerSocketProtocol>::getLogger();
+  std::mutex initialization_mutex_;
+
+  // Some commands need to restart the controller socket to reinitialize the socket with new data for example new SSL data in case of a flow update
+  // These commands are handled on a separate thread, and while these commands are handled other incoming commands are dropped
+  class SocketRestartCommandProcessor {
+   public:
+    explicit SocketRestartCommandProcessor(state::StateMonitor& update_sink_);
+    ~SocketRestartCommandProcessor();
+
+    enum class Command {
+      FLOW_UPDATE,
+      START
+    };
+
+    struct CommandData {
+      Command command;
+      std::string data;
+    };
+
+    void enqueue(const CommandData& command_data) {
+      is_socket_restarting_ = true;
+      command_queue_.enqueue(command_data);
+      command_queue_condition_variable_.notify_all();
+    }
+
+    bool isSocketRestarting() const {
+      return is_socket_restarting_;
+    }
+
+   private:
+    mutable std::atomic_bool is_socket_restarting_ = false;
+    state::StateMonitor& update_sink_;
+    std::thread command_processor_thread_;
+    std::mutex cv_mutex_;
+    std::condition_variable command_queue_condition_variable_;
+    std::atomic_bool running_ = true;
+    moodycamel::ConcurrentQueue<CommandData> command_queue_;

Review Comment:
   The minifi `utils::ConditionConcurrentQueue` could be a better alternative 
here. It's not as performant under high load, but performance doesn't really 
matter here. It's also less likely to change the order of commands, and it 
has built-in waiting dequeue functions.
   
   `moodycamel::ConcurrentQueue` works with per-producer-thread arrays 
(sub-queues), and the consumer always picks the first non-empty one. This 
means that it doesn't keep any ordering between different producers, and it 
may return elements that were inserted later if they happen to be in the 
first array. Strictly speaking, it still preserves insertion order within a 
single producer thread, but it guarantees nothing between threads: even if 
the second thread inserted something much earlier, the consumer will still 
return an item from the first thread's array as long as there is one. We had 
strange priority and CPU starvation issues back when we used this queue for 
scheduling processors. It's unlikely to cause any issues here, other than 
occasional command reordering between multiple independent clients.
   
   https://github.com/cameron314/concurrentqueue#high-level-design
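
   To illustrate the suggestion, here is a minimal sketch of a mutex-guarded 
FIFO queue with a waiting dequeue. The class name `BlockingFifoQueue` and its 
methods are hypothetical and only show the general technique; this is not the 
actual `utils::ConditionConcurrentQueue` interface, which may differ.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical illustration only, NOT the minifi utils::ConditionConcurrentQueue API.
// A single mutex-guarded deque keeps one global arrival order across all producers.
template <typename T>
class BlockingFifoQueue {
 public:
  // Appends an item and wakes one waiting consumer.
  void enqueue(T item) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push_back(std::move(item));
    }
    cv_.notify_one();
  }

  // Blocks until an item is available or stop() has been called.
  // Returns std::nullopt only when the queue is stopped and empty.
  std::optional<T> dequeueWait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return !queue_.empty() || stopped_; });
    if (queue_.empty()) {
      return std::nullopt;
    }
    T item = std::move(queue_.front());
    queue_.pop_front();
    return item;
  }

  // Releases all waiting consumers; items still queued can be drained afterwards.
  void stop() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stopped_ = true;
    }
    cv_.notify_all();
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<T> queue_;
  bool stopped_ = false;
};
```

   With something along these lines, the command processor thread could loop 
on the waiting dequeue instead of managing a separate `cv_mutex_` and 
`command_queue_condition_variable_`, and commands would be handled in the 
order they were enqueued, regardless of which thread enqueued them.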


