exceptionfactory commented on code in PR #6075:
URL: https://github.com/apache/nifi/pull/6075#discussion_r893724461


##########
c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client;
+
+import javax.net.ssl.HostnameVerifier;
+
+/**
+ * Configuration for a C2 Client.
+ */
+public class C2ClientConfig {
+
+    private final String c2Url;
+    private final String c2AckUrl;
+    private final String agentClass;
+    private final String agentIdentifier;
+    private final String confDirectory;
+    private final String runtimeManifestIdentifier;
+    private final String runtimeType;
+    private final long heartbeatPeriod;
+    private final String keystoreFilename;
+    private final String keystorePass;
+    private final String keyPass;
+    private final String keystoreType;
+    private final String truststoreFilename;
+    private final String truststorePass;
+    private final String truststoreType;
+    private final HostnameVerifier hostnameVerifier;

Review Comment:
   The `HostnameVerifier` property does not appear to be used. Overriding the 
`HostnameVerifier` for TLS connections is generally against good security 
practices, so it should be removed from the configuration.



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.OVERRIDE_SECURITY;
+import static 
org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import 
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import 
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.minifi.commons.schema.ConfigSchema;
+import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
+import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
+import org.slf4j.Logger;
+
+public class MiNiFiConfigurationChangeListener implements 
ConfigurationChangeListener {
+
+    private final RunMiNiFi runner;
+    private final Logger logger;
+    private final BootstrapFileProvider bootstrapFileProvider;
+
+    private static final ReentrantLock handlingLock = new ReentrantLock();
+
+    public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, 
BootstrapFileProvider bootstrapFileProvider) {
+        this.runner = runner;
+        this.logger = logger;
+        this.bootstrapFileProvider = bootstrapFileProvider;
+    }
+
+    @Override
+    public void handleChange(InputStream configInputStream) throws 
ConfigurationChangeException {
+        logger.info("Received notification of a change");
+
+        if (!handlingLock.tryLock()) {
+            throw new ConfigurationChangeException("Instance is already 
handling another change");
+        }
+        // Store the incoming stream as a byte array to be shared among 
components that need it
+        try(ByteArrayOutputStream bufferedConfigOs = new 
ByteArrayOutputStream()) {
+
+            Properties bootstrapProperties = 
bootstrapFileProvider.getBootstrapProperties();
+            File configFile = new 
File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
+
+            byte[] copyArray = new byte[1024];
+            int available;
+            while ((available = configInputStream.read(copyArray)) > 0) {
+                bufferedConfigOs.write(copyArray, 0, available);
+            }
+
+            File swapConfigFile = bootstrapFileProvider.getSwapFile();
+            logger.info("Persisting old configuration to {}", 
swapConfigFile.getAbsolutePath());
+
+            try (FileInputStream configFileInputStream = new 
FileInputStream(configFile)) {
+                Files.copy(configFileInputStream, swapConfigFile.toPath(), 
REPLACE_EXISTING);
+            }
+
+            
persistBackNonFlowSectionsFromOriginalSchema(bufferedConfigOs.toByteArray(), 
bootstrapProperties, configFile);
+
+            // Create an input stream to feed to the config transformer
+            try (FileInputStream newConfigIs = new 
FileInputStream(configFile)) {
+
+                try {
+                    String confDir = 
bootstrapProperties.getProperty(CONF_DIR_KEY);
+
+                    try {
+                        logger.info("Performing transformation for input and 
saving outputs to {}", confDir);
+                        ByteBuffer tempConfigFile = 
generateConfigFiles(newConfigIs, confDir, 
bootstrapFileProvider.getBootstrapProperties());
+                        
runner.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+
+                        try {
+                            logger.info("Reloading instance with new 
configuration");
+                            restartInstance();
+                        } catch (Exception e) {
+                            logger.debug("Transformation of new config file 
failed after transformation into Flow.xml and nifi.properties, reverting.");
+                            try (FileInputStream swapConfigFileStream = new 
FileInputStream(swapConfigFile)) {
+                                ByteBuffer resetConfigFile = 
generateConfigFiles(swapConfigFileStream, confDir, 
bootstrapFileProvider.getBootstrapProperties());
+                                
runner.getConfigFileReference().set(resetConfigFile.asReadOnlyBuffer());
+                            }
+                            throw e;
+                        }
+                    } catch (Exception e) {
+                        logger.debug("Transformation of new config file failed 
after replacing original with the swap file, reverting.");
+                        try (FileInputStream swapConfigFileStream = new 
FileInputStream(swapConfigFile)) {
+                            Files.copy(swapConfigFileStream, 
configFile.toPath(), REPLACE_EXISTING);
+                        }
+                        throw e;
+                    }
+                } catch (Exception e) {
+                    logger.debug("Transformation of new config file failed 
after swap file was created, deleting it.");
+                    if (!swapConfigFile.delete()) {
+                        logger.warn("The swap file failed to delete after a 
failed handling of a change. It should be cleaned up manually.");
+                    }
+                    throw e;
+                }
+            }

Review Comment:
   This set of try-catch blocks is nested deeply, it would be helpful to 
refactor to separate methods.



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiCommandSender.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.BufferedReader;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MiNiFiCommandSender {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MiNiFiCommandSender.class);
+    private static final String PING_CMD = "PING";
+    private static final int SOCKET_TIMEOUT = 10000;
+    private static final int CONNECTION_TIMEOUT = 10000;
+
+    private final MiNiFiParameters miNiFiParameters;
+    private final ObjectMapper objectMapper;
+
+    public MiNiFiCommandSender(MiNiFiParameters miNiFiParameters, ObjectMapper 
objectMapper) {
+        this.miNiFiParameters = miNiFiParameters;
+        this.objectMapper = objectMapper;
+    }
+
+    public Optional<String> sendCommand(String cmd, Integer port, String... 
extraParams) throws IOException {
+        Optional<String> response = Optional.empty();
+
+        if (port == null) {
+            LOGGER.info("Apache MiNiFi is not currently running");
+            return response;
+        }
+
+        try (Socket socket = new Socket()) {
+            LOGGER.debug("Connecting to MiNiFi instance");
+            socket.setSoTimeout(SOCKET_TIMEOUT);
+            socket.connect(new InetSocketAddress("localhost", port), 
CONNECTION_TIMEOUT);
+            LOGGER.debug("Established connection to MiNiFi instance.");
+
+            LOGGER.debug("Sending {} Command to port {}", cmd, port);
+
+            String responseString;
+            try (OutputStream out = socket.getOutputStream()) {
+                out.write(getCommand(cmd, extraParams));
+                out.flush();
+                responseString = readResponse(socket);
+            }
+
+            LOGGER.debug("Received response to {} command: {}", cmd, 
responseString);
+            response = Optional.of(responseString);
+        } catch (EOFException | SocketTimeoutException e) {
+            String message = "Failed to get response for " + cmd + " 
Potentially due to the process currently being down (restarting or otherwise)";
+            throw new RuntimeException(message);
+        }
+        return response;
+    }
+
+    <T> T sendCommandForObject(String cmd, Integer port, Class<T> clazz, 
String... extraParams) throws IOException {
+        return sendCommand(cmd, port, extraParams)
+            .map(response -> deserialize(cmd, response, clazz))
+            .orElse(null);
+    }
+
+    private String readResponse(Socket socket) throws IOException {
+        StringBuilder sb = new StringBuilder();
+        int numLines = 0;
+        try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream()))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                if (numLines++ > 0) {
+                    sb.append("\n");
+                }
+                sb.append(line);
+            }
+        }
+
+        return sb.toString().trim();
+    }
+
+    private byte[] getCommand(String cmd, String... args) {
+        String argsString = Arrays.stream(args).collect(Collectors.joining(" 
"));
+        String commandWithArgs = cmd + " " + miNiFiParameters.getSecretKey() + 
(args.length > 0 ? " " : "") + argsString + "\n";
+        return commandWithArgs.getBytes(StandardCharsets.UTF_8);
+    }
+
+    private <T> T deserialize(String cmd, String obj, Class<T> clazz) {
+        T response;
+        try {
+            response = objectMapper.readValue(obj, clazz);
+        } catch (JsonProcessingException e) {
+            String message = "Failed to deserialize " + cmd + " response";
+            LOGGER.error(message);
+            throw new RuntimeException(message);

Review Comment:
   The `JsonProcessingException` should be passed as the cause.
   ```suggestion
               throw new RuntimeException(message, e);
   ```



##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf:
##########
@@ -129,3 +128,36 @@ java.arg.7=-Djava.security.egd=file:/dev/urandom
 
 #Set headless mode by default
 java.arg.14=-Djava.awt.headless=true
+
+# MiNiFi Command & Control Configuration
+# C2 Properties
+# Enabling C2 Uncomment each of the following options
+#c2.enable=true
+## define protocol parameters
+#c2.rest.url=
+#c2.rest.url.ack=
+## c2 timeouts
+#c2.rest.connectionTimeout=5 sec
+#c2.rest.readTimeout=5 sec
+#c2.rest.callTimeout=10 sec
+## heartbeat in milliseconds
+#c2.agent.heartbeat.period=5000
+## define parameters about your agent
+#c2.agent.class=
+#c2.config.directory=./conf
+#c2.runtime.manifest.identifier=minifi
+#c2.runtime.type=minifi-java
+# Optional.  Defaults to a hardware based unique identifier
+#c2.agent.identifier=
+## Define TLS security properties for C2 communications
+#c2.security.truststore.location=
+#c2.security.truststore.password=
+#c2.security.truststore.type=JKS
+#c2.security.keystore.location=
+#c2.security.keystore.password=
+#c2.security.keystore.type=JKS
+#c2.security.need.client.auth=true

Review Comment:
   Is this property used?



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.OVERRIDE_SECURITY;
+import static 
org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import 
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import 
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.minifi.commons.schema.ConfigSchema;
+import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
+import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
+import org.slf4j.Logger;
+
+public class MiNiFiConfigurationChangeListener implements 
ConfigurationChangeListener {
+
+    private final RunMiNiFi runner;
+    private final Logger logger;
+    private final BootstrapFileProvider bootstrapFileProvider;
+
+    private static final ReentrantLock handlingLock = new ReentrantLock();
+
+    public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, 
BootstrapFileProvider bootstrapFileProvider) {
+        this.runner = runner;
+        this.logger = logger;
+        this.bootstrapFileProvider = bootstrapFileProvider;
+    }
+
+    @Override
+    public void handleChange(InputStream configInputStream) throws 
ConfigurationChangeException {
+        logger.info("Received notification of a change");
+
+        if (!handlingLock.tryLock()) {
+            throw new ConfigurationChangeException("Instance is already 
handling another change");
+        }
+        // Store the incoming stream as a byte array to be shared among 
components that need it
+        try(ByteArrayOutputStream bufferedConfigOs = new 
ByteArrayOutputStream()) {
+
+            Properties bootstrapProperties = 
bootstrapFileProvider.getBootstrapProperties();
+            File configFile = new 
File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
+
+            byte[] copyArray = new byte[1024];
+            int available;
+            while ((available = configInputStream.read(copyArray)) > 0) {
+                bufferedConfigOs.write(copyArray, 0, available);
+            }

Review Comment:
   It looks like this approach could be replaced with `IOUtils.copy()`



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.OVERRIDE_SECURITY;
+import static 
org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import 
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import 
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.minifi.commons.schema.ConfigSchema;
+import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
+import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
+import org.slf4j.Logger;
+
+public class MiNiFiConfigurationChangeListener implements 
ConfigurationChangeListener {
+
+    private final RunMiNiFi runner;
+    private final Logger logger;
+    private final BootstrapFileProvider bootstrapFileProvider;
+
+    private static final ReentrantLock handlingLock = new ReentrantLock();
+
+    public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, 
BootstrapFileProvider bootstrapFileProvider) {
+        this.runner = runner;
+        this.logger = logger;
+        this.bootstrapFileProvider = bootstrapFileProvider;
+    }
+
+    @Override
+    public void handleChange(InputStream configInputStream) throws 
ConfigurationChangeException {
+        logger.info("Received notification of a change");
+
+        if (!handlingLock.tryLock()) {
+            throw new ConfigurationChangeException("Instance is already 
handling another change");
+        }
+        // Store the incoming stream as a byte array to be shared among 
components that need it
+        try(ByteArrayOutputStream bufferedConfigOs = new 
ByteArrayOutputStream()) {
+
+            Properties bootstrapProperties = 
bootstrapFileProvider.getBootstrapProperties();
+            File configFile = new 
File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
+
+            byte[] copyArray = new byte[1024];
+            int available;
+            while ((available = configInputStream.read(copyArray)) > 0) {
+                bufferedConfigOs.write(copyArray, 0, available);
+            }
+
+            File swapConfigFile = bootstrapFileProvider.getSwapFile();
+            logger.info("Persisting old configuration to {}", 
swapConfigFile.getAbsolutePath());
+
+            try (FileInputStream configFileInputStream = new 
FileInputStream(configFile)) {
+                Files.copy(configFileInputStream, swapConfigFile.toPath(), 
REPLACE_EXISTING);
+            }
+
+            
persistBackNonFlowSectionsFromOriginalSchema(bufferedConfigOs.toByteArray(), 
bootstrapProperties, configFile);
+
+            // Create an input stream to feed to the config transformer
+            try (FileInputStream newConfigIs = new 
FileInputStream(configFile)) {
+
+                try {
+                    String confDir = 
bootstrapProperties.getProperty(CONF_DIR_KEY);
+
+                    try {
+                        logger.info("Performing transformation for input and 
saving outputs to {}", confDir);
+                        ByteBuffer tempConfigFile = 
generateConfigFiles(newConfigIs, confDir, 
bootstrapFileProvider.getBootstrapProperties());
+                        
runner.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+
+                        try {
+                            logger.info("Reloading instance with new 
configuration");
+                            restartInstance();
+                        } catch (Exception e) {
+                            logger.debug("Transformation of new config file 
failed after transformation into Flow.xml and nifi.properties, reverting.");
+                            try (FileInputStream swapConfigFileStream = new 
FileInputStream(swapConfigFile)) {
+                                ByteBuffer resetConfigFile = 
generateConfigFiles(swapConfigFileStream, confDir, 
bootstrapFileProvider.getBootstrapProperties());
+                                
runner.getConfigFileReference().set(resetConfigFile.asReadOnlyBuffer());
+                            }
+                            throw e;
+                        }
+                    } catch (Exception e) {
+                        logger.debug("Transformation of new config file failed 
after replacing original with the swap file, reverting.");
+                        try (FileInputStream swapConfigFileStream = new 
FileInputStream(swapConfigFile)) {
+                            Files.copy(swapConfigFileStream, 
configFile.toPath(), REPLACE_EXISTING);
+                        }
+                        throw e;
+                    }
+                } catch (Exception e) {
+                    logger.debug("Transformation of new config file failed 
after swap file was created, deleting it.");
+                    if (!swapConfigFile.delete()) {
+                        logger.warn("The swap file failed to delete after a 
failed handling of a change. It should be cleaned up manually.");
+                    }
+                    throw e;
+                }
+            }
+        } catch (ConfigurationChangeException e){
+            logger.error("Unable to carry out reloading of configuration on 
receipt of notification event", e);
+            throw e;
+        } catch (IOException ioe) {
+            logger.error("Unable to carry out reloading of configuration on 
receipt of notification event", ioe);
+            throw new ConfigurationChangeException("Unable to perform reload 
of received configuration change", ioe);

Review Comment:
   Is there a reason for logging the error and throw the exception? Shouldn't 
the exception be caught and logged by the caller? This approach seems like it 
would generate multiple log messages for the same problem.



##########
c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client;
+
+import javax.net.ssl.HostnameVerifier;
+
+/**
+ * Configuration for a C2 Client.
+ */
+public class C2ClientConfig {
+
+    private final String c2Url;
+    private final String c2AckUrl;
+    private final String agentClass;
+    private final String agentIdentifier;
+    private final String confDirectory;
+    private final String runtimeManifestIdentifier;
+    private final String runtimeType;
+    private final long heartbeatPeriod;
+    private final String keystoreFilename;
+    private final String keystorePass;
+    private final String keyPass;
+    private final String keystoreType;
+    private final String truststoreFilename;
+    private final String truststorePass;
+    private final String truststoreType;
+    private final HostnameVerifier hostnameVerifier;

Review Comment:
   @mattyb149 or @bejancsaba Can this property be removed?



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.OVERRIDE_SECURITY;
+import static 
org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY;
+import static 
org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import 
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import 
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.minifi.commons.schema.ConfigSchema;
+import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema;
+import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader;
+import org.slf4j.Logger;
+
+public class MiNiFiConfigurationChangeListener implements 
ConfigurationChangeListener {
+
+    private final RunMiNiFi runner;
+    private final Logger logger;
+    private final BootstrapFileProvider bootstrapFileProvider;
+
+    private static final ReentrantLock handlingLock = new ReentrantLock();
+
+    public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, 
BootstrapFileProvider bootstrapFileProvider) {
+        this.runner = runner;
+        this.logger = logger;
+        this.bootstrapFileProvider = bootstrapFileProvider;
+    }
+
+    @Override
+    public void handleChange(InputStream configInputStream) throws 
ConfigurationChangeException {
+        logger.info("Received notification of a change");
+
+        if (!handlingLock.tryLock()) {
+            throw new ConfigurationChangeException("Instance is already 
handling another change");
+        }
+        // Store the incoming stream as a byte array to be shared among 
components that need it
+        try(ByteArrayOutputStream bufferedConfigOs = new 
ByteArrayOutputStream()) {
+
+            Properties bootstrapProperties = 
bootstrapFileProvider.getBootstrapProperties();
+            File configFile = new 
File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
+
+            byte[] copyArray = new byte[1024];
+            int available;
+            while ((available = configInputStream.read(copyArray)) > 0) {
+                bufferedConfigOs.write(copyArray, 0, available);
+            }
+
+            File swapConfigFile = bootstrapFileProvider.getSwapFile();
+            logger.info("Persisting old configuration to {}", 
swapConfigFile.getAbsolutePath());
+
+            try (FileInputStream configFileInputStream = new 
FileInputStream(configFile)) {
+                Files.copy(configFileInputStream, swapConfigFile.toPath(), 
REPLACE_EXISTING);
+            }
+
+            
persistBackNonFlowSectionsFromOriginalSchema(bufferedConfigOs.toByteArray(), 
bootstrapProperties, configFile);
+
+            // Create an input stream to feed to the config transformer
+            try (FileInputStream newConfigIs = new 
FileInputStream(configFile)) {
+
+                try {
+                    String confDir = 
bootstrapProperties.getProperty(CONF_DIR_KEY);
+
+                    try {
+                        logger.info("Performing transformation for input and 
saving outputs to {}", confDir);
+                        ByteBuffer tempConfigFile = 
generateConfigFiles(newConfigIs, confDir, 
bootstrapFileProvider.getBootstrapProperties());
+                        
runner.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+
+                        try {
+                            logger.info("Reloading instance with new 
configuration");
+                            restartInstance();
+                        } catch (Exception e) {
+                            logger.debug("Transformation of new config file 
failed after transformation into Flow.xml and nifi.properties, reverting.");
+                            try (FileInputStream swapConfigFileStream = new 
FileInputStream(swapConfigFile)) {
+                                ByteBuffer resetConfigFile = 
generateConfigFiles(swapConfigFileStream, confDir, 
bootstrapFileProvider.getBootstrapProperties());
+                                
runner.getConfigFileReference().set(resetConfigFile.asReadOnlyBuffer());
+                            }
+                            throw e;
+                        }
+                    } catch (Exception e) {
+                        logger.debug("Transformation of new config file failed 
after replacing original with the swap file, reverting.");
+                        try (FileInputStream swapConfigFileStream = new 
FileInputStream(swapConfigFile)) {
+                            Files.copy(swapConfigFileStream, 
configFile.toPath(), REPLACE_EXISTING);
+                        }
+                        throw e;
+                    }
+                } catch (Exception e) {
+                    logger.debug("Transformation of new config file failed 
after swap file was created, deleting it.");
+                    if (!swapConfigFile.delete()) {
+                        logger.warn("The swap file failed to delete after a 
failed handling of a change. It should be cleaned up manually.");
+                    }
+                    throw e;
+                }
+            }
+        } catch (ConfigurationChangeException e){
+            logger.error("Unable to carry out reloading of configuration on 
receipt of notification event", e);
+            throw e;
+        } catch (IOException ioe) {
+            logger.error("Unable to carry out reloading of configuration on 
receipt of notification event", ioe);
+            throw new ConfigurationChangeException("Unable to perform reload 
of received configuration change", ioe);
+        } finally {
+            IOUtils.closeQuietly(configInputStream);
+            handlingLock.unlock();
+        }
+    }
+
+    @Override
+    public String getDescriptor() {
+        return "MiNiFiConfigurationChangeListener";
+    }
+
+    private void restartInstance() throws IOException {
+        try {
+            runner.reload();
+        } catch (IOException e) {
+            throw new IOException("Unable to successfully restart MiNiFi 
instance after configuration change.", e);
+        }
+    }
+
+    private void persistBackNonFlowSectionsFromOriginalSchema(byte[] 
newSchema, Properties bootstrapProperties, File configFile) {
+        try {
+            ConvertableSchema<ConfigSchema> schemaNew = ConfigTransformer
+                .throwIfInvalid(SchemaLoader.loadConvertableSchemaFromYaml(new 
ByteArrayInputStream(newSchema)));
+            ConfigSchema configSchemaNew = 
ConfigTransformer.throwIfInvalid(schemaNew.convert());
+            ConvertableSchema<ConfigSchema> schemaOld = ConfigTransformer
+                .throwIfInvalid(SchemaLoader.loadConvertableSchemaFromYaml(new 
ByteBufferInputStream(runner.getConfigFileReference().get().duplicate())));
+            ConfigSchema configSchemaOld = 
ConfigTransformer.throwIfInvalid(schemaOld.convert());
+
+            
configSchemaNew.setNifiPropertiesOverrides(configSchemaOld.getNifiPropertiesOverrides());
+
+            if (!overrideCoreProperties(bootstrapProperties)) {
+                logger.debug("Preserving previous core properties...");
+                
configSchemaNew.setCoreProperties(configSchemaOld.getCoreProperties());
+            }
+
+            if (!overrideSecurityProperties(bootstrapProperties)) {
+                logger.debug("Preserving previous security properties...");
+                
configSchemaNew.setSecurityProperties(configSchemaOld.getSecurityProperties());
+            }
+
+            logger.debug("Persisting changes to {}", 
configFile.getAbsolutePath());
+            SchemaLoader.toYaml(configSchemaNew, new FileWriter(configFile));
+        } catch (Exception e) {
+            logger.error("Loading the old and the new schema for merging was 
not successful");

Review Comment:
   The exception should be logged:
   ```suggestion
               logger.error("Loading the old and the new schema for merging was 
not successful", e);
   ```



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2;
+
+import java.util.concurrent.TimeUnit;
+
+public class C2NiFiProperties {
+
+    public static final String C2_PREFIX = "c2.";
+
+    public static final String C2_ENABLE_KEY = C2_PREFIX + "enable";
+    public static final String C2_AGENT_PROTOCOL_KEY = C2_PREFIX + 
"agent.protocol.class";
+    public static final String C2_COAP_HOST_KEY = C2_PREFIX + 
"agent.coap.host";
+    public static final String C2_COAP_PORT_KEY = C2_PREFIX + 
"agent.coap.port";
+    public static final String C2_CONFIG_DIRECTORY_KEY = C2_PREFIX + 
"config.directory";
+    public static final String C2_RUNTIME_MANIFEST_IDENTIFIER_KEY = C2_PREFIX 
+ "runtime.manifest.identifier";
+    public static final String C2_RUNTIME_TYPE_KEY = C2_PREFIX + 
"runtime.type";
+    public static final String C2_REST_URL_KEY = C2_PREFIX + "rest.url";
+    public static final String C2_REST_URL_ACK_KEY = C2_PREFIX + 
"rest.url.ack";
+    public static final String C2_ROOT_CLASSES_KEY = C2_PREFIX + 
"root.classes";
+    public static final String C2_AGENT_HEARTBEAT_PERIOD_KEY = C2_PREFIX + 
"agent.heartbeat.period";
+    public static final String C2_CONNECTION_TIMEOUT = C2_PREFIX + 
"rest.connectionTimeout";
+    public static final String C2_READ_TIMEOUT = C2_PREFIX + 
"rest.readTimeout";
+    public static final String C2_CALL_TIMEOUT = C2_PREFIX + 
"rest.callTimeout";
+    public static final String C2_AGENT_CLASS_KEY = C2_PREFIX + "agent.class";
+    public static final String C2_AGENT_IDENTIFIER_KEY = C2_PREFIX + 
"agent.identifier";
+
+    public static final String C2_ROOT_CLASS_DEFINITIONS_KEY = C2_PREFIX + 
"root.class.definitions";
+    public static final String C2_METRICS_NAME_KEY = 
C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.name";
+    public static final String C2_METRICS_METRICS_KEY = 
C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics";
+    public static final String C2_METRICS_METRICS_TYPED_METRICS_NAME_KEY = 
C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.typedmetrics.name";
+    public static final String C2_METRICS_METRICS_QUEUED_METRICS_NAME_KEY = 
C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.queuemetrics.name";
+    public static final String C2_METRICS_METRICS_QUEUE_METRICS_CLASSES_KEY = 
C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.queuemetrics.classes";
+    public static final String C2_METRICS_METRICS_TYPED_METRICS_CLASSES_KEY = 
C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.typedmetrics.classes";
+    public static final String C2_METRICS_METRICS_PROCESSOR_METRICS_NAME_KEY = 
C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.processorMetrics.name";
+    public static final String 
C2_METRICS_METRICS_PROCESSOR_METRICS_CLASSES_KEY = 
C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.processorMetrics.classes";
+
+    /* C2 Client Security Properties */
+    private static final String C2_REST_SECURITY_BASE_KEY = C2_PREFIX + 
"security";
+    public static final String TRUSTSTORE_LOCATION_KEY = 
C2_REST_SECURITY_BASE_KEY + ".truststore.location";
+    public static final String TRUSTSTORE_PASSWORD_KEY = 
C2_REST_SECURITY_BASE_KEY + ".truststore.password";
+    public static final String TRUSTSTORE_TYPE_KEY = C2_REST_SECURITY_BASE_KEY 
+ ".truststore.type";
+    public static final String KEYSTORE_LOCATION_KEY = 
C2_REST_SECURITY_BASE_KEY + ".keystore.location";
+    public static final String KEYSTORE_PASSWORD_KEY = 
C2_REST_SECURITY_BASE_KEY + ".keystore.password";
+    public static final String KEYSTORE_TYPE_KEY = C2_REST_SECURITY_BASE_KEY + 
".keystore.type";
+    public static final String NEED_CLIENT_AUTH_KEY = 
C2_REST_SECURITY_BASE_KEY + ".need.client.auth";

Review Comment:
   Is this property still applicable?



##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/PeriodicStatusReporterManager.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static 
org.apache.nifi.minifi.commons.schema.common.BootstrapPropertyKeys.STATUS_REPORTER_COMPONENTS_KEY;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
+import org.apache.nifi.minifi.bootstrap.MiNiFiStatus;
+import org.apache.nifi.minifi.bootstrap.QueryableStatusAggregator;
+import org.apache.nifi.minifi.bootstrap.status.PeriodicStatusReporter;
+import org.apache.nifi.minifi.commons.status.FlowStatusReport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PeriodicStatusReporterManager implements 
QueryableStatusAggregator {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PeriodicStatusReporterManager.class);
+    private static final String FLOW_STATUS_REPORT_CMD = "FLOW_STATUS_REPORT";
+
+    private final Properties bootstrapProperties;
+    private final MiNiFiStatusProvider miNiFiStatusProvider;
+    private final MiNiFiCommandSender miNiFiCommandSender;
+    private final MiNiFiParameters miNiFiParameters;
+
+    private Set<PeriodicStatusReporter> periodicStatusReporters = 
Collections.emptySet();
+
+    public PeriodicStatusReporterManager(Properties bootstrapProperties, 
MiNiFiStatusProvider miNiFiStatusProvider, MiNiFiCommandSender 
miNiFiCommandSender,
+        MiNiFiParameters miNiFiParameters) {
+        this.bootstrapProperties = bootstrapProperties;
+        this.miNiFiStatusProvider = miNiFiStatusProvider;
+        this.miNiFiCommandSender = miNiFiCommandSender;
+        this.miNiFiParameters = miNiFiParameters;
+    }
+
+    public void startPeriodicNotifiers() {
+        periodicStatusReporters = initializePeriodicNotifiers();
+
+        for (PeriodicStatusReporter periodicStatusReporter: 
periodicStatusReporters) {
+            periodicStatusReporter.start();
+            LOGGER.debug("Started {} notifier", 
periodicStatusReporter.getClass().getCanonicalName());
+        }
+    }
+
+    public void shutdownPeriodicStatusReporters() {
+        LOGGER.debug("Initiating shutdown of bootstrap periodic status 
reporters...");
+        for (PeriodicStatusReporter periodicStatusReporter : 
periodicStatusReporters) {
+            try {
+                periodicStatusReporter.stop();
+            } catch (Exception exception) {
+                LOGGER.error("Could not successfully stop periodic status 
reporter " + periodicStatusReporter.getClass() + " due to ", exception);
+            }
+        }
+    }
+
+    public FlowStatusReport statusReport(String statusRequest) {
+        MiNiFiStatus status = 
miNiFiStatusProvider.getStatus(miNiFiParameters.getMiNiFiPort(), 
miNiFiParameters.getMinifiPid());
+
+        List<String> problemsGeneratingReport = new LinkedList<>();
+        if (!status.isProcessRunning()) {
+            problemsGeneratingReport.add("MiNiFi process is not running");
+        }
+
+        if (!status.isRespondingToPing()) {
+            problemsGeneratingReport.add("MiNiFi process is not responding to 
pings");
+        }
+
+        if (!problemsGeneratingReport.isEmpty()) {
+            FlowStatusReport flowStatusReport = new FlowStatusReport();
+            
flowStatusReport.setErrorsGeneratingReport(problemsGeneratingReport);
+            return flowStatusReport;
+        }
+
+        return getFlowStatusReport(statusRequest, status.getPort());
+    }
+
+    private Set<PeriodicStatusReporter> initializePeriodicNotifiers() {
+        LOGGER.debug("Initiating bootstrap periodic status reporters...");
+        Set<PeriodicStatusReporter> statusReporters = new HashSet<>();
+
+        String reportersCsv = 
bootstrapProperties.getProperty(STATUS_REPORTER_COMPONENTS_KEY);
+
+        if (reportersCsv != null && !reportersCsv.isEmpty()) {
+            for (String reporterClassname : reportersCsv.split(",")) {
+                try {
+                    Class<?> reporterClass = Class.forName(reporterClassname);
+                    PeriodicStatusReporter reporter = (PeriodicStatusReporter) 
reporterClass.newInstance();
+                    reporter.initialize(bootstrapProperties, this);
+                    statusReporters.add(reporter);
+                    LOGGER.debug("Initialized {} notifier", 
reporterClass.getCanonicalName());
+                } catch (InstantiationException | IllegalAccessException | 
ClassNotFoundException e) {
+                    throw new RuntimeException("Issue instantiating notifier " 
+ reporterClassname, e);
+                }
+            }
+        }
+        return statusReporters;
+    }
+
+    private FlowStatusReport getFlowStatusReport(String statusRequest, int 
port) {
+        FlowStatusReport flowStatusReport;
+        try {
+            flowStatusReport = 
miNiFiCommandSender.sendCommandForObject(FLOW_STATUS_REPORT_CMD, port, 
FlowStatusReport.class, statusRequest);
+        } catch (Exception e) {
+            flowStatusReport = new FlowStatusReport();
+            String message = "Failed to get status report from MiNiFi due to:" 
+ e.getMessage();
+            
flowStatusReport.setErrorsGeneratingReport(Collections.singletonList(message));
+            LOGGER.error(message);

Review Comment:
   Recommend logging exception for tracking:
   ```suggestion
               LOGGER.error(message, e);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to