briansolo1985 commented on code in PR #7344:
URL: https://github.com/apache/nifi/pull/7344#discussion_r1288113651


##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java:
##########
@@ -35,64 +39,72 @@
 
 public class ConfigurationChangeCoordinator implements Closeable, 
ConfigurationChangeNotifier {
 
-    public static final String NOTIFIER_PROPERTY_PREFIX = 
"nifi.minifi.notifier";
-    public static final String NOTIFIER_INGESTORS_KEY = 
NOTIFIER_PROPERTY_PREFIX + ".ingestors";
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+    public static final String NOTIFIER_INGESTORS_KEY = 
"nifi.minifi.notifier.ingestors";
 
-    private final Set<ConfigurationChangeListener> 
configurationChangeListeners;
-    private final Set<ChangeIngestor> changeIngestors = new HashSet<>();
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+    private static final String COMMA = ",";
 
     private final BootstrapFileProvider bootstrapFileProvider;
     private final RunMiNiFi runMiNiFi;
+    private final Set<ConfigurationChangeListener> 
configurationChangeListeners;
+    private final Set<ChangeIngestor> changeIngestors;
 
     public ConfigurationChangeCoordinator(BootstrapFileProvider 
bootstrapFileProvider, RunMiNiFi runMiNiFi,
-        Set<ConfigurationChangeListener> miNiFiConfigurationChangeListeners) {
+                                          Set<ConfigurationChangeListener> 
miNiFiConfigurationChangeListeners) {
         this.bootstrapFileProvider = bootstrapFileProvider;
         this.runMiNiFi = runMiNiFi;
-        this.configurationChangeListeners = 
Optional.ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+        this.configurationChangeListeners = 
ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+        this.changeIngestors = new HashSet<>();
+    }
+
+    @Override
+    public Collection<ListenerHandleResult> notifyListeners(ByteBuffer 
newFlowConfig) {
+        LOGGER.info("Notifying Listeners of a change");
+        return configurationChangeListeners.stream()
+            .map(listener -> notifyListener(newFlowConfig, listener))
+            .collect(toList());
+    }
+
+    @Override
+    public void close() {
+        closeIngestors();
     }
 
     /**
      * Begins the associated notification service provided by the given 
implementation.  In most implementations, no action will occur until this 
method is invoked.
      */
-    public void start() throws IOException{
+    public void start() throws IOException {
         initialize();
         changeIngestors.forEach(ChangeIngestor::start);
     }
 
-    /**
-     * Provides an immutable collection of listeners for the notifier instance
-     *
-     * @return a collection of those listeners registered for notifications
-     */
-    public Set<ConfigurationChangeListener> getChangeListeners() {
-        return Collections.unmodifiableSet(configurationChangeListeners);
+    private ListenerHandleResult notifyListener(ByteBuffer newFlowConfig, 
ConfigurationChangeListener listener) {
+        try {
+            listener.handleChange(new 
ByteBufferInputStream(newFlowConfig.duplicate()));
+            ListenerHandleResult listenerHandleResult = new 
ListenerHandleResult(listener);
+            LOGGER.info("Listener notification result {}", 
listenerHandleResult);
+            return listenerHandleResult;
+        } catch (ConfigurationChangeException ex) {
+            ListenerHandleResult listenerHandleResult = new 
ListenerHandleResult(listener, ex);
+            LOGGER.info("Listener notification result {} with failure {}", 
listenerHandleResult, ex);

Review Comment:
   Thanks, fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to