exceptionfactory commented on code in PR #6075: URL: https://github.com/apache/nifi/pull/6075#discussion_r893724461
########## c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java: ########## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.c2.client; + +import javax.net.ssl.HostnameVerifier; + +/** + * Configuration for a C2 Client. + */ +public class C2ClientConfig { + + private final String c2Url; + private final String c2AckUrl; + private final String agentClass; + private final String agentIdentifier; + private final String confDirectory; + private final String runtimeManifestIdentifier; + private final String runtimeType; + private final long heartbeatPeriod; + private final String keystoreFilename; + private final String keystorePass; + private final String keyPass; + private final String keystoreType; + private final String truststoreFilename; + private final String truststorePass; + private final String truststoreType; + private final HostnameVerifier hostnameVerifier; Review Comment: The `HostnameVerifier` property does not appear to be used. Overriding the `HostnameVerifier` for TLS connections is generally against good security practices, so it should be removed from the configuration. 
########## minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java: ########## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.bootstrap.service; + +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY; +import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY; +import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.OVERRIDE_SECURITY; +import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY; +import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Properties; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.io.IOUtils; +import 
org.apache.nifi.minifi.bootstrap.RunMiNiFi; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener; +import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream; +import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer; +import org.apache.nifi.minifi.commons.schema.ConfigSchema; +import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema; +import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader; +import org.slf4j.Logger; + +public class MiNiFiConfigurationChangeListener implements ConfigurationChangeListener { + + private final RunMiNiFi runner; + private final Logger logger; + private final BootstrapFileProvider bootstrapFileProvider; + + private static final ReentrantLock handlingLock = new ReentrantLock(); + + public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, BootstrapFileProvider bootstrapFileProvider) { + this.runner = runner; + this.logger = logger; + this.bootstrapFileProvider = bootstrapFileProvider; + } + + @Override + public void handleChange(InputStream configInputStream) throws ConfigurationChangeException { + logger.info("Received notification of a change"); + + if (!handlingLock.tryLock()) { + throw new ConfigurationChangeException("Instance is already handling another change"); + } + // Store the incoming stream as a byte array to be shared among components that need it + try(ByteArrayOutputStream bufferedConfigOs = new ByteArrayOutputStream()) { + + Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties(); + File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY)); + + byte[] copyArray = new byte[1024]; + int available; + while ((available = configInputStream.read(copyArray)) > 0) { + bufferedConfigOs.write(copyArray, 0, available); + } + + File swapConfigFile = bootstrapFileProvider.getSwapFile(); + logger.info("Persisting 
old configuration to {}", swapConfigFile.getAbsolutePath()); + + try (FileInputStream configFileInputStream = new FileInputStream(configFile)) { + Files.copy(configFileInputStream, swapConfigFile.toPath(), REPLACE_EXISTING); + } + + persistBackNonFlowSectionsFromOriginalSchema(bufferedConfigOs.toByteArray(), bootstrapProperties, configFile); + + // Create an input stream to feed to the config transformer + try (FileInputStream newConfigIs = new FileInputStream(configFile)) { + + try { + String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY); + + try { + logger.info("Performing transformation for input and saving outputs to {}", confDir); + ByteBuffer tempConfigFile = generateConfigFiles(newConfigIs, confDir, bootstrapFileProvider.getBootstrapProperties()); + runner.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer()); + + try { + logger.info("Reloading instance with new configuration"); + restartInstance(); + } catch (Exception e) { + logger.debug("Transformation of new config file failed after transformation into Flow.xml and nifi.properties, reverting."); + try (FileInputStream swapConfigFileStream = new FileInputStream(swapConfigFile)) { + ByteBuffer resetConfigFile = generateConfigFiles(swapConfigFileStream, confDir, bootstrapFileProvider.getBootstrapProperties()); + runner.getConfigFileReference().set(resetConfigFile.asReadOnlyBuffer()); + } + throw e; + } + } catch (Exception e) { + logger.debug("Transformation of new config file failed after replacing original with the swap file, reverting."); + try (FileInputStream swapConfigFileStream = new FileInputStream(swapConfigFile)) { + Files.copy(swapConfigFileStream, configFile.toPath(), REPLACE_EXISTING); + } + throw e; + } + } catch (Exception e) { + logger.debug("Transformation of new config file failed after swap file was created, deleting it."); + if (!swapConfigFile.delete()) { + logger.warn("The swap file failed to delete after a failed handling of a change. 
It should be cleaned up manually."); + } + throw e; + } + } Review Comment: This set of try-catch blocks is deeply nested; it would be helpful to refactor it into separate methods. ########## minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiCommandSender.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.minifi.bootstrap.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.BufferedReader; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketTimeoutException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.nifi.minifi.bootstrap.MiNiFiParameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MiNiFiCommandSender { + + private static final Logger LOGGER = LoggerFactory.getLogger(MiNiFiCommandSender.class); + private static final String PING_CMD = "PING"; + private static final int SOCKET_TIMEOUT = 10000; + private static final int CONNECTION_TIMEOUT = 10000; + + private final MiNiFiParameters miNiFiParameters; + private final ObjectMapper objectMapper; + + public MiNiFiCommandSender(MiNiFiParameters miNiFiParameters, ObjectMapper objectMapper) { + this.miNiFiParameters = miNiFiParameters; + this.objectMapper = objectMapper; + } + + public Optional<String> sendCommand(String cmd, Integer port, String... 
extraParams) throws IOException { + Optional<String> response = Optional.empty(); + + if (port == null) { + LOGGER.info("Apache MiNiFi is not currently running"); + return response; + } + + try (Socket socket = new Socket()) { + LOGGER.debug("Connecting to MiNiFi instance"); + socket.setSoTimeout(SOCKET_TIMEOUT); + socket.connect(new InetSocketAddress("localhost", port), CONNECTION_TIMEOUT); + LOGGER.debug("Established connection to MiNiFi instance."); + + LOGGER.debug("Sending {} Command to port {}", cmd, port); + + String responseString; + try (OutputStream out = socket.getOutputStream()) { + out.write(getCommand(cmd, extraParams)); + out.flush(); + responseString = readResponse(socket); + } + + LOGGER.debug("Received response to {} command: {}", cmd, responseString); + response = Optional.of(responseString); + } catch (EOFException | SocketTimeoutException e) { + String message = "Failed to get response for " + cmd + " Potentially due to the process currently being down (restarting or otherwise)"; + throw new RuntimeException(message); + } + return response; + } + + <T> T sendCommandForObject(String cmd, Integer port, Class<T> clazz, String... extraParams) throws IOException { + return sendCommand(cmd, port, extraParams) + .map(response -> deserialize(cmd, response, clazz)) + .orElse(null); + } + + private String readResponse(Socket socket) throws IOException { + StringBuilder sb = new StringBuilder(); + int numLines = 0; + try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + if (numLines++ > 0) { + sb.append("\n"); + } + sb.append(line); + } + } + + return sb.toString().trim(); + } + + private byte[] getCommand(String cmd, String... args) { + String argsString = Arrays.stream(args).collect(Collectors.joining(" ")); + String commandWithArgs = cmd + " " + miNiFiParameters.getSecretKey() + (args.length > 0 ? 
" " : "") + argsString + "\n"; + return commandWithArgs.getBytes(StandardCharsets.UTF_8); + } + + private <T> T deserialize(String cmd, String obj, Class<T> clazz) { + T response; + try { + response = objectMapper.readValue(obj, clazz); + } catch (JsonProcessingException e) { + String message = "Failed to deserialize " + cmd + " response"; + LOGGER.error(message); + throw new RuntimeException(message); Review Comment: The `JsonProcessingException` should be passed as the cause. ```suggestion throw new RuntimeException(message, e); ``` ########## minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf: ########## @@ -129,3 +128,36 @@ java.arg.7=-Djava.security.egd=file:/dev/urandom #Set headless mode by default java.arg.14=-Djava.awt.headless=true + +# MiNiFi Command & Control Configuration +# C2 Properties +# Enabling C2 Uncomment each of the following options +#c2.enable=true +## define protocol parameters +#c2.rest.url= +#c2.rest.url.ack= +## c2 timeouts +#c2.rest.connectionTimeout=5 sec +#c2.rest.readTimeout=5 sec +#c2.rest.callTimeout=10 sec +## heartbeat in milliseconds +#c2.agent.heartbeat.period=5000 +## define parameters about your agent +#c2.agent.class= +#c2.config.directory=./conf +#c2.runtime.manifest.identifier=minifi +#c2.runtime.type=minifi-java +# Optional. Defaults to a hardware based unique identifier +#c2.agent.identifier= +## Define TLS security properties for C2 communications +#c2.security.truststore.location= +#c2.security.truststore.password= +#c2.security.truststore.type=JKS +#c2.security.keystore.location= +#c2.security.keystore.password= +#c2.security.keystore.type=JKS +#c2.security.need.client.auth=true Review Comment: Is this property used? 
########## minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java: ########## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.bootstrap.service; + +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY; +import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY; +import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.OVERRIDE_SECURITY; +import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY; +import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Properties; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.io.IOUtils; +import 
org.apache.nifi.minifi.bootstrap.RunMiNiFi; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener; +import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream; +import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer; +import org.apache.nifi.minifi.commons.schema.ConfigSchema; +import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema; +import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader; +import org.slf4j.Logger; + +public class MiNiFiConfigurationChangeListener implements ConfigurationChangeListener { + + private final RunMiNiFi runner; + private final Logger logger; + private final BootstrapFileProvider bootstrapFileProvider; + + private static final ReentrantLock handlingLock = new ReentrantLock(); + + public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, BootstrapFileProvider bootstrapFileProvider) { + this.runner = runner; + this.logger = logger; + this.bootstrapFileProvider = bootstrapFileProvider; + } + + @Override + public void handleChange(InputStream configInputStream) throws ConfigurationChangeException { + logger.info("Received notification of a change"); + + if (!handlingLock.tryLock()) { + throw new ConfigurationChangeException("Instance is already handling another change"); + } + // Store the incoming stream as a byte array to be shared among components that need it + try(ByteArrayOutputStream bufferedConfigOs = new ByteArrayOutputStream()) { + + Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties(); + File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY)); + + byte[] copyArray = new byte[1024]; + int available; + while ((available = configInputStream.read(copyArray)) > 0) { + bufferedConfigOs.write(copyArray, 0, available); + } Review Comment: It looks like this approach could be replaced with `IOUtils.copy()` 
########## minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java: ########## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.bootstrap.service; + +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY; +import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY; +import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.OVERRIDE_SECURITY; +import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY; +import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Properties; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.io.IOUtils; +import 
org.apache.nifi.minifi.bootstrap.RunMiNiFi; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener; +import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream; +import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer; +import org.apache.nifi.minifi.commons.schema.ConfigSchema; +import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema; +import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader; +import org.slf4j.Logger; + +public class MiNiFiConfigurationChangeListener implements ConfigurationChangeListener { + + private final RunMiNiFi runner; + private final Logger logger; + private final BootstrapFileProvider bootstrapFileProvider; + + private static final ReentrantLock handlingLock = new ReentrantLock(); + + public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, BootstrapFileProvider bootstrapFileProvider) { + this.runner = runner; + this.logger = logger; + this.bootstrapFileProvider = bootstrapFileProvider; + } + + @Override + public void handleChange(InputStream configInputStream) throws ConfigurationChangeException { + logger.info("Received notification of a change"); + + if (!handlingLock.tryLock()) { + throw new ConfigurationChangeException("Instance is already handling another change"); + } + // Store the incoming stream as a byte array to be shared among components that need it + try(ByteArrayOutputStream bufferedConfigOs = new ByteArrayOutputStream()) { + + Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties(); + File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY)); + + byte[] copyArray = new byte[1024]; + int available; + while ((available = configInputStream.read(copyArray)) > 0) { + bufferedConfigOs.write(copyArray, 0, available); + } + + File swapConfigFile = bootstrapFileProvider.getSwapFile(); + logger.info("Persisting 
old configuration to {}", swapConfigFile.getAbsolutePath()); + + try (FileInputStream configFileInputStream = new FileInputStream(configFile)) { + Files.copy(configFileInputStream, swapConfigFile.toPath(), REPLACE_EXISTING); + } + + persistBackNonFlowSectionsFromOriginalSchema(bufferedConfigOs.toByteArray(), bootstrapProperties, configFile); + + // Create an input stream to feed to the config transformer + try (FileInputStream newConfigIs = new FileInputStream(configFile)) { + + try { + String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY); + + try { + logger.info("Performing transformation for input and saving outputs to {}", confDir); + ByteBuffer tempConfigFile = generateConfigFiles(newConfigIs, confDir, bootstrapFileProvider.getBootstrapProperties()); + runner.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer()); + + try { + logger.info("Reloading instance with new configuration"); + restartInstance(); + } catch (Exception e) { + logger.debug("Transformation of new config file failed after transformation into Flow.xml and nifi.properties, reverting."); + try (FileInputStream swapConfigFileStream = new FileInputStream(swapConfigFile)) { + ByteBuffer resetConfigFile = generateConfigFiles(swapConfigFileStream, confDir, bootstrapFileProvider.getBootstrapProperties()); + runner.getConfigFileReference().set(resetConfigFile.asReadOnlyBuffer()); + } + throw e; + } + } catch (Exception e) { + logger.debug("Transformation of new config file failed after replacing original with the swap file, reverting."); + try (FileInputStream swapConfigFileStream = new FileInputStream(swapConfigFile)) { + Files.copy(swapConfigFileStream, configFile.toPath(), REPLACE_EXISTING); + } + throw e; + } + } catch (Exception e) { + logger.debug("Transformation of new config file failed after swap file was created, deleting it."); + if (!swapConfigFile.delete()) { + logger.warn("The swap file failed to delete after a failed handling of a change. 
It should be cleaned up manually."); + } + throw e; + } + } + } catch (ConfigurationChangeException e){ + logger.error("Unable to carry out reloading of configuration on receipt of notification event", e); + throw e; + } catch (IOException ioe) { + logger.error("Unable to carry out reloading of configuration on receipt of notification event", ioe); + throw new ConfigurationChangeException("Unable to perform reload of received configuration change", ioe); Review Comment: Is there a reason for both logging the error and throwing the exception? Shouldn't the exception be caught and logged by the caller? This approach seems like it would generate multiple log messages for the same problem. ########## c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java: ########## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.c2.client; + +import javax.net.ssl.HostnameVerifier; + +/** + * Configuration for a C2 Client. 
+ */ +public class C2ClientConfig { + + private final String c2Url; + private final String c2AckUrl; + private final String agentClass; + private final String agentIdentifier; + private final String confDirectory; + private final String runtimeManifestIdentifier; + private final String runtimeType; + private final long heartbeatPeriod; + private final String keystoreFilename; + private final String keystorePass; + private final String keyPass; + private final String keystoreType; + private final String truststoreFilename; + private final String truststorePass; + private final String truststoreType; + private final HostnameVerifier hostnameVerifier; Review Comment: @mattyb149 or @bejancsaba Can this property be removed? ########## minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java: ########## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.minifi.bootstrap.service; + +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY; +import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY; +import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.OVERRIDE_SECURITY; +import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PULL_HTTP_BASE_KEY; +import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Properties; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.minifi.bootstrap.RunMiNiFi; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException; +import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener; +import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream; +import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer; +import org.apache.nifi.minifi.commons.schema.ConfigSchema; +import org.apache.nifi.minifi.commons.schema.common.ConvertableSchema; +import org.apache.nifi.minifi.commons.schema.serialization.SchemaLoader; +import org.slf4j.Logger; + +public class MiNiFiConfigurationChangeListener implements ConfigurationChangeListener { + + private final RunMiNiFi runner; + private final Logger logger; + private final BootstrapFileProvider bootstrapFileProvider; + + private static final ReentrantLock handlingLock = new ReentrantLock(); + + public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger, BootstrapFileProvider bootstrapFileProvider) 
{ + this.runner = runner; + this.logger = logger; + this.bootstrapFileProvider = bootstrapFileProvider; + } + + @Override + public void handleChange(InputStream configInputStream) throws ConfigurationChangeException { + logger.info("Received notification of a change"); + + if (!handlingLock.tryLock()) { + throw new ConfigurationChangeException("Instance is already handling another change"); + } + // Store the incoming stream as a byte array to be shared among components that need it + try(ByteArrayOutputStream bufferedConfigOs = new ByteArrayOutputStream()) { + + Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties(); + File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY)); + + byte[] copyArray = new byte[1024]; + int available; + while ((available = configInputStream.read(copyArray)) > 0) { + bufferedConfigOs.write(copyArray, 0, available); + } + + File swapConfigFile = bootstrapFileProvider.getSwapFile(); + logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath()); + + try (FileInputStream configFileInputStream = new FileInputStream(configFile)) { + Files.copy(configFileInputStream, swapConfigFile.toPath(), REPLACE_EXISTING); + } + + persistBackNonFlowSectionsFromOriginalSchema(bufferedConfigOs.toByteArray(), bootstrapProperties, configFile); + + // Create an input stream to feed to the config transformer + try (FileInputStream newConfigIs = new FileInputStream(configFile)) { + + try { + String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY); + + try { + logger.info("Performing transformation for input and saving outputs to {}", confDir); + ByteBuffer tempConfigFile = generateConfigFiles(newConfigIs, confDir, bootstrapFileProvider.getBootstrapProperties()); + runner.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer()); + + try { + logger.info("Reloading instance with new configuration"); + restartInstance(); + } catch (Exception e) { + logger.debug("Transformation 
of new config file failed after transformation into Flow.xml and nifi.properties, reverting."); + try (FileInputStream swapConfigFileStream = new FileInputStream(swapConfigFile)) { + ByteBuffer resetConfigFile = generateConfigFiles(swapConfigFileStream, confDir, bootstrapFileProvider.getBootstrapProperties()); + runner.getConfigFileReference().set(resetConfigFile.asReadOnlyBuffer()); + } + throw e; + } + } catch (Exception e) { + logger.debug("Transformation of new config file failed after replacing original with the swap file, reverting."); + try (FileInputStream swapConfigFileStream = new FileInputStream(swapConfigFile)) { + Files.copy(swapConfigFileStream, configFile.toPath(), REPLACE_EXISTING); + } + throw e; + } + } catch (Exception e) { + logger.debug("Transformation of new config file failed after swap file was created, deleting it."); + if (!swapConfigFile.delete()) { + logger.warn("The swap file failed to delete after a failed handling of a change. It should be cleaned up manually."); + } + throw e; + } + } + } catch (ConfigurationChangeException e){ + logger.error("Unable to carry out reloading of configuration on receipt of notification event", e); + throw e; + } catch (IOException ioe) { + logger.error("Unable to carry out reloading of configuration on receipt of notification event", ioe); + throw new ConfigurationChangeException("Unable to perform reload of received configuration change", ioe); + } finally { + IOUtils.closeQuietly(configInputStream); + handlingLock.unlock(); + } + } + + @Override + public String getDescriptor() { + return "MiNiFiConfigurationChangeListener"; + } + + private void restartInstance() throws IOException { + try { + runner.reload(); + } catch (IOException e) { + throw new IOException("Unable to successfully restart MiNiFi instance after configuration change.", e); + } + } + + private void persistBackNonFlowSectionsFromOriginalSchema(byte[] newSchema, Properties bootstrapProperties, File configFile) { + try { + 
ConvertableSchema<ConfigSchema> schemaNew = ConfigTransformer + .throwIfInvalid(SchemaLoader.loadConvertableSchemaFromYaml(new ByteArrayInputStream(newSchema))); + ConfigSchema configSchemaNew = ConfigTransformer.throwIfInvalid(schemaNew.convert()); + ConvertableSchema<ConfigSchema> schemaOld = ConfigTransformer + .throwIfInvalid(SchemaLoader.loadConvertableSchemaFromYaml(new ByteBufferInputStream(runner.getConfigFileReference().get().duplicate()))); + ConfigSchema configSchemaOld = ConfigTransformer.throwIfInvalid(schemaOld.convert()); + + configSchemaNew.setNifiPropertiesOverrides(configSchemaOld.getNifiPropertiesOverrides()); + + if (!overrideCoreProperties(bootstrapProperties)) { + logger.debug("Preserving previous core properties..."); + configSchemaNew.setCoreProperties(configSchemaOld.getCoreProperties()); + } + + if (!overrideSecurityProperties(bootstrapProperties)) { + logger.debug("Preserving previous security properties..."); + configSchemaNew.setSecurityProperties(configSchemaOld.getSecurityProperties()); + } + + logger.debug("Persisting changes to {}", configFile.getAbsolutePath()); + SchemaLoader.toYaml(configSchemaNew, new FileWriter(configFile)); + } catch (Exception e) { + logger.error("Loading the old and the new schema for merging was not successful"); Review Comment: The exception should be logged: ```suggestion logger.error("Loading the old and the new schema for merging was not successful", e); ``` ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NiFiProperties.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.c2; + +import java.util.concurrent.TimeUnit; + +public class C2NiFiProperties { + + public static final String C2_PREFIX = "c2."; + + public static final String C2_ENABLE_KEY = C2_PREFIX + "enable"; + public static final String C2_AGENT_PROTOCOL_KEY = C2_PREFIX + "agent.protocol.class"; + public static final String C2_COAP_HOST_KEY = C2_PREFIX + "agent.coap.host"; + public static final String C2_COAP_PORT_KEY = C2_PREFIX + "agent.coap.port"; + public static final String C2_CONFIG_DIRECTORY_KEY = C2_PREFIX + "config.directory"; + public static final String C2_RUNTIME_MANIFEST_IDENTIFIER_KEY = C2_PREFIX + "runtime.manifest.identifier"; + public static final String C2_RUNTIME_TYPE_KEY = C2_PREFIX + "runtime.type"; + public static final String C2_REST_URL_KEY = C2_PREFIX + "rest.url"; + public static final String C2_REST_URL_ACK_KEY = C2_PREFIX + "rest.url.ack"; + public static final String C2_ROOT_CLASSES_KEY = C2_PREFIX + "root.classes"; + public static final String C2_AGENT_HEARTBEAT_PERIOD_KEY = C2_PREFIX + "agent.heartbeat.period"; + public static final String C2_CONNECTION_TIMEOUT = C2_PREFIX + "rest.connectionTimeout"; + public static final String C2_READ_TIMEOUT = C2_PREFIX + "rest.readTimeout"; + public static final String C2_CALL_TIMEOUT = C2_PREFIX + "rest.callTimeout"; + public static final String C2_AGENT_CLASS_KEY = C2_PREFIX + "agent.class"; + public static final String C2_AGENT_IDENTIFIER_KEY = C2_PREFIX + "agent.identifier"; + + public static final String C2_ROOT_CLASS_DEFINITIONS_KEY = C2_PREFIX + 
"root.class.definitions"; + public static final String C2_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.name"; + public static final String C2_METRICS_METRICS_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics"; + public static final String C2_METRICS_METRICS_TYPED_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.typedmetrics.name"; + public static final String C2_METRICS_METRICS_QUEUED_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.queuemetrics.name"; + public static final String C2_METRICS_METRICS_QUEUE_METRICS_CLASSES_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.queuemetrics.classes"; + public static final String C2_METRICS_METRICS_TYPED_METRICS_CLASSES_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.typedmetrics.classes"; + public static final String C2_METRICS_METRICS_PROCESSOR_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.processorMetrics.name"; + public static final String C2_METRICS_METRICS_PROCESSOR_METRICS_CLASSES_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.processorMetrics.classes"; + + /* C2 Client Security Properties */ + private static final String C2_REST_SECURITY_BASE_KEY = C2_PREFIX + "security"; + public static final String TRUSTSTORE_LOCATION_KEY = C2_REST_SECURITY_BASE_KEY + ".truststore.location"; + public static final String TRUSTSTORE_PASSWORD_KEY = C2_REST_SECURITY_BASE_KEY + ".truststore.password"; + public static final String TRUSTSTORE_TYPE_KEY = C2_REST_SECURITY_BASE_KEY + ".truststore.type"; + public static final String KEYSTORE_LOCATION_KEY = C2_REST_SECURITY_BASE_KEY + ".keystore.location"; + public static final String KEYSTORE_PASSWORD_KEY = C2_REST_SECURITY_BASE_KEY + ".keystore.password"; + public static final String KEYSTORE_TYPE_KEY = C2_REST_SECURITY_BASE_KEY + ".keystore.type"; + public static final String NEED_CLIENT_AUTH_KEY = C2_REST_SECURITY_BASE_KEY + ".need.client.auth"; Review Comment: Is this property 
still applicable? ########## minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/PeriodicStatusReporterManager.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.bootstrap.service; + +import static org.apache.nifi.minifi.commons.schema.common.BootstrapPropertyKeys.STATUS_REPORTER_COMPONENTS_KEY; + +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import org.apache.nifi.minifi.bootstrap.MiNiFiParameters; +import org.apache.nifi.minifi.bootstrap.MiNiFiStatus; +import org.apache.nifi.minifi.bootstrap.QueryableStatusAggregator; +import org.apache.nifi.minifi.bootstrap.status.PeriodicStatusReporter; +import org.apache.nifi.minifi.commons.status.FlowStatusReport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PeriodicStatusReporterManager implements QueryableStatusAggregator { + private static final Logger LOGGER = LoggerFactory.getLogger(PeriodicStatusReporterManager.class); + private static final String FLOW_STATUS_REPORT_CMD = "FLOW_STATUS_REPORT"; + + private final Properties 
bootstrapProperties; + private final MiNiFiStatusProvider miNiFiStatusProvider; + private final MiNiFiCommandSender miNiFiCommandSender; + private final MiNiFiParameters miNiFiParameters; + + private Set<PeriodicStatusReporter> periodicStatusReporters = Collections.emptySet(); + + public PeriodicStatusReporterManager(Properties bootstrapProperties, MiNiFiStatusProvider miNiFiStatusProvider, MiNiFiCommandSender miNiFiCommandSender, + MiNiFiParameters miNiFiParameters) { + this.bootstrapProperties = bootstrapProperties; + this.miNiFiStatusProvider = miNiFiStatusProvider; + this.miNiFiCommandSender = miNiFiCommandSender; + this.miNiFiParameters = miNiFiParameters; + } + + public void startPeriodicNotifiers() { + periodicStatusReporters = initializePeriodicNotifiers(); + + for (PeriodicStatusReporter periodicStatusReporter: periodicStatusReporters) { + periodicStatusReporter.start(); + LOGGER.debug("Started {} notifier", periodicStatusReporter.getClass().getCanonicalName()); + } + } + + public void shutdownPeriodicStatusReporters() { + LOGGER.debug("Initiating shutdown of bootstrap periodic status reporters..."); + for (PeriodicStatusReporter periodicStatusReporter : periodicStatusReporters) { + try { + periodicStatusReporter.stop(); + } catch (Exception exception) { + LOGGER.error("Could not successfully stop periodic status reporter " + periodicStatusReporter.getClass() + " due to ", exception); + } + } + } + + public FlowStatusReport statusReport(String statusRequest) { + MiNiFiStatus status = miNiFiStatusProvider.getStatus(miNiFiParameters.getMiNiFiPort(), miNiFiParameters.getMinifiPid()); + + List<String> problemsGeneratingReport = new LinkedList<>(); + if (!status.isProcessRunning()) { + problemsGeneratingReport.add("MiNiFi process is not running"); + } + + if (!status.isRespondingToPing()) { + problemsGeneratingReport.add("MiNiFi process is not responding to pings"); + } + + if (!problemsGeneratingReport.isEmpty()) { + FlowStatusReport flowStatusReport = new 
FlowStatusReport(); + flowStatusReport.setErrorsGeneratingReport(problemsGeneratingReport); + return flowStatusReport; + } + + return getFlowStatusReport(statusRequest, status.getPort()); + } + + private Set<PeriodicStatusReporter> initializePeriodicNotifiers() { + LOGGER.debug("Initiating bootstrap periodic status reporters..."); + Set<PeriodicStatusReporter> statusReporters = new HashSet<>(); + + String reportersCsv = bootstrapProperties.getProperty(STATUS_REPORTER_COMPONENTS_KEY); + + if (reportersCsv != null && !reportersCsv.isEmpty()) { + for (String reporterClassname : reportersCsv.split(",")) { + try { + Class<?> reporterClass = Class.forName(reporterClassname); + PeriodicStatusReporter reporter = (PeriodicStatusReporter) reporterClass.newInstance(); + reporter.initialize(bootstrapProperties, this); + statusReporters.add(reporter); + LOGGER.debug("Initialized {} notifier", reporterClass.getCanonicalName()); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + throw new RuntimeException("Issue instantiating notifier " + reporterClassname, e); + } + } + } + return statusReporters; + } + + private FlowStatusReport getFlowStatusReport(String statusRequest, int port) { + FlowStatusReport flowStatusReport; + try { + flowStatusReport = miNiFiCommandSender.sendCommandForObject(FLOW_STATUS_REPORT_CMD, port, FlowStatusReport.class, statusRequest); + } catch (Exception e) { + flowStatusReport = new FlowStatusReport(); + String message = "Failed to get status report from MiNiFi due to:" + e.getMessage(); + flowStatusReport.setErrorsGeneratingReport(Collections.singletonList(message)); + LOGGER.error(message); Review Comment: Recommend logging exception for tracking: ```suggestion LOGGER.error(message, e); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
