briansolo1985 commented on code in PR #7344:
URL: https://github.com/apache/nifi/pull/7344#discussion_r1288560897
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java:
##########
@@ -62,116 +84,75 @@ public MiNiFiExecCommandProvider(BootstrapFileProvider
bootstrapFileProvider) {
* @throws IOException throws IOException if any of the configuration file
read fails
*/
public List<String> getMiNiFiExecCommand(int listenPort, File workingDir)
throws IOException {
- Properties props = bootstrapFileProvider.getBootstrapProperties();
- File confDir = getFile(props.getProperty(CONF_DIR_KEY,
DEFAULT_CONF_DIR).trim(), workingDir);
- File libDir = getFile(props.getProperty("lib.dir",
DEFAULT_LIB_DIR).trim(), workingDir);
+ Properties bootstrapProperties =
bootstrapFileProvider.getBootstrapProperties();
+
+ File confDir = getFile(bootstrapProperties.getProperty(CONF_DIR_KEY,
DEFAULT_CONF_DIR).trim(), workingDir);
+ File libDir = getFile(bootstrapProperties.getProperty(LIB_DIR_KEY,
DEFAULT_LIB_DIR).trim(), workingDir);
+
String minifiLogDir = System.getProperty(LOG_DIR,
DEFAULT_LOG_DIR).trim();
String minifiAppLogFileName = System.getProperty(APP_LOG_FILE_NAME,
DEFAULT_APP_LOG_FILE_NAME).trim();
String minifiAppLogFileExtension =
System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
String minifiBootstrapLogFileName =
System.getProperty(BOOTSTRAP_LOG_FILE_NAME,
DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim();
String minifiBootstrapLogFileExtension =
System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION,
DEFAULT_LOG_FILE_EXTENSION).trim();
- List<String> cmd = new ArrayList<>();
- cmd.add(getJavaCommand(props));
- cmd.add("-classpath");
- cmd.add(buildClassPath(props, confDir, libDir));
- cmd.addAll(getJavaAdditionalArgs(props));
- cmd.add("-Dnifi.properties.file.path=" + getMiNiFiPropsFileName(props,
confDir));
- cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
- cmd.add("-Dapp=MiNiFi");
- cmd.add("-D" + LOG_DIR + "=" + minifiLogDir);
- cmd.add("-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName);
- cmd.add("-D" + APP_LOG_FILE_EXTENSION + "=" +
minifiAppLogFileExtension);
- cmd.add("-D" + BOOTSTRAP_LOG_FILE_NAME + "=" +
minifiBootstrapLogFileName);
- cmd.add("-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" +
minifiBootstrapLogFileExtension);
- cmd.add("org.apache.nifi.minifi.MiNiFi");
-
- return cmd;
+ return List.of(
+ getJavaCommand(bootstrapProperties),
+ "-classpath",
+ buildClassPath(confDir, libDir),
+ getJavaAdditionalArgs(bootstrapProperties),
+ "-Dnifi.properties.file.path=" +
getMiNiFiPropertiesPath(bootstrapProperties, confDir),
+ "-Dnifi.bootstrap.listen.port=" + listenPort,
+ "-Dapp=MiNiFi",
+ "-D" + LOG_DIR + "=" + minifiLogDir,
+ "-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName,
+ "-D" + APP_LOG_FILE_EXTENSION + "=" + minifiAppLogFileExtension,
+ "-D" + BOOTSTRAP_LOG_FILE_NAME + "=" + minifiBootstrapLogFileName,
+ "-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" +
minifiBootstrapLogFileExtension,
+ "org.apache.nifi.minifi.MiNiFi");
+ }
+
+ private File getFile(String filename, File workingDir) {
+ File file = new File(filename);
+ return file.isAbsolute() ? file : new File(workingDir,
filename).getAbsoluteFile();
}
- private String getJavaCommand(Properties props) {
- String javaCmd = props.getProperty("java");
- if (javaCmd == null) {
- javaCmd = DEFAULT_JAVA_CMD;
- }
- if (javaCmd.equals(DEFAULT_JAVA_CMD)) {
- Optional.ofNullable(System.getenv("JAVA_HOME"))
- .map(javaHome -> getJavaCommandBasedOnExtension(javaHome,
WINDOWS_FILE_EXTENSION)
- .orElseGet(() -> getJavaCommandBasedOnExtension(javaHome,
"").orElse(DEFAULT_JAVA_CMD)));
- }
- return javaCmd;
+ private String getJavaCommand(Properties bootstrapProperties) {
+ String javaCommand = bootstrapProperties.getProperty(JAVA_COMMAND_KEY,
DEFAULT_JAVA_CMD);
+ return javaCommand.equals(DEFAULT_JAVA_CMD)
+ ? ofNullable(System.getenv(JAVA_HOME_ENVIRONMENT_VARIABLE))
+ .flatMap(javaHome ->
+ getJavaCommandBasedOnExtension(javaHome,
WINDOWS_FILE_EXTENSION)
+ .or(() -> getJavaCommandBasedOnExtension(javaHome,
LINUX_FILE_EXTENSION)))
+ .orElse(DEFAULT_JAVA_CMD)
+ : javaCommand;
}
private Optional<String> getJavaCommandBasedOnExtension(String javaHome,
String extension) {
- String javaCmd = null;
- File javaFile = new File(javaHome + File.separatorChar + "bin" +
File.separatorChar + "java" + extension);
- if (javaFile.exists() && javaFile.canExecute()) {
- javaCmd = javaFile.getAbsolutePath();
- }
- return Optional.ofNullable(javaCmd);
+ return Optional.of(new File(javaHome + File.separatorChar + "bin" +
File.separatorChar + "java" + extension))
Review Comment:
Eliminated the hardcoded strings
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java:
##########
@@ -62,116 +84,75 @@ public MiNiFiExecCommandProvider(BootstrapFileProvider
bootstrapFileProvider) {
* @throws IOException throws IOException if any of the configuration file
read fails
*/
public List<String> getMiNiFiExecCommand(int listenPort, File workingDir)
throws IOException {
- Properties props = bootstrapFileProvider.getBootstrapProperties();
- File confDir = getFile(props.getProperty(CONF_DIR_KEY,
DEFAULT_CONF_DIR).trim(), workingDir);
- File libDir = getFile(props.getProperty("lib.dir",
DEFAULT_LIB_DIR).trim(), workingDir);
+ Properties bootstrapProperties =
bootstrapFileProvider.getBootstrapProperties();
+
+ File confDir = getFile(bootstrapProperties.getProperty(CONF_DIR_KEY,
DEFAULT_CONF_DIR).trim(), workingDir);
+ File libDir = getFile(bootstrapProperties.getProperty(LIB_DIR_KEY,
DEFAULT_LIB_DIR).trim(), workingDir);
+
String minifiLogDir = System.getProperty(LOG_DIR,
DEFAULT_LOG_DIR).trim();
String minifiAppLogFileName = System.getProperty(APP_LOG_FILE_NAME,
DEFAULT_APP_LOG_FILE_NAME).trim();
String minifiAppLogFileExtension =
System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
String minifiBootstrapLogFileName =
System.getProperty(BOOTSTRAP_LOG_FILE_NAME,
DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim();
String minifiBootstrapLogFileExtension =
System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION,
DEFAULT_LOG_FILE_EXTENSION).trim();
- List<String> cmd = new ArrayList<>();
- cmd.add(getJavaCommand(props));
- cmd.add("-classpath");
- cmd.add(buildClassPath(props, confDir, libDir));
- cmd.addAll(getJavaAdditionalArgs(props));
- cmd.add("-Dnifi.properties.file.path=" + getMiNiFiPropsFileName(props,
confDir));
- cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
- cmd.add("-Dapp=MiNiFi");
- cmd.add("-D" + LOG_DIR + "=" + minifiLogDir);
- cmd.add("-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName);
- cmd.add("-D" + APP_LOG_FILE_EXTENSION + "=" +
minifiAppLogFileExtension);
- cmd.add("-D" + BOOTSTRAP_LOG_FILE_NAME + "=" +
minifiBootstrapLogFileName);
- cmd.add("-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" +
minifiBootstrapLogFileExtension);
- cmd.add("org.apache.nifi.minifi.MiNiFi");
-
- return cmd;
+ return List.of(
+ getJavaCommand(bootstrapProperties),
+ "-classpath",
+ buildClassPath(confDir, libDir),
+ getJavaAdditionalArgs(bootstrapProperties),
+ "-Dnifi.properties.file.path=" +
getMiNiFiPropertiesPath(bootstrapProperties, confDir),
+ "-Dnifi.bootstrap.listen.port=" + listenPort,
+ "-Dapp=MiNiFi",
+ "-D" + LOG_DIR + "=" + minifiLogDir,
Review Comment:
Refactored to make this more readable
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java:
##########
@@ -231,15 +215,21 @@ private void createSecureConnector(Properties properties)
{
logger.info("HTTPS Connector added for Host [{}] and Port [{}]",
https.getHost(), https.getPort());
}
- protected void setDifferentiator(Differentiator<ByteBuffer>
differentiator) {
+ private Supplier<IllegalArgumentException>
unableToFindDifferentiatorExceptionSupplier(String differentiator) {
Review Comment:
I left is as it is for the time being
##########
minifi/minifi-c2/minifi-c2-assembly/src/main/resources/files/raspi3/config.test.json.v1:
##########
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+ "encodingVersion": {
+ "majorVersion": 2,
+ "minorVersion": 0
+ },
+ "maxTimerDrivenThreadCount": 10,
+ "maxEventDrivenThreadCount": 1,
+ "registries": [],
+ "parameterContexts": [],
+ "parameterProviders": [],
+ "controllerServices": [],
+ "reportingTasks": [],
+ "templates": [],
+ "rootGroup": {
+ "identifier": "c1b4e586-2011-3f81-a11e-8d669f084d1c",
+ "instanceIdentifier": "29db3dbc-0188-1000-7025-4cab8b52d278",
+ "name": "NiFi Flow",
Review Comment:
Updated
##########
minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiPropertiesGeneratorTest.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.lang.Boolean.TRUE;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.Files.newOutputStream;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Stream.concat;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.SPACE;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static
org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.DEFAULT_SENSITIVE_PROPERTIES_ENCODING_ALGORITHM;
+import static
org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.MINIFI_TO_NIFI_PROPERTY_MAPPING;
+import static
org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_APP_LOG_FILE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_FILE_PATH;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_LOG_FILE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_LOG_DIRECTORY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class MiNiFiPropertiesGeneratorTest {
+
+ @TempDir
+ private Path tempDir;
+
+ private Path configDirectory;
+ private Path bootstrapPropertiesFile;
+ private Path minifiPropertiesFile;
+
+ private MiNiFiPropertiesGenerator testPropertiesGenerator;
+
+ @BeforeEach
+ public void setup() throws IOException {
+ configDirectory = tempDir.toAbsolutePath().resolve("conf");
+ Files.createDirectories(configDirectory);
+ bootstrapPropertiesFile = configDirectory.resolve("bootstrap.conf");
+ minifiPropertiesFile = configDirectory.resolve("minifi.properties");
+
+ testPropertiesGenerator = new MiNiFiPropertiesGenerator();
+ }
+
+ @Test
+ public void testGenerateDefaultNiFiProperties() throws
ConfigurationChangeException {
+ // given
+ Properties bootstrapProperties = createBootstrapProperties(Map.of());
+
+ // when
+
testPropertiesGenerator.generateMinifiProperties(configDirectory.toString(),
bootstrapProperties);
+
+ // then
+ List<String> expectedMiNiFiProperties =
NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS.stream()
+ .map(triplet -> triplet.getLeft() + "=" + triplet.getMiddle())
+ .collect(toList());
+ List<String> resultMiNiFiProperties = loadMiNiFiProperties().entrySet()
+ .stream()
+ .map(entry -> entry.getKey() + "=" + entry.getValue())
+ .collect(toList());
+
assertTrue(resultMiNiFiProperties.containsAll(expectedMiNiFiProperties));
+ }
+
+ @Test
+ public void testMiNiFiPropertiesMappedToAppropriateNiFiProperties() throws
ConfigurationChangeException {
+ // given
+ Properties bootstrapProperties = createBootstrapProperties(List.of(
+ MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey(),
+ MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE.getKey(),
+ MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_TYPE.getKey(),
+ MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD.getKey(),
+ MiNiFiProperties.NIFI_MINIFI_SECURITY_KEY_PASSWD.getKey(),
+ MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE.getKey(),
+ MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_TYPE.getKey(),
+
MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_PASSWD.getKey())
+ .stream()
+ .collect(toMap(Function.identity(), __ -> randomUUID().toString()))
+ );
+
+ // when
+
testPropertiesGenerator.generateMinifiProperties(configDirectory.toString(),
bootstrapProperties);
+
+ // then
+ Properties miNiFiProperties = loadMiNiFiProperties();
+ MINIFI_TO_NIFI_PROPERTY_MAPPING.entrySet().stream()
+ .allMatch(entry ->
Objects.equals(bootstrapProperties.getProperty(entry.getKey()),
miNiFiProperties.getProperty(entry.getValue())));
Review Comment:
Good point
##########
minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/FlowEnrichService.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.api;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.parseBoolean;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Map.entry;
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.flow.ComponentType.CONTROLLER_SERVICE;
+import static org.apache.nifi.flow.ComponentType.REPORTING_TASK;
+import static org.apache.nifi.flow.ScheduledState.ENABLED;
+import static org.apache.nifi.flow.ScheduledState.RUNNING;
+import static org.apache.nifi.logging.LogLevel.WARN;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_USE_PARENT_SSL;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_BATCH_SIZE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMUNICATIONS_TIMEOUT;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMPRESS_EVENTS;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_DESTINATION_URL;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INPUT_PORT_NAME;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INSTANCE_URL;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_PERIOD;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_TYPE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEY_PASSWD;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_SSL_PROTOCOL;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_PASSWD;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_TYPE;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.serialization.FlowSerializationException;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ControllerServiceAPI;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.properties.ReadableProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlowEnrichService {
+
+ static final String COMMON_SSL_CONTEXT_SERVICE_NAME =
"SSL-Context-Service";
+ static final String DEFAULT_SSL_CONTEXT_SERVICE_NAME = "SSL Context
Service";
+ static final String SITE_TO_SITE_PROVENANCE_REPORTING_TASK_NAME =
"Site-To-Site-Provenance-Reporting";
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlowEnrichService.class);
+
+ private static final String NIFI_BUNDLE_GROUP = "org.apache.nifi";
+ private static final String STANDARD_RESTRICTED_SSL_CONTEXT_SERVICE =
"org.apache.nifi.ssl.StandardRestrictedSSLContextService";
+ private static final String RESTRICTED_SSL_CONTEXT_SERVICE_API =
"org.apache.nifi.ssl.RestrictedSSLContextService";
+ private static final String SSL_CONTEXT_SERVICE_API =
"org.apache.nifi.ssl.SSLContextService";
+ private static final String SSL_CONTEXT_SERVICE_NAR =
"nifi-ssl-context-service-nar";
+ private static final String STANDARD_SERVICES_API_NAR_ARTIFACT =
"nifi-standard-services-api-nar";
+ private static final String SITE_TO_SITE_PROVENANCE_REPORTING_TASK =
"org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask";
+ private static final String SITE_TO_SITE_REPORTING_NAR_ARTIFACT =
"nifi-site-to-site-reporting-nar";
+ private static final String PROVENANCE_REPORTING_TASK_PROTOCOL = "HTTP";
+ private static final String PROVENANCE_REPORTING_TASK_BEGINNING_OF_STREAM
= "beginning-of-stream";
+
+ private final ReadableProperties minifiProperties;
+
+ public FlowEnrichService(ReadableProperties minifiProperties) {
+ this.minifiProperties = minifiProperties;
+ }
+
+ public byte[] enrichFlow(byte[] flowCandidate) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Enriching flow with content: \n{}", new
String(flowCandidate, UTF_8));
+ }
+
+ VersionedDataflow versionedDataflow =
parseVersionedDataflow(flowCandidate);
+
+ Optional<Integer> maxConcurrentThreads =
ofNullable(minifiProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS.getKey()))
+ .map(Integer::parseInt);
+
maxConcurrentThreads.ifPresent(versionedDataflow::setMaxTimerDrivenThreadCount);
+
maxConcurrentThreads.ifPresent(versionedDataflow::setMaxEventDrivenThreadCount);
+
+ VersionedProcessGroup rootGroup = versionedDataflow.getRootGroup();
+ if (rootGroup.getIdentifier() == null) {
+ rootGroup.setIdentifier(UUID.randomUUID().toString());
+ }
+ if (rootGroup.getInstanceIdentifier() == null) {
+ rootGroup.setInstanceIdentifier(UUID.randomUUID().toString());
+ }
+
+ Optional<VersionedControllerService> commonSslControllerService =
createCommonSslControllerService();
+ commonSslControllerService
+ .ifPresent(sslControllerService -> {
+ List<VersionedControllerService> currentControllerServices =
ofNullable(versionedDataflow.getControllerServices()).orElseGet(ArrayList::new);
+ currentControllerServices.add(sslControllerService);
+
versionedDataflow.setControllerServices(currentControllerServices);
+ });
+
+ commonSslControllerService
+ .filter(__ ->
parseBoolean(minifiProperties.getProperty(NIFI_MINIFI_FLOW_USE_PARENT_SSL.getKey())))
+ .map(VersionedComponent::getInstanceIdentifier)
+ .ifPresent(commonSslControllerServiceInstanceId ->
overrideProcessorsSslControllerService(rootGroup,
commonSslControllerServiceInstanceId));
+
+ createProvenanceReportingTask(commonSslControllerService)
Review Comment:
Good suggestion, thank you
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class DefaultUpdateConfigurationStrategyTest {
+
+ public static final String ROOT_PROCESS_GROUP_ID = "root_process_group";
+ private static String FLOW_CONFIG_FILE_NAME = "flow.config.gz";
+
+ private static byte[] ORIGINAL_RAW_FLOW_CONFIG_CONTENT =
"original_raw_content".getBytes(UTF_8);
+ private static byte[] ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT =
"original_enriched_content".getBytes(UTF_8);
+ private static byte[] NEW_RAW_FLOW_CONFIG_CONTENT =
"new_raw_content".getBytes(UTF_8);
+ private static byte[] NEW_ENRICHED_FLOW_CONFIG_CONTENT =
"new_enriched_content".getBytes(UTF_8);
+
+ @TempDir
+ private File tempDir;
+
+ @Mock
+ private FlowController mockFlowController;
+ @Mock
+ private FlowService mockFlowService;
+ @Mock
+ private FlowEnrichService mockFlowEnrichService;
+ @Mock
+ private FlowManager mockFlowManager;
+ @Mock
+ private ProcessGroup mockProcessGroup;
+
+ private Path flowConfigurationFile;
+ private Path backupFlowConfigurationFile;
+ private Path rawFlowConfigurationFile;
+ private Path backupRawFlowConfigurationFile;
+
+ private UpdateConfigurationStrategy testUpdateConfiguratinStrategy;
Review Comment:
Fixed
##########
minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiPropertiesGeneratorTest.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.lang.Boolean.TRUE;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.Files.newOutputStream;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Stream.concat;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.SPACE;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static
org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.DEFAULT_SENSITIVE_PROPERTIES_ENCODING_ALGORITHM;
+import static
org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.MINIFI_TO_NIFI_PROPERTY_MAPPING;
+import static
org.apache.nifi.minifi.bootstrap.service.MiNiFiPropertiesGenerator.NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_APP_LOG_FILE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_FILE_PATH;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_BOOTSTRAP_LOG_FILE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.MINIFI_LOG_DIRECTORY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class MiNiFiPropertiesGeneratorTest {
+
+ @TempDir
+ private Path tempDir;
+
+ private Path configDirectory;
+ private Path bootstrapPropertiesFile;
+ private Path minifiPropertiesFile;
+
+ private MiNiFiPropertiesGenerator testPropertiesGenerator;
+
+ @BeforeEach
+ public void setup() throws IOException {
+ configDirectory = tempDir.toAbsolutePath().resolve("conf");
+ Files.createDirectories(configDirectory);
+ bootstrapPropertiesFile = configDirectory.resolve("bootstrap.conf");
+ minifiPropertiesFile = configDirectory.resolve("minifi.properties");
+
+ testPropertiesGenerator = new MiNiFiPropertiesGenerator();
+ }
+
+ @Test
+ public void testGenerateDefaultNiFiProperties() throws
ConfigurationChangeException {
+ // given
+ Properties bootstrapProperties = createBootstrapProperties(Map.of());
+
+ // when
+
testPropertiesGenerator.generateMinifiProperties(configDirectory.toString(),
bootstrapProperties);
+
+ // then
+ List<String> expectedMiNiFiProperties =
NIFI_PROPERTIES_WITH_DEFAULT_VALUES_AND_COMMENTS.stream()
+ .map(triplet -> triplet.getLeft() + "=" + triplet.getMiddle())
+ .collect(toList());
+ List<String> resultMiNiFiProperties = loadMiNiFiProperties().entrySet()
+ .stream()
+ .map(entry -> entry.getKey() + "=" + entry.getValue())
+ .collect(toList());
+
assertTrue(resultMiNiFiProperties.containsAll(expectedMiNiFiProperties));
+ }
+
+ @Test
+ public void testMiNiFiPropertiesMappedToAppropriateNiFiProperties() throws
ConfigurationChangeException {
+ // given
+ Properties bootstrapProperties = createBootstrapProperties(List.of(
Review Comment:
Sure
##########
minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/FlowEnrichService.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.api;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.parseBoolean;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Map.entry;
+import static java.util.Optional.empty;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.nifi.flow.ComponentType.CONTROLLER_SERVICE;
+import static org.apache.nifi.flow.ComponentType.REPORTING_TASK;
+import static org.apache.nifi.flow.ScheduledState.ENABLED;
+import static org.apache.nifi.flow.ScheduledState.RUNNING;
+import static org.apache.nifi.logging.LogLevel.WARN;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_USE_PARENT_SSL;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_BATCH_SIZE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMMUNICATIONS_TIMEOUT;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_COMPRESS_EVENTS;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_DESTINATION_URL;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INPUT_PORT_NAME;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_INSTANCE_URL;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_PERIOD;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_TYPE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEY_PASSWD;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_SSL_PROTOCOL;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_PASSWD;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_SECURITY_TRUSTSTORE_TYPE;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.serialization.FlowSerializationException;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ControllerServiceAPI;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.properties.ReadableProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlowEnrichService {
Review Comment:
Yes I didn't feel comfortable with putting this into an api module, but also
felt that creating a separate module just for this would be an overkill.
On second thought it seems to be reasonable though, so I moved to a
framework module.
##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/ConfigSchemaToVersionedDataFlowTransformer.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.Boolean.TRUE;
+import static java.util.Map.entry;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS;
+import static
org.apache.nifi.util.NiFiProperties.ADMINISTRATIVE_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.BORED_YIELD_DURATION;
+import static
org.apache.nifi.util.NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_ENABLED;
+import static
org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD;
+import static
org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE;
+import static
org.apache.nifi.util.NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION;
+import static
org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC;
+import static
org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL;
+import static
org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION;
+import static
org.apache.nifi.util.NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_TIME;
+import static
org.apache.nifi.util.NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_ROLLOVER_TIME;
+import static org.apache.nifi.util.NiFiProperties.QUEUE_SWAP_THRESHOLD;
+import static org.apache.nifi.util.NiFiProperties.VARIABLE_REGISTRY_PROPERTIES;
+import static org.apache.nifi.util.NiFiProperties.WRITE_DELAY_INTERVAL;
+
+import com.google.common.base.Splitter;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.flow.VersionedFlowEncodingVersion;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.PortType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.minifi.toolkit.schema.ComponentStatusRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.ConnectionSchema;
+import org.apache.nifi.minifi.toolkit.schema.ContentRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ControllerServiceSchema;
+import org.apache.nifi.minifi.toolkit.schema.CorePropertiesSchema;
+import org.apache.nifi.minifi.toolkit.schema.FlowFileRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.FunnelSchema;
+import org.apache.nifi.minifi.toolkit.schema.PortSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessorSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProvenanceRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.RemotePortSchema;
+import org.apache.nifi.minifi.toolkit.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ReportingSchema;
+import org.apache.nifi.minifi.toolkit.schema.SwapSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.scheduling.ExecutionNode;
+
+public class ConfigSchemaToVersionedDataFlowTransformer {
+
+ private static final String RPG_URLS_DELIMITER = ",";
+ private static final String DEFAULT_FLOW_FILE_EXPIRATION = "0 sec";
+ private static final String DEFAULT_BACK_PRESSURE_DATA_SIZE_THRESHOLD = "1
GB";
+ private static final String FLOW_FILE_CONCURRENCY = "UNBOUNDED";
+ private static final String FLOW_FILE_OUTBOUND_POLICY =
"STREAM_WHEN_AVAILABLE";
+ private static final long DEFAULT_BACK_PRESSURE_OBJECT_THRESHOLD = 10000L;
+ private static final Position DEFAULT_POSITION = new Position(0, 0);
Review Comment:
Yes, in the YAML source there was no position related info.
Position is a required attribute, but in case of MiNiFi it does not make too
much sense, as these properties are for the UI, and MiNiFi does not have it.
If for some reasons somebody loaded the flow into NiFi. they can still
manually order the processors.
If you don't mind I would keep it as is now
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiExecCommandProvider.java:
##########
@@ -62,116 +84,75 @@ public MiNiFiExecCommandProvider(BootstrapFileProvider
bootstrapFileProvider) {
* @throws IOException throws IOException if any of the configuration file
read fails
*/
public List<String> getMiNiFiExecCommand(int listenPort, File workingDir)
throws IOException {
- Properties props = bootstrapFileProvider.getBootstrapProperties();
- File confDir = getFile(props.getProperty(CONF_DIR_KEY,
DEFAULT_CONF_DIR).trim(), workingDir);
- File libDir = getFile(props.getProperty("lib.dir",
DEFAULT_LIB_DIR).trim(), workingDir);
+ Properties bootstrapProperties =
bootstrapFileProvider.getBootstrapProperties();
+
+ File confDir = getFile(bootstrapProperties.getProperty(CONF_DIR_KEY,
DEFAULT_CONF_DIR).trim(), workingDir);
+ File libDir = getFile(bootstrapProperties.getProperty(LIB_DIR_KEY,
DEFAULT_LIB_DIR).trim(), workingDir);
+
String minifiLogDir = System.getProperty(LOG_DIR,
DEFAULT_LOG_DIR).trim();
String minifiAppLogFileName = System.getProperty(APP_LOG_FILE_NAME,
DEFAULT_APP_LOG_FILE_NAME).trim();
String minifiAppLogFileExtension =
System.getProperty(APP_LOG_FILE_EXTENSION, DEFAULT_LOG_FILE_EXTENSION).trim();
String minifiBootstrapLogFileName =
System.getProperty(BOOTSTRAP_LOG_FILE_NAME,
DEFAULT_BOOTSTRAP_LOG_FILE_NAME).trim();
String minifiBootstrapLogFileExtension =
System.getProperty(BOOTSTRAP_LOG_FILE_EXTENSION,
DEFAULT_LOG_FILE_EXTENSION).trim();
- List<String> cmd = new ArrayList<>();
- cmd.add(getJavaCommand(props));
- cmd.add("-classpath");
- cmd.add(buildClassPath(props, confDir, libDir));
- cmd.addAll(getJavaAdditionalArgs(props));
- cmd.add("-Dnifi.properties.file.path=" + getMiNiFiPropsFileName(props,
confDir));
- cmd.add("-Dnifi.bootstrap.listen.port=" + listenPort);
- cmd.add("-Dapp=MiNiFi");
- cmd.add("-D" + LOG_DIR + "=" + minifiLogDir);
- cmd.add("-D" + APP_LOG_FILE_NAME + "=" + minifiAppLogFileName);
- cmd.add("-D" + APP_LOG_FILE_EXTENSION + "=" +
minifiAppLogFileExtension);
- cmd.add("-D" + BOOTSTRAP_LOG_FILE_NAME + "=" +
minifiBootstrapLogFileName);
- cmd.add("-D" + BOOTSTRAP_LOG_FILE_EXTENSION + "=" +
minifiBootstrapLogFileExtension);
- cmd.add("org.apache.nifi.minifi.MiNiFi");
-
- return cmd;
+ return List.of(
+ getJavaCommand(bootstrapProperties),
+ "-classpath",
+ buildClassPath(confDir, libDir),
+ getJavaAdditionalArgs(bootstrapProperties),
Review Comment:
Thanks for finding this issue, really appreciate it!
##########
minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiProperties.java:
##########
@@ -93,25 +79,47 @@ public enum MiNiFiProperties {
C2_REST_CALL_TIMEOUT("c2.rest.callTimeout", "10 sec", false, true,
TIME_PERIOD_VALIDATOR),
C2_MAX_IDLE_CONNECTIONS("c2.rest.maxIdleConnections", "5", false, true,
NON_NEGATIVE_INTEGER_VALIDATOR),
C2_KEEP_ALIVE_DURATION("c2.rest.keepAliveDuration", "5 min", false, true,
TIME_PERIOD_VALIDATOR),
- C2_AGENT_HEARTBEAT_PERIOD("c2.agent.heartbeat.period", "1000", false,
true, LONG_VALIDATOR),
- C2_AGENT_CLASS("c2.agent.class", "", false, true, VALID),
+ C2_REST_HTTP_HEADERS("c2.rest.http.headers", "", false, true, VALID),
Review Comment:
Set it to `Accept:application/json`
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf:
##########
@@ -150,17 +107,11 @@ java.arg.14=-Djava.awt.headless=true
#c2.rest.connectionTimeout=5 sec
#c2.rest.readTimeout=5 sec
#c2.rest.callTimeout=10 sec
-## heartbeat in milliseconds
-#c2.agent.heartbeat.period=5000
-## define parameters about your agent
-#c2.agent.class=
+# Comma separated list of HTTP headers, eg: Accept:text/json
+#c2.rest.http.headers=text/json
Review Comment:
Thanks, I added the missing `Accept` part and also replaced text/json to
application/json everywhere
##########
c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java:
##########
@@ -321,6 +343,21 @@ public Builder connectTimeout(long connectTimeout) {
return this;
}
+ public Builder httpHeaders(String httpHeaders) {
+ this.httpHeaders = ofNullable(httpHeaders)
+ .filter(StringUtils::isNotBlank)
+ .map(headers -> headers.split(HTTP_HEADERS_SEPARATOR))
+ .map(Arrays::stream)
+ .orElseGet(Stream::of)
Review Comment:
Thanks, replaced
##########
c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationStrategy.java:
##########
@@ -15,17 +15,10 @@
* limitations under the License.
*/
-package org.apache.nifi.minifi.c2.provider.nifi.rest;
+package org.apache.nifi.c2.client.service.operation;
-import java.io.IOException;
+@FunctionalInterface
+public interface UpdateConfigurationStrategy {
Review Comment:
Sure, added
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -121,10 +122,18 @@ private void start() throws IOException,
InterruptedException {
Properties bootstrapProperties =
bootstrapFileProvider.getBootstrapProperties();
String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
- initConfigFiles(bootstrapProperties, confDir);
- Process process = startMiNiFi();
+ DEFAULT_LOGGER.debug("Generating minifi.properties from
bootstrap.conf");
+ initConfigFile(bootstrapProperties, confDir);
+ Path flowConfigFile =
Path.of(bootstrapProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath();
Review Comment:
Extracted. Updated the docs as well
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java:
##########
@@ -35,64 +39,72 @@
public class ConfigurationChangeCoordinator implements Closeable,
ConfigurationChangeNotifier {
- public static final String NOTIFIER_PROPERTY_PREFIX =
"nifi.minifi.notifier";
- public static final String NOTIFIER_INGESTORS_KEY =
NOTIFIER_PROPERTY_PREFIX + ".ingestors";
- private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+ public static final String NOTIFIER_INGESTORS_KEY =
"nifi.minifi.notifier.ingestors";
- private final Set<ConfigurationChangeListener>
configurationChangeListeners;
- private final Set<ChangeIngestor> changeIngestors = new HashSet<>();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigurationChangeCoordinator.class);
+ private static final String COMMA = ",";
private final BootstrapFileProvider bootstrapFileProvider;
private final RunMiNiFi runMiNiFi;
+ private final Set<ConfigurationChangeListener>
configurationChangeListeners;
+ private final Set<ChangeIngestor> changeIngestors;
public ConfigurationChangeCoordinator(BootstrapFileProvider
bootstrapFileProvider, RunMiNiFi runMiNiFi,
- Set<ConfigurationChangeListener> miNiFiConfigurationChangeListeners) {
+ Set<ConfigurationChangeListener>
miNiFiConfigurationChangeListeners) {
this.bootstrapFileProvider = bootstrapFileProvider;
this.runMiNiFi = runMiNiFi;
- this.configurationChangeListeners =
Optional.ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+ this.configurationChangeListeners =
ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
+ this.changeIngestors = new HashSet<>();
+ }
+
+ @Override
+ public Collection<ListenerHandleResult> notifyListeners(ByteBuffer
newFlowConfig) {
+ LOGGER.info("Notifying Listeners of a change");
+ return configurationChangeListeners.stream()
+ .map(listener -> notifyListener(newFlowConfig, listener))
+ .collect(toList());
+ }
+
+ @Override
+ public void close() {
+ closeIngestors();
}
/**
* Begins the associated notification service provided by the given
implementation. In most implementations, no action will occur until this
method is invoked.
*/
- public void start() throws IOException{
+ public void start() throws IOException {
initialize();
changeIngestors.forEach(ChangeIngestor::start);
}
- /**
- * Provides an immutable collection of listeners for the notifier instance
- *
- * @return a collection of those listeners registered for notifications
- */
- public Set<ConfigurationChangeListener> getChangeListeners() {
- return Collections.unmodifiableSet(configurationChangeListeners);
+ private ListenerHandleResult notifyListener(ByteBuffer newFlowConfig,
ConfigurationChangeListener listener) {
+ try {
+ listener.handleChange(new
ByteBufferInputStream(newFlowConfig.duplicate()));
+ ListenerHandleResult listenerHandleResult = new
ListenerHandleResult(listener);
+ LOGGER.info("Listener notification result {}",
listenerHandleResult);
+ return listenerHandleResult;
+ } catch (ConfigurationChangeException ex) {
+ ListenerHandleResult listenerHandleResult = new
ListenerHandleResult(listener, ex);
+ LOGGER.info("Listener notification result {} with failure {}",
listenerHandleResult, ex);
+ return listenerHandleResult;
+ }
}
- /**
- * Provide the mechanism by which listeners are notified
- */
- public Collection<ListenerHandleResult> notifyListeners(ByteBuffer
newConfig) {
- LOGGER.info("Notifying Listeners of a change");
+ private void initialize() throws IOException {
+ closeIngestors();
- Collection<ListenerHandleResult> listenerHandleResults = new
ArrayList<>(configurationChangeListeners.size());
- for (final ConfigurationChangeListener listener :
getChangeListeners()) {
- ListenerHandleResult result;
- try {
- listener.handleChange(new
ByteBufferInputStream(newConfig.duplicate()));
- result = new ListenerHandleResult(listener);
- } catch (ConfigurationChangeException ex) {
- result = new ListenerHandleResult(listener, ex);
- }
- listenerHandleResults.add(result);
- LOGGER.info("Listener notification result: {}", result);
- }
- return listenerHandleResults;
+ Properties bootstrapProperties =
bootstrapFileProvider.getBootstrapProperties();
+ ofNullable(bootstrapProperties.getProperty(NOTIFIER_INGESTORS_KEY))
+ .filter(not(String::isBlank))
+ .map(ingestors -> ingestors.split(COMMA))
+ .map(Stream::of)
Review Comment:
Fixed
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultUpdateConfigurationStrategy implements
UpdateConfigurationStrategy {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
+
+ private final FlowController flowController;
+ private final FlowService flowService;
+ private final FlowEnrichService flowEnrichService;
+ private final Path flowConfigurationFile;
+ private final Path backupFlowConfigurationFile;
+ private final Path rawFlowConfigurationFile;
+ private final Path backupRawFlowConfigurationFile;
+
+ public DefaultUpdateConfigurationStrategy(FlowController flowController,
FlowService flowService, FlowEnrichService flowEnrichService, String
flowConfigurationFile) {
+ this.flowController = flowController;
+ this.flowService = flowService;
+ this.flowEnrichService = flowEnrichService;
+ Path flowConfigurationFilePath =
Path.of(flowConfigurationFile).toAbsolutePath();
+ this.flowConfigurationFile = flowConfigurationFilePath;
+ this.backupFlowConfigurationFile = Path.of(flowConfigurationFilePath +
BACKUP_EXTENSION);
+ String flowConfigurationFileBaseName =
FilenameUtils.getBaseName(flowConfigurationFilePath.toString());
+ this.rawFlowConfigurationFile =
flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName +
RAW_EXTENSION);
+ this.backupRawFlowConfigurationFile =
flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName +
BACKUP_EXTENSION + RAW_EXTENSION);
+ }
+
+ @Override
+ public boolean update(byte[] rawFlow) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Attempting to update flow with content: \n{}", new
String(rawFlow, UTF_8));
+ }
+ try {
+ byte[] enrichedFlowCandidate =
flowEnrichService.enrichFlow(rawFlow);
+ stopRootProcessGroup();
+ backup(flowConfigurationFile, backupFlowConfigurationFile);
+ backup(rawFlowConfigurationFile, backupRawFlowConfigurationFile);
+ persist(enrichedFlowCandidate, flowConfigurationFile, true);
+ reloadFlow();
+ startRootProcessGroup();
+ persist(rawFlow, rawFlowConfigurationFile, false);
+ return true;
+ } catch (Exception e) {
+ LOGGER.error("Configuration update failed. Reverting to previous
flow", e);
+ revert(backupFlowConfigurationFile, flowConfigurationFile);
+ revert(backupRawFlowConfigurationFile, rawFlowConfigurationFile);
+ return false;
+ } finally {
+ removeIfExists(backupFlowConfigurationFile);
+ removeIfExists(backupRawFlowConfigurationFile);
+ }
+ }
+
+ private void stopRootProcessGroup() {
+ ProcessGroup rootProcessGroup =
flowController.getFlowManager().getGroup(flowController.getFlowManager().getRootGroupId());
+ rootProcessGroup.findAllRemoteProcessGroups()
+ .stream()
+ .map(RemoteProcessGroup::stopTransmitting)
+ .forEach(future -> {
+ try {
+ future.get(5000, TimeUnit.MICROSECONDS);
+ } catch (Exception e) {
+ LOGGER.warn("Unable to stop remote process group", e);
+ }
+ });
+ rootProcessGroup.stopProcessing();
+ }
+
+ private void backup(Path current, Path backup) throws IOException {
Review Comment:
I extracted the common logic to the newly created minifi-commons-framework
module
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultUpdateConfigurationStrategy implements
UpdateConfigurationStrategy {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultUpdateConfigurationStrategy.class);
+
+ private final FlowController flowController;
+ private final FlowService flowService;
+ private final FlowEnrichService flowEnrichService;
+ private final Path flowConfigurationFile;
+ private final Path backupFlowConfigurationFile;
+ private final Path rawFlowConfigurationFile;
+ private final Path backupRawFlowConfigurationFile;
+
+ public DefaultUpdateConfigurationStrategy(FlowController flowController,
FlowService flowService, FlowEnrichService flowEnrichService, String
flowConfigurationFile) {
+ this.flowController = flowController;
+ this.flowService = flowService;
+ this.flowEnrichService = flowEnrichService;
+ Path flowConfigurationFilePath =
Path.of(flowConfigurationFile).toAbsolutePath();
+ this.flowConfigurationFile = flowConfigurationFilePath;
+ this.backupFlowConfigurationFile = Path.of(flowConfigurationFilePath +
BACKUP_EXTENSION);
+ String flowConfigurationFileBaseName =
FilenameUtils.getBaseName(flowConfigurationFilePath.toString());
+ this.rawFlowConfigurationFile =
flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName +
RAW_EXTENSION);
+ this.backupRawFlowConfigurationFile =
flowConfigurationFilePath.getParent().resolve(flowConfigurationFileBaseName +
BACKUP_EXTENSION + RAW_EXTENSION);
+ }
+
+ @Override
+ public boolean update(byte[] rawFlow) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Attempting to update flow with content: \n{}", new
String(rawFlow, UTF_8));
+ }
+ try {
+ byte[] enrichedFlowCandidate =
flowEnrichService.enrichFlow(rawFlow);
+ stopRootProcessGroup();
+ backup(flowConfigurationFile, backupFlowConfigurationFile);
+ backup(rawFlowConfigurationFile, backupRawFlowConfigurationFile);
+ persist(enrichedFlowCandidate, flowConfigurationFile, true);
+ reloadFlow();
+ startRootProcessGroup();
+ persist(rawFlow, rawFlowConfigurationFile, false);
+ return true;
+ } catch (Exception e) {
+ LOGGER.error("Configuration update failed. Reverting to previous
flow", e);
+ revert(backupFlowConfigurationFile, flowConfigurationFile);
+ revert(backupRawFlowConfigurationFile, rawFlowConfigurationFile);
+ return false;
+ } finally {
+ removeIfExists(backupFlowConfigurationFile);
+ removeIfExists(backupRawFlowConfigurationFile);
+ }
+ }
+
+ private void stopRootProcessGroup() {
+ ProcessGroup rootProcessGroup =
flowController.getFlowManager().getGroup(flowController.getFlowManager().getRootGroupId());
+ rootProcessGroup.findAllRemoteProcessGroups()
+ .stream()
Review Comment:
Done
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class DefaultUpdateConfigurationStrategyTest {
+
+ public static final String ROOT_PROCESS_GROUP_ID = "root_process_group";
+ private static String FLOW_CONFIG_FILE_NAME = "flow.config.gz";
+
+ private static byte[] ORIGINAL_RAW_FLOW_CONFIG_CONTENT =
"original_raw_content".getBytes(UTF_8);
Review Comment:
Yes, marked them as final
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java:
##########
@@ -90,261 +89,233 @@ public class PullHttpChangeIngestor extends
AbstractPullChangeIngestor {
public static final String DIFFERENTIATOR_KEY = PULL_HTTP_BASE_KEY +
".differentiator";
public static final String USE_ETAG_KEY = PULL_HTTP_BASE_KEY + ".use.etag";
public static final String OVERRIDE_SECURITY = PULL_HTTP_BASE_KEY +
".override.security";
+ public static final String HTTP_HEADERS = PULL_HTTP_BASE_KEY + ".headers";
+
+ private static final Logger logger =
LoggerFactory.getLogger(PullHttpChangeIngestor.class);
+
+ private static final Map<String, Supplier<Differentiator<ByteBuffer>>>
DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+ WHOLE_CONFIG_KEY,
WholeConfigDifferentiator::getByteBufferDifferentiator
+ );
+ private static final int NOT_MODIFIED_STATUS_CODE = 304;
+ private static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000";
+ private static final String DEFAULT_READ_TIMEOUT_MS = "15000";
+ private static final String DOUBLE_QUOTES = "\"";
+ private static final String ETAG_HEADER = "ETag";
+ private static final String PROXY_AUTHORIZATION_HEADER =
"Proxy-Authorization";
+ private static final String DEFAULT_PATH = "/";
+ private static final int BAD_REQUEST_STATUS_CODE = 400;
+ private static final String IF_NONE_MATCH_HEADER_KEY = "If-None-Match";
+ private static final String HTTP_HEADERS_SEPARATOR = ",";
+ private static final String HTTP_HEADER_KEY_VALUE_SEPARATOR = ":";
private final AtomicReference<OkHttpClient> httpClientReference = new
AtomicReference<>();
private final AtomicReference<Integer> portReference = new
AtomicReference<>();
private final AtomicReference<String> hostReference = new
AtomicReference<>();
private final AtomicReference<String> pathReference = new
AtomicReference<>();
private final AtomicReference<String> queryReference = new
AtomicReference<>();
+ private final AtomicReference<Map<String, String>> httpHeadersReference =
new AtomicReference<>();
+
private volatile Differentiator<ByteBuffer> differentiator;
private volatile String connectionScheme;
private volatile String lastEtag = "";
private volatile boolean useEtag = false;
- public PullHttpChangeIngestor() {
- logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
- }
-
@Override
public void initialize(Properties properties, ConfigurationFileHolder
configurationFileHolder, ConfigurationChangeNotifier
configurationChangeNotifier) {
super.initialize(properties, configurationFileHolder,
configurationChangeNotifier);
-
pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY,
DEFAULT_POLLING_PERIOD)));
+
pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY,
DEFAULT_POLLING_PERIOD_MILLISECONDS)));
if (pollingPeriodMS.get() < 1) {
- throw new IllegalArgumentException("Property, " +
PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a
positive integer.");
- }
-
- final String host = properties.getProperty(HOST_KEY);
- if (host == null || host.isEmpty()) {
- throw new IllegalArgumentException("Property, " + HOST_KEY + ",
for the hostname to pull configurations from must be specified.");
- }
-
- final String path = properties.getProperty(PATH_KEY, "/");
- final String query = properties.getProperty(QUERY_KEY, "");
-
- final String portString = (String) properties.get(PORT_KEY);
- final Integer port;
- if (portString == null) {
- throw new IllegalArgumentException("Property, " + PORT_KEY + ",
for the hostname to pull configurations from must be specified.");
- } else {
- port = Integer.parseInt(portString);
+ throw new IllegalArgumentException("Property, " +
PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a
positive integer");
}
- portReference.set(port);
+ String host = ofNullable(properties.getProperty(HOST_KEY))
+ .filter(StringUtils::isNotBlank)
+ .orElseThrow(() -> new IllegalArgumentException("Property, " +
HOST_KEY + ", for the hostname to pull configurations from must be specified"));
+ String path = properties.getProperty(PATH_KEY, DEFAULT_PATH);
+ String query = properties.getProperty(QUERY_KEY, EMPTY);
+ Map<String, String> httpHeaders =
ofNullable(properties.getProperty(HTTP_HEADERS))
+ .filter(StringUtils::isNotBlank)
+ .map(headers -> headers.split(HTTP_HEADERS_SEPARATOR))
+ .map(Arrays::stream)
Review Comment:
Fixed
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java:
##########
@@ -56,165 +58,154 @@
*/
public class FileChangeIngestor implements Runnable, ChangeIngestor {
- private static final Map<String, Supplier<Differentiator<ByteBuffer>>>
DIFFERENTIATOR_CONSTRUCTOR_MAP;
-
- static {
- HashMap<String, Supplier<Differentiator<ByteBuffer>>> tempMap = new
HashMap<>();
- tempMap.put(WHOLE_CONFIG_KEY,
WholeConfigDifferentiator::getByteBufferDifferentiator);
+ private static final Map<String, Supplier<Differentiator<ByteBuffer>>>
DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+ WHOLE_CONFIG_KEY,
WholeConfigDifferentiator::getByteBufferDifferentiator
+ );
- DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
- }
+ static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY +
".file";
+ static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY +
".config.path";
+ static final String POLLING_PERIOD_INTERVAL_KEY = CONFIG_FILE_BASE_KEY +
".polling.period.seconds";
+ static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
+ private final static Logger logger =
LoggerFactory.getLogger(FileChangeIngestor.class);
- protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
- protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT =
TimeUnit.SECONDS;
+ private static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = SECONDS;
+ private static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY +
".differentiator";
- private final static Logger logger =
LoggerFactory.getLogger(FileChangeIngestor.class);
- private static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY
+ ".file";
+ private volatile Differentiator<ByteBuffer> differentiator;
+ private volatile ConfigurationChangeNotifier configurationChangeNotifier;
- protected static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY
+ ".config.path";
- protected static final String POLLING_PERIOD_INTERVAL_KEY =
CONFIG_FILE_BASE_KEY + ".polling.period.seconds";
- public static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY +
".differentiator";
+ private ScheduledExecutorService executorService;
private Path configFilePath;
private WatchService watchService;
private long pollingSeconds;
- private volatile Differentiator<ByteBuffer> differentiator;
- private volatile ConfigurationChangeNotifier configurationChangeNotifier;
- private volatile ConfigurationFileHolder configurationFileHolder;
- private volatile Properties properties;
- private ScheduledExecutorService executorService;
- protected static WatchService initializeWatcher(Path filePath) {
+ @Override
+ public void initialize(Properties properties, ConfigurationFileHolder
configurationFileHolder, ConfigurationChangeNotifier
configurationChangeNotifier) {
+ Path configFile =
ofNullable(properties.getProperty(CONFIG_FILE_PATH_KEY))
+ .filter(not(String::isBlank))
+ .map(Path::of)
+ .map(Path::toAbsolutePath)
+ .orElseThrow(() -> new IllegalArgumentException("Property, " +
CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified"));
try {
- final WatchService fsWatcher =
FileSystems.getDefault().newWatchService();
- final Path watchDirectory = filePath.getParent();
- watchDirectory.register(fsWatcher, ENTRY_MODIFY);
+ this.configurationChangeNotifier = configurationChangeNotifier;
+ this.configFilePath = configFile;
+ this.pollingSeconds =
ofNullable(properties.getProperty(POLLING_PERIOD_INTERVAL_KEY,
Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL)))
+ .map(Long::parseLong)
+ .filter(duration -> duration > 0)
+ .map(duration -> SECONDS.convert(duration,
DEFAULT_POLLING_PERIOD_UNIT))
+ .orElseThrow(() -> new IllegalArgumentException("Cannot
specify a polling period with duration <=0"));
+ this.watchService = initializeWatcher(configFile);
+ this.differentiator =
ofNullable(properties.getProperty(DIFFERENTIATOR_KEY))
+ .filter(not(String::isBlank))
+ .map(differentiator ->
ofNullable(DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiator))
+ .map(Supplier::get)
+
.orElseThrow(unableToFindDifferentiatorExceptionSupplier(differentiator)))
+
.orElseGet(WholeConfigDifferentiator::getByteBufferDifferentiator);
+ this.differentiator.initialize(configurationFileHolder);
+ } catch (Exception e) {
+ throw new IllegalStateException("Could not successfully initialize
file change notifier", e);
+ }
- return fsWatcher;
- } catch (IOException ioe) {
- throw new IllegalStateException("Unable to initialize a file
system watcher for the path " + filePath, ioe);
+ if
(Path.of(properties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath().equals(configFile))
{
Review Comment:
Good catch, updated the criteria and extracted the check to a separate method
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java:
##########
@@ -14,115 +14,146 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.nifi.minifi.bootstrap.service;
+import static java.nio.ByteBuffer.wrap;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.copy;
+import static java.nio.file.Files.deleteIfExists;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.Files.newOutputStream;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
-import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
-import static
org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY;
-import static
org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
-
-import java.io.File;
-import java.io.FileInputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.commons.io.IOUtils.toByteArray;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.io.IOUtils;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
import
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
import
org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import org.eclipse.jetty.io.RuntimeIOException;
import org.slf4j.Logger;
public class MiNiFiConfigurationChangeListener implements
ConfigurationChangeListener {
+ private static final ReentrantLock handlingLock = new ReentrantLock();
+
private final RunMiNiFi runner;
private final Logger logger;
private final BootstrapFileProvider bootstrapFileProvider;
+ private final FlowEnrichService flowEnrichService;
- private static final ReentrantLock handlingLock = new ReentrantLock();
-
- public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger,
BootstrapFileProvider bootstrapFileProvider) {
+ public MiNiFiConfigurationChangeListener(RunMiNiFi runner, Logger logger,
BootstrapFileProvider bootstrapFileProvider, FlowEnrichService
flowEnrichService) {
this.runner = runner;
this.logger = logger;
this.bootstrapFileProvider = bootstrapFileProvider;
+ this.flowEnrichService = flowEnrichService;
}
@Override
- public void handleChange(InputStream configInputStream) throws
ConfigurationChangeException {
+ public String getDescriptor() {
+ return "MiNiFiConfigurationChangeListener";
+ }
+
+ @Override
+ public void handleChange(InputStream flowConfigInputStream) throws
ConfigurationChangeException {
logger.info("Received notification of a change");
if (!handlingLock.tryLock()) {
throw new ConfigurationChangeException("Instance is already
handling another change");
}
- // Store the incoming stream as a byte array to be shared among
components that need it
+
+ Path currentFlowConfigFile = null;
+ Path backupFlowConfigFile = null;
+ Path currentRawFlowConfigFile = null;
+ Path backupRawFlowConfigFile = null;
try {
Properties bootstrapProperties =
bootstrapFileProvider.getBootstrapProperties();
- File configFile = new
File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
- File swapConfigFile = bootstrapFileProvider.getConfigYmlSwapFile();
- logger.info("Persisting old configuration to {}",
swapConfigFile.getAbsolutePath());
+ currentFlowConfigFile =
Path.of(bootstrapProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath();
+ backupFlowConfigFile = Path.of(currentFlowConfigFile +
BACKUP_EXTENSION);
+ String currentFlowConfigFileBaseName =
FilenameUtils.getBaseName(currentFlowConfigFile.toString());
+ currentRawFlowConfigFile =
currentFlowConfigFile.getParent().resolve(currentFlowConfigFileBaseName +
RAW_EXTENSION);
+ backupRawFlowConfigFile =
currentFlowConfigFile.getParent().resolve(currentFlowConfigFileBaseName +
RAW_EXTENSION + BACKUP_EXTENSION);
- try (FileInputStream configFileInputStream = new
FileInputStream(configFile)) {
- Files.copy(configFileInputStream, swapConfigFile.toPath(),
REPLACE_EXISTING);
- }
+ backup(currentFlowConfigFile, backupFlowConfigFile);
+ backup(currentRawFlowConfigFile, backupRawFlowConfigFile);
- // write out new config to file
- Files.copy(configInputStream, configFile.toPath(),
REPLACE_EXISTING);
-
- // Create an input stream to feed to the config transformer
- try (FileInputStream newConfigIs = new
FileInputStream(configFile)) {
- try {
- String confDir =
bootstrapProperties.getProperty(CONF_DIR_KEY);
- transformConfigurationFiles(confDir, newConfigIs,
configFile, swapConfigFile);
- } catch (Exception e) {
- logger.debug("Transformation of new config file failed
after swap file was created, deleting it.");
- if (!swapConfigFile.delete()) {
- logger.warn("The swap file failed to delete after a
failed handling of a change. It should be cleaned up manually.");
- }
- throw e;
- }
- }
+ byte[] rawFlow = toByteArray(flowConfigInputStream);
+ byte[] enrichedFlow = flowEnrichService.enrichFlow(rawFlow);
+ persist(enrichedFlow, currentFlowConfigFile, true);
+ restartInstance();
+ setActiveFlowReference(wrap(rawFlow));
+ persist(rawFlow, currentRawFlowConfigFile, false);
Review Comment:
Good point, moved setActiveFlowReference so it's the last step now
##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/TransformYamlCommandFactory.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.System.lineSeparator;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.io.IOUtils.write;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigMain;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigTransformException;
+import org.apache.nifi.minifi.toolkit.configuration.PathInputStreamFactory;
+import org.apache.nifi.minifi.toolkit.configuration.PathOutputStreamFactory;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.ConvertableSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.Schema;
+import
org.apache.nifi.minifi.toolkit.schema.exception.SchemaInstantiatonException;
+import org.apache.nifi.minifi.toolkit.schema.exception.SchemaLoaderException;
+import org.apache.nifi.minifi.toolkit.schema.serialization.SchemaLoader;
+
+public class TransformYamlCommandFactory {
Review Comment:
Sure will add
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.newOutputStream;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiConstants.BACKUP_EXTENSION;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.RAW_EXTENSION;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationStrategy;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.minifi.commons.api.FlowEnrichService;
+import org.apache.nifi.services.FlowService;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class DefaultUpdateConfigurationStrategyTest {
+
+ public static final String ROOT_PROCESS_GROUP_ID = "root_process_group";
+ private static String FLOW_CONFIG_FILE_NAME = "flow.config.gz";
+
+ private static byte[] ORIGINAL_RAW_FLOW_CONFIG_CONTENT =
"original_raw_content".getBytes(UTF_8);
+ private static byte[] ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT =
"original_enriched_content".getBytes(UTF_8);
+ private static byte[] NEW_RAW_FLOW_CONFIG_CONTENT =
"new_raw_content".getBytes(UTF_8);
+ private static byte[] NEW_ENRICHED_FLOW_CONFIG_CONTENT =
"new_enriched_content".getBytes(UTF_8);
+
+ @TempDir
+ private File tempDir;
+
+ @Mock
+ private FlowController mockFlowController;
+ @Mock
+ private FlowService mockFlowService;
+ @Mock
+ private FlowEnrichService mockFlowEnrichService;
+ @Mock
+ private FlowManager mockFlowManager;
+ @Mock
+ private ProcessGroup mockProcessGroup;
+
+ private Path flowConfigurationFile;
+ private Path backupFlowConfigurationFile;
+ private Path rawFlowConfigurationFile;
+ private Path backupRawFlowConfigurationFile;
+
+ private UpdateConfigurationStrategy testUpdateConfiguratinStrategy;
+
+ @BeforeEach
+ public void setup() {
+ flowConfigurationFile = Path.of(tempDir.getAbsolutePath(),
FLOW_CONFIG_FILE_NAME).toAbsolutePath();
+ backupFlowConfigurationFile = Path.of(flowConfigurationFile +
BACKUP_EXTENSION);
+ String flowConfigurationFileBaseName =
FilenameUtils.getBaseName(flowConfigurationFile.toString());
+ rawFlowConfigurationFile =
flowConfigurationFile.getParent().resolve(flowConfigurationFileBaseName +
RAW_EXTENSION);
+ backupRawFlowConfigurationFile =
flowConfigurationFile.getParent().resolve(flowConfigurationFileBaseName +
BACKUP_EXTENSION + RAW_EXTENSION);
+
+ testUpdateConfiguratinStrategy = new
DefaultUpdateConfigurationStrategy(mockFlowController, mockFlowService,
mockFlowEnrichService, flowConfigurationFile.toString());
+
+ writeGzipFile(flowConfigurationFile,
ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT);
+ writePlainTextFile(rawFlowConfigurationFile,
ORIGINAL_RAW_FLOW_CONFIG_CONTENT);
+ }
+
+ @Test
+ public void testFlowIsUpdatedAndBackupsAreClearedUp() throws IOException {
+ // given
+
when(mockFlowEnrichService.enrichFlow(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT);
+ when(mockFlowController.getFlowManager()).thenReturn(mockFlowManager);
+
when(mockFlowManager.getRootGroupId()).thenReturn(ROOT_PROCESS_GROUP_ID);
+
when(mockFlowManager.getGroup(ROOT_PROCESS_GROUP_ID)).thenReturn(mockProcessGroup);
+
+ // when
+ boolean result =
testUpdateConfiguratinStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT);
+
+ //then
+ assertTrue(result);
+ assertTrue(exists(flowConfigurationFile));
+ assertTrue(exists(rawFlowConfigurationFile));
+ assertArrayEquals(NEW_ENRICHED_FLOW_CONFIG_CONTENT,
readGzipFile(flowConfigurationFile));
+ assertArrayEquals(NEW_RAW_FLOW_CONFIG_CONTENT,
readPlainTextFile(rawFlowConfigurationFile));
+ assertFalse(exists(backupFlowConfigurationFile));
+ assertFalse(exists(backupRawFlowConfigurationFile));
+ verify(mockProcessGroup, times(1)).stopProcessing();
+ verify(mockFlowService, times(1)).load(null);
+ verify(mockFlowController, times(1)).onFlowInitialized(true);
+ verify(mockProcessGroup, times(1)).startProcessing();
+ }
+
+ @Test
+ public void testFlowIsRevertedInCaseOfAnyErrorAndBackupsAreClearedUp()
throws IOException {
+ // given
+
when(mockFlowEnrichService.enrichFlow(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT);
+ when(mockFlowController.getFlowManager()).thenReturn(mockFlowManager);
+
when(mockFlowManager.getRootGroupId()).thenReturn(ROOT_PROCESS_GROUP_ID);
+
when(mockFlowManager.getGroup(ROOT_PROCESS_GROUP_ID)).thenReturn(mockProcessGroup);
+ doThrow(new IOException()).when(mockFlowService).load(null);
+
+ // when
+ boolean result =
testUpdateConfiguratinStrategy.update(NEW_RAW_FLOW_CONFIG_CONTENT);
+
+ //then
+ assertFalse(result);
+ assertTrue(exists(flowConfigurationFile));
+ assertTrue(exists(rawFlowConfigurationFile));
+ assertArrayEquals(ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT,
readGzipFile(flowConfigurationFile));
+ assertArrayEquals(ORIGINAL_RAW_FLOW_CONFIG_CONTENT,
readPlainTextFile(rawFlowConfigurationFile));
+ assertFalse(exists(backupFlowConfigurationFile));
+ assertFalse(exists(backupRawFlowConfigurationFile));
+ verify(mockProcessGroup, times(1)).stopProcessing();
+ verify(mockFlowService, times(1)).load(null);
+ verify(mockFlowController, times(0)).onFlowInitialized(true);
+ verify(mockProcessGroup, times(0)).startProcessing();
+ }
+
+ private void writeGzipFile(Path path, byte[] content) {
+ try (ByteArrayInputStream inputStream = new
ByteArrayInputStream(content);
+ OutputStream outputStream = new
GZIPOutputStream(newOutputStream(path, WRITE, CREATE, TRUNCATE_EXISTING))) {
+ inputStream.transferTo(outputStream);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private byte[] readGzipFile(Path path) {
+ try (InputStream inputStream = new
GZIPInputStream(Files.newInputStream(path));
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream())
{
+ inputStream.transferTo(outputStream);
+ outputStream.flush();
+ return outputStream.toByteArray();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private void writePlainTextFile(Path path, byte[] content) {
Review Comment:
Sure, fixed
##########
nifi-commons/nifi-kubernetes-client/src/test/java/org/apache/nifi/kubernetes/client/StandardKubernetesClientProviderTest.java:
##########
@@ -31,7 +31,7 @@ void setProvider() {
provider = new StandardKubernetesClientProvider();
}
- @Timeout(5)
+ @Timeout(10)
Review Comment:
Yes. Will revert this, just needed it until the development is done.
##########
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/flow.json.raw:
##########
@@ -0,0 +1,38 @@
+{
+ "encodingVersion": {
+ "majorVersion": 2,
+ "minorVersion": 0
+ },
+ "maxTimerDrivenThreadCount": 1,
+ "maxEventDrivenThreadCount": 1,
+ "registries": [],
+ "parameterContexts": [],
+ "parameterProviders": [],
+ "controllerServices": [],
+ "reportingTasks": [],
+ "templates": [],
+ "rootGroup": {
+ "name": "NiFi Flow",
Review Comment:
Renamed
##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/ComponentPropertyProvider.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.util.Objects.nonNull;
+import static java.util.Optional.ofNullable;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Stream.concat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.nifi.flow.ConnectableComponentType;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.BaseSchemaWithId;
+
+public class ComponentPropertyProvider {
Review Comment:
Added
##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/TransformYamlCommandFactory.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.System.lineSeparator;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.commons.io.IOUtils.write;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigMain;
+import org.apache.nifi.minifi.toolkit.configuration.ConfigTransformException;
+import org.apache.nifi.minifi.toolkit.configuration.PathInputStreamFactory;
+import org.apache.nifi.minifi.toolkit.configuration.PathOutputStreamFactory;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.ConvertableSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.Schema;
+import
org.apache.nifi.minifi.toolkit.schema.exception.SchemaInstantiatonException;
+import org.apache.nifi.minifi.toolkit.schema.exception.SchemaLoaderException;
+import org.apache.nifi.minifi.toolkit.schema.serialization.SchemaLoader;
+
+public class TransformYamlCommandFactory {
+
+ public static final String TRANSFORM_YML = "transform-yml";
+
+ private static final String COMMAND_DESCRIPTION = "Transform MiNiFi config
YAML into NiFi flow JSON format";
+ private static final String PROPERTY_KEY_VALUE_DELIMITER = "=";
+
+ private final PathInputStreamFactory pathInputStreamFactory;
+ private final PathOutputStreamFactory pathOutputStreamFactory;
+
+ public TransformYamlCommandFactory(PathInputStreamFactory
pathInputStreamFactory, PathOutputStreamFactory pathOutputStreamFactory) {
+ this.pathInputStreamFactory = pathInputStreamFactory;
+ this.pathOutputStreamFactory = pathOutputStreamFactory;
+ }
+
+ public ConfigMain.Command create() {
+ return new ConfigMain.Command(this::transformYamlToJson,
COMMAND_DESCRIPTION);
+ }
+
+ private int transformYamlToJson(String[] args) {
+ if (args.length != 5) {
+ printTransformYmlUsage();
+ return ConfigMain.ERR_INVALID_ARGS;
+ }
+
+ String sourceMiNiFiConfigPath = args[1];
Review Comment:
Updated
##########
minifi/minifi-toolkit/minifi-toolkit-configuration/src/main/java/org/apache/nifi/minifi/toolkit/configuration/json/ConfigSchemaToVersionedDataFlowTransformer.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.toolkit.configuration.json;
+
+import static java.lang.Boolean.TRUE;
+import static java.util.Map.entry;
+import static java.util.Optional.ofNullable;
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static
org.apache.nifi.minifi.commons.api.MiNiFiProperties.NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS;
+import static
org.apache.nifi.util.NiFiProperties.ADMINISTRATIVE_YIELD_DURATION;
+import static org.apache.nifi.util.NiFiProperties.BORED_YIELD_DURATION;
+import static
org.apache.nifi.util.NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY;
+import static org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_ENABLED;
+import static
org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD;
+import static
org.apache.nifi.util.NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE;
+import static
org.apache.nifi.util.NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION;
+import static
org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC;
+import static
org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL;
+import static
org.apache.nifi.util.NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION;
+import static
org.apache.nifi.util.NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD;
+import static org.apache.nifi.util.NiFiProperties.MAX_APPENDABLE_CLAIM_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_INDEX_SHARD_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_SIZE;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_MAX_STORAGE_TIME;
+import static
org.apache.nifi.util.NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS;
+import static org.apache.nifi.util.NiFiProperties.PROVENANCE_ROLLOVER_TIME;
+import static org.apache.nifi.util.NiFiProperties.QUEUE_SWAP_THRESHOLD;
+import static org.apache.nifi.util.NiFiProperties.VARIABLE_REGISTRY_PROPERTIES;
+import static org.apache.nifi.util.NiFiProperties.WRITE_DELAY_INTERVAL;
+
+import com.google.common.base.Splitter;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.controller.flow.VersionedFlowEncodingVersion;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.PortType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.minifi.toolkit.schema.ComponentStatusRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ConfigSchema;
+import org.apache.nifi.minifi.toolkit.schema.ConnectionSchema;
+import org.apache.nifi.minifi.toolkit.schema.ContentRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.ControllerServiceSchema;
+import org.apache.nifi.minifi.toolkit.schema.CorePropertiesSchema;
+import org.apache.nifi.minifi.toolkit.schema.FlowFileRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.FunnelSchema;
+import org.apache.nifi.minifi.toolkit.schema.PortSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProcessorSchema;
+import org.apache.nifi.minifi.toolkit.schema.ProvenanceRepositorySchema;
+import org.apache.nifi.minifi.toolkit.schema.RemotePortSchema;
+import org.apache.nifi.minifi.toolkit.schema.RemoteProcessGroupSchema;
+import org.apache.nifi.minifi.toolkit.schema.ReportingSchema;
+import org.apache.nifi.minifi.toolkit.schema.SwapSchema;
+import org.apache.nifi.minifi.toolkit.schema.common.BaseSchemaWithIdAndName;
+import org.apache.nifi.scheduling.ExecutionNode;
+
+public class ConfigSchemaToVersionedDataFlowTransformer {
+
+ private static final String RPG_URLS_DELIMITER = ",";
+ private static final String DEFAULT_FLOW_FILE_EXPIRATION = "0 sec";
+ private static final String DEFAULT_BACK_PRESSURE_DATA_SIZE_THRESHOLD = "1
GB";
+ private static final String FLOW_FILE_CONCURRENCY = "UNBOUNDED";
+ private static final String FLOW_FILE_OUTBOUND_POLICY =
"STREAM_WHEN_AVAILABLE";
+ private static final long DEFAULT_BACK_PRESSURE_OBJECT_THRESHOLD = 10000L;
+ private static final Position DEFAULT_POSITION = new Position(0, 0);
+
+ private final ConfigSchema configSchema;
+ private final ComponentPropertyProvider componentPropertyProvider;
+
+ public ConfigSchemaToVersionedDataFlowTransformer(ConfigSchema
configSchema) {
+ this.configSchema = configSchema;
+ this.componentPropertyProvider = new
ComponentPropertyProvider(configSchema);
+ }
+
+ public Map<String, String> extractProperties() {
+ CorePropertiesSchema coreProperties = configSchema.getCoreProperties();
+ FlowFileRepositorySchema flowFileRepositoryProperties =
configSchema.getFlowfileRepositoryProperties();
+ ContentRepositorySchema contentRepositoryProperties =
configSchema.getContentRepositoryProperties();
+ ProvenanceRepositorySchema provenanceRepositoryProperties =
configSchema.getProvenanceRepositorySchema();
+ ComponentStatusRepositorySchema componentStatusRepositoryProperties =
configSchema.getComponentStatusRepositoryProperties();
+ SwapSchema swapProperties =
configSchema.getFlowfileRepositoryProperties().getSwapProperties();
+
+ return Stream.concat(
+ Stream.of(
+ entry(NIFI_MINIFI_FLOW_MAX_CONCURRENT_THREADS.getKey(),
coreProperties.getMaxConcurrentThreads().toString()),
+ entry(FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD,
coreProperties.getFlowControllerGracefulShutdownPeriod()),
+ entry(WRITE_DELAY_INTERVAL,
coreProperties.getFlowServiceWriteDelayInterval()),
+ entry(ADMINISTRATIVE_YIELD_DURATION,
coreProperties.getAdministrativeYieldDuration()),
+ entry(BORED_YIELD_DURATION,
coreProperties.getBoredYieldDuration()),
+ entry(VARIABLE_REGISTRY_PROPERTIES,
coreProperties.getVariableRegistryProperties()),
+ entry(FLOWFILE_REPOSITORY_IMPLEMENTATION,
flowFileRepositoryProperties.getFlowFileRepository()),
+ entry(FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL,
flowFileRepositoryProperties.getCheckpointInterval()),
+ entry(FLOWFILE_REPOSITORY_ALWAYS_SYNC,
Boolean.toString(flowFileRepositoryProperties.getAlwaysSync())),
+ entry(CONTENT_REPOSITORY_IMPLEMENTATION,
contentRepositoryProperties.getContentRepository()),
+ entry(MAX_APPENDABLE_CLAIM_SIZE,
contentRepositoryProperties.getContentClaimMaxAppendableSize()),
+ entry(CONTENT_ARCHIVE_MAX_RETENTION_PERIOD,
contentRepositoryProperties.getContentRepoArchiveMaxRetentionPeriod()),
+ entry(CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE,
contentRepositoryProperties.getContentRepoArchiveMaxUsagePercentage()),
+ entry(CONTENT_ARCHIVE_ENABLED,
Boolean.toString(contentRepositoryProperties.getContentRepoArchiveEnabled())),
+ entry(PROVENANCE_REPO_IMPLEMENTATION_CLASS,
provenanceRepositoryProperties.getProvenanceRepository()),
+ entry(PROVENANCE_ROLLOVER_TIME,
provenanceRepositoryProperties.getProvenanceRepoRolloverTimeKey()),
+ entry(PROVENANCE_INDEX_SHARD_SIZE,
provenanceRepositoryProperties.getProvenanceRepoIndexShardSize()),
+ entry(PROVENANCE_MAX_STORAGE_SIZE,
provenanceRepositoryProperties.getProvenanceRepoMaxStorageSize()),
+ entry(PROVENANCE_MAX_STORAGE_TIME,
provenanceRepositoryProperties.getProvenanceRepoMaxStorageTime()),
+ entry(COMPONENT_STATUS_SNAPSHOT_FREQUENCY,
componentStatusRepositoryProperties.getSnapshotFrequency()),
+ entry(QUEUE_SWAP_THRESHOLD,
swapProperties.getThreshold().toString())
+ ),
+
ofNullable(configSchema.getNifiPropertiesOverrides().entrySet()).orElse(Set.of()).stream()
Review Comment:
Good catch, the intention here was to defend against NifiPropertiesOverrides
being null. Updated the according to that
##########
c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java:
##########
@@ -186,6 +202,9 @@ public long getKeepAliveDuration() {
*/
public static class Builder {
+ private static final String HTTP_HEADERS_SEPARATOR = ",";
Review Comment:
I changed HTTP_HEADERS_SEPARATOR to `#`, did a quick check and it seems no
header is using `#`
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -74,12 +71,14 @@ public class StartRunner implements CommandRunner {
private volatile ShutdownHook shutdownHook;
private final MiNiFiExecCommandProvider miNiFiExecCommandProvider;
private final ConfigurationChangeListener configurationChangeListener;
+ private final MiNiFiPropertiesGenerator miNiFiPropertiesGenerator;
private int listenPort;
public StartRunner(CurrentPortProvider currentPortProvider,
BootstrapFileProvider bootstrapFileProvider,
- PeriodicStatusReporterManager periodicStatusReporterManager,
MiNiFiStdLogHandler miNiFiStdLogHandler, MiNiFiParameters miNiFiParameters,
File bootstrapConfigFile,
- RunMiNiFi runMiNiFi, MiNiFiExecCommandProvider
miNiFiExecCommandProvider, ConfigurationChangeListener
configurationChangeListener) {
+ PeriodicStatusReporterManager
periodicStatusReporterManager, MiNiFiStdLogHandler miNiFiStdLogHandler,
MiNiFiParameters miNiFiParameters,
+ File bootstrapConfigFile,
Review Comment:
Agree, fixed
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java:
##########
@@ -29,59 +27,31 @@
public abstract class WholeConfigDifferentiator {
-
- private final static Logger logger =
LoggerFactory.getLogger(WholeConfigDifferentiator.class);
-
public static final String WHOLE_CONFIG_KEY = "Whole Config";
- volatile ConfigurationFileHolder configurationFileHolder;
-
- boolean compareInputStreamToConfigFile(InputStream inputStream) throws
IOException {
- logger.debug("Checking if change is different");
- AtomicReference<ByteBuffer> currentConfigFileReference =
configurationFileHolder.getConfigFileReference();
- ByteBuffer currentConfigFile = currentConfigFileReference.get();
- ByteBuffer byteBuffer = ByteBuffer.allocate(currentConfigFile.limit());
- DataInputStream dataInputStream = new DataInputStream(inputStream);
- try {
- dataInputStream.readFully(byteBuffer.array());
- } catch (EOFException e) {
- logger.debug("New config is shorter than the current. Must be
different.");
- return true;
- }
- logger.debug("Read the input");
+ private final static Logger logger =
LoggerFactory.getLogger(WholeConfigDifferentiator.class);
- if (dataInputStream.available() != 0) {
- return true;
- } else {
- return byteBuffer.compareTo(currentConfigFile) != 0;
- }
- }
+ protected volatile ConfigurationFileHolder configurationFileHolder;
public void initialize(ConfigurationFileHolder configurationFileHolder) {
this.configurationFileHolder = configurationFileHolder;
}
-
- public static class InputStreamInput extends WholeConfigDifferentiator
implements Differentiator<InputStream> {
- public boolean isNew(InputStream inputStream) throws IOException {
- return compareInputStreamToConfigFile(inputStream);
- }
- }
-
- public static class ByteBufferInput extends WholeConfigDifferentiator
implements Differentiator<ByteBuffer> {
- public boolean isNew(ByteBuffer inputBuffer) {
- AtomicReference<ByteBuffer> currentConfigFileReference =
configurationFileHolder.getConfigFileReference();
- ByteBuffer currentConfigFile = currentConfigFileReference.get();
- return inputBuffer.compareTo(currentConfigFile) != 0;
+ public static class ByteBufferInputDifferentiator extends
WholeConfigDifferentiator implements Differentiator<ByteBuffer> {
+ public boolean isNew(ByteBuffer newFlowConfig) {
+ AtomicReference<ByteBuffer> currentFlowConfigReference =
configurationFileHolder.getConfigFileReference();
+ ByteBuffer currentFlowConfig = currentFlowConfigReference.get();
+ logger.debug("Comparing byte buffers:\n newFlow={}\n
existingFlow={}", newFlowConfig, currentFlowConfig);
Review Comment:
You are right, I was using it for debugging, not much added value anymore,
will remove it
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java:
##########
@@ -56,165 +58,154 @@
*/
public class FileChangeIngestor implements Runnable, ChangeIngestor {
- private static final Map<String, Supplier<Differentiator<ByteBuffer>>>
DIFFERENTIATOR_CONSTRUCTOR_MAP;
-
- static {
- HashMap<String, Supplier<Differentiator<ByteBuffer>>> tempMap = new
HashMap<>();
- tempMap.put(WHOLE_CONFIG_KEY,
WholeConfigDifferentiator::getByteBufferDifferentiator);
+ private static final Map<String, Supplier<Differentiator<ByteBuffer>>>
DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+ WHOLE_CONFIG_KEY,
WholeConfigDifferentiator::getByteBufferDifferentiator
+ );
- DIFFERENTIATOR_CONSTRUCTOR_MAP = Collections.unmodifiableMap(tempMap);
- }
+ static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY +
".file";
+ static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY +
".config.path";
+ static final String POLLING_PERIOD_INTERVAL_KEY = CONFIG_FILE_BASE_KEY +
".polling.period.seconds";
+ static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
+ private final static Logger logger =
LoggerFactory.getLogger(FileChangeIngestor.class);
- protected static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
- protected static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT =
TimeUnit.SECONDS;
+ private static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = SECONDS;
+ private static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY +
".differentiator";
- private final static Logger logger =
LoggerFactory.getLogger(FileChangeIngestor.class);
- private static final String CONFIG_FILE_BASE_KEY = NOTIFIER_INGESTORS_KEY
+ ".file";
+ private volatile Differentiator<ByteBuffer> differentiator;
+ private volatile ConfigurationChangeNotifier configurationChangeNotifier;
- protected static final String CONFIG_FILE_PATH_KEY = CONFIG_FILE_BASE_KEY
+ ".config.path";
- protected static final String POLLING_PERIOD_INTERVAL_KEY =
CONFIG_FILE_BASE_KEY + ".polling.period.seconds";
- public static final String DIFFERENTIATOR_KEY = CONFIG_FILE_BASE_KEY +
".differentiator";
+ private ScheduledExecutorService executorService;
private Path configFilePath;
private WatchService watchService;
private long pollingSeconds;
- private volatile Differentiator<ByteBuffer> differentiator;
- private volatile ConfigurationChangeNotifier configurationChangeNotifier;
- private volatile ConfigurationFileHolder configurationFileHolder;
- private volatile Properties properties;
- private ScheduledExecutorService executorService;
- protected static WatchService initializeWatcher(Path filePath) {
+ @Override
+ public void initialize(Properties properties, ConfigurationFileHolder
configurationFileHolder, ConfigurationChangeNotifier
configurationChangeNotifier) {
+ Path configFile =
ofNullable(properties.getProperty(CONFIG_FILE_PATH_KEY))
+ .filter(not(String::isBlank))
+ .map(Path::of)
+ .map(Path::toAbsolutePath)
+ .orElseThrow(() -> new IllegalArgumentException("Property, " +
CONFIG_FILE_PATH_KEY + ", for the path of the config file must be specified"));
try {
- final WatchService fsWatcher =
FileSystems.getDefault().newWatchService();
- final Path watchDirectory = filePath.getParent();
- watchDirectory.register(fsWatcher, ENTRY_MODIFY);
+ this.configurationChangeNotifier = configurationChangeNotifier;
+ this.configFilePath = configFile;
+ this.pollingSeconds =
ofNullable(properties.getProperty(POLLING_PERIOD_INTERVAL_KEY,
Long.toString(DEFAULT_POLLING_PERIOD_INTERVAL)))
+ .map(Long::parseLong)
+ .filter(duration -> duration > 0)
+ .map(duration -> SECONDS.convert(duration,
DEFAULT_POLLING_PERIOD_UNIT))
+ .orElseThrow(() -> new IllegalArgumentException("Cannot
specify a polling period with duration <=0"));
+ this.watchService = initializeWatcher(configFile);
+ this.differentiator =
ofNullable(properties.getProperty(DIFFERENTIATOR_KEY))
+ .filter(not(String::isBlank))
+ .map(differentiator ->
ofNullable(DIFFERENTIATOR_CONSTRUCTOR_MAP.get(differentiator))
+ .map(Supplier::get)
+
.orElseThrow(unableToFindDifferentiatorExceptionSupplier(differentiator)))
+
.orElseGet(WholeConfigDifferentiator::getByteBufferDifferentiator);
+ this.differentiator.initialize(configurationFileHolder);
+ } catch (Exception e) {
+ throw new IllegalStateException("Could not successfully initialize
file change notifier", e);
+ }
- return fsWatcher;
- } catch (IOException ioe) {
- throw new IllegalStateException("Unable to initialize a file
system watcher for the path " + filePath, ioe);
+ if
(Path.of(properties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey())).toAbsolutePath().equals(configFile))
{
+ throw new IllegalStateException("File ingestor config file (" +
CONFIG_FILE_PATH_KEY
+ + ") must point to a different file than MiNiFi flow config
file (" + MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey() + ")");
}
}
- protected boolean targetChanged() {
- boolean targetChanged;
+ @Override
+ public void start() {
+ executorService = Executors.newScheduledThreadPool(1, runnable -> {
+ Thread notifierThread =
Executors.defaultThreadFactory().newThread(runnable);
+ notifierThread.setName("File Change Notifier Thread");
+ notifierThread.setDaemon(true);
+ return notifierThread;
+ });
+ executorService.scheduleWithFixedDelay(this, 0, pollingSeconds,
DEFAULT_POLLING_PERIOD_UNIT);
+ }
+
+ @Override
+ public void run() {
+ logger.debug("Checking for a change in {}", configFilePath);
+ if (targetFileChanged()) {
+ logger.debug("Target file changed, checking if it's different than
current flow");
+ try (FileInputStream flowCandidateInputStream = new
FileInputStream(configFilePath.toFile())) {
+ ByteBuffer newFlowConfig =
wrap(toByteArray(flowCandidateInputStream));
+ if (differentiator.isNew(newFlowConfig)) {
+ logger.debug("Current flow and new flow is different,
notifying listener");
+ configurationChangeNotifier.notifyListeners(newFlowConfig);
+ logger.debug("Listeners have been notified");
+ }
+ } catch (Exception e) {
+ logger.error("Could not successfully notify listeners.", e);
+ }
+ } else {
+ logger.debug("No change detected in {}", configFilePath);
+ }
+ }
- Optional<WatchKey> watchKey = Optional.ofNullable(watchService.poll());
+ @Override
+ public void close() {
+ if (executorService != null) {
+ executorService.shutdownNow();
+ }
+ }
- targetChanged = watchKey
+ boolean targetFileChanged() {
+ logger.debug("Attempting to acquire watch key");
Review Comment:
Changed debug level to trace
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java:
##########
@@ -121,10 +122,18 @@ private void start() throws IOException,
InterruptedException {
Properties bootstrapProperties =
bootstrapFileProvider.getBootstrapProperties();
String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
- initConfigFiles(bootstrapProperties, confDir);
- Process process = startMiNiFi();
+ DEFAULT_LOGGER.debug("Generating minifi.properties from
bootstrap.conf");
Review Comment:
Done
##########
minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java:
##########
@@ -90,261 +89,233 @@ public class PullHttpChangeIngestor extends
AbstractPullChangeIngestor {
public static final String DIFFERENTIATOR_KEY = PULL_HTTP_BASE_KEY +
".differentiator";
public static final String USE_ETAG_KEY = PULL_HTTP_BASE_KEY + ".use.etag";
public static final String OVERRIDE_SECURITY = PULL_HTTP_BASE_KEY +
".override.security";
+ public static final String HTTP_HEADERS = PULL_HTTP_BASE_KEY + ".headers";
+
+ private static final Logger logger =
LoggerFactory.getLogger(PullHttpChangeIngestor.class);
+
+ private static final Map<String, Supplier<Differentiator<ByteBuffer>>>
DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(
+ WHOLE_CONFIG_KEY,
WholeConfigDifferentiator::getByteBufferDifferentiator
+ );
+ private static final int NOT_MODIFIED_STATUS_CODE = 304;
+ private static final String DEFAULT_CONNECT_TIMEOUT_MS = "5000";
+ private static final String DEFAULT_READ_TIMEOUT_MS = "15000";
+ private static final String DOUBLE_QUOTES = "\"";
+ private static final String ETAG_HEADER = "ETag";
+ private static final String PROXY_AUTHORIZATION_HEADER =
"Proxy-Authorization";
+ private static final String DEFAULT_PATH = "/";
+ private static final int BAD_REQUEST_STATUS_CODE = 400;
+ private static final String IF_NONE_MATCH_HEADER_KEY = "If-None-Match";
+ private static final String HTTP_HEADERS_SEPARATOR = ",";
+ private static final String HTTP_HEADER_KEY_VALUE_SEPARATOR = ":";
private final AtomicReference<OkHttpClient> httpClientReference = new
AtomicReference<>();
private final AtomicReference<Integer> portReference = new
AtomicReference<>();
private final AtomicReference<String> hostReference = new
AtomicReference<>();
private final AtomicReference<String> pathReference = new
AtomicReference<>();
private final AtomicReference<String> queryReference = new
AtomicReference<>();
+ private final AtomicReference<Map<String, String>> httpHeadersReference =
new AtomicReference<>();
+
private volatile Differentiator<ByteBuffer> differentiator;
private volatile String connectionScheme;
private volatile String lastEtag = "";
private volatile boolean useEtag = false;
- public PullHttpChangeIngestor() {
- logger = LoggerFactory.getLogger(PullHttpChangeIngestor.class);
- }
-
@Override
public void initialize(Properties properties, ConfigurationFileHolder
configurationFileHolder, ConfigurationChangeNotifier
configurationChangeNotifier) {
super.initialize(properties, configurationFileHolder,
configurationChangeNotifier);
-
pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY,
DEFAULT_POLLING_PERIOD)));
+
pollingPeriodMS.set(Integer.parseInt(properties.getProperty(PULL_HTTP_POLLING_PERIOD_KEY,
DEFAULT_POLLING_PERIOD_MILLISECONDS)));
if (pollingPeriodMS.get() < 1) {
- throw new IllegalArgumentException("Property, " +
PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a
positive integer.");
- }
-
- final String host = properties.getProperty(HOST_KEY);
- if (host == null || host.isEmpty()) {
- throw new IllegalArgumentException("Property, " + HOST_KEY + ",
for the hostname to pull configurations from must be specified.");
- }
-
- final String path = properties.getProperty(PATH_KEY, "/");
- final String query = properties.getProperty(QUERY_KEY, "");
-
- final String portString = (String) properties.get(PORT_KEY);
- final Integer port;
- if (portString == null) {
- throw new IllegalArgumentException("Property, " + PORT_KEY + ",
for the hostname to pull configurations from must be specified.");
- } else {
- port = Integer.parseInt(portString);
+ throw new IllegalArgumentException("Property, " +
PULL_HTTP_POLLING_PERIOD_KEY + ", for the polling period ms must be set with a
positive integer");
}
- portReference.set(port);
+ String host = ofNullable(properties.getProperty(HOST_KEY))
+ .filter(StringUtils::isNotBlank)
+ .orElseThrow(() -> new IllegalArgumentException("Property, " +
HOST_KEY + ", for the hostname to pull configurations from must be specified"));
+ String path = properties.getProperty(PATH_KEY, DEFAULT_PATH);
+ String query = properties.getProperty(QUERY_KEY, EMPTY);
+ Map<String, String> httpHeaders =
ofNullable(properties.getProperty(HTTP_HEADERS))
+ .filter(StringUtils::isNotBlank)
+ .map(headers -> headers.split(HTTP_HEADERS_SEPARATOR))
+ .map(Arrays::stream)
+ .orElseGet(Stream::of)
+ .map(String::trim)
+ .map(header -> header.split(HTTP_HEADER_KEY_VALUE_SEPARATOR))
+ .filter(split -> split.length == 2)
+ .collect(toMap(split ->
ofNullable(split[0]).map(String::trim).orElse(EMPTY), split ->
ofNullable(split[1]).map(String::trim).orElse(EMPTY)));
+ logger.debug("Configured HTTP headers: {}", httpHeaders);
+
+ ofNullable(properties.get(PORT_KEY))
+ .map(rawPort -> (String) rawPort)
Review Comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]