This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new fd2de5a151 NIFI-12198 Add API and CLI commands to import reporting task snapshots (#7875)
fd2de5a151 is described below
commit fd2de5a1515458ccc73df3e9a85fed9d6c4c3ce1
Author: Bryan Bende <[email protected]>
AuthorDate: Fri Oct 20 10:45:48 2023 -0400
NIFI-12198 Add API and CLI commands to import reporting task snapshots (#7875)
* NIFI-12198 Add API and CLI commands to import reporting task snapshots
---
.../VersionedReportingTaskImportRequestEntity.java | 47 +++++++
...VersionedReportingTaskImportResponseEntity.java | 47 +++++++
.../java/org/apache/nifi/util/BundleUtils.java | 35 +++--
.../reporting/ReportingTaskProvider.java | 9 +-
.../org/apache/nifi/controller/FlowController.java | 9 +-
.../serialization/FlowSynchronizationUtils.java | 111 ++++++++++++++++
.../StandardVersionedReportingTaskImporter.java | 145 +++++++++++++++++++++
.../serialization/VersionedFlowSynchronizer.java | 84 ++----------
.../VersionedReportingTaskImportResult.java | 44 +++++++
.../VersionedReportingTaskImporter.java | 34 +++++
.../nifi/registry/flow/FlowRegistryUtils.java | 27 +++-
.../org/apache/nifi/web/NiFiServiceFacade.java | 25 ++++
.../apache/nifi/web/StandardNiFiServiceFacade.java | 72 ++++++++++
.../apache/nifi/web/api/ControllerResource.java | 120 +++++++++++++----
.../nifi/web/controller/ControllerFacade.java | 6 +
.../web/dao/impl/StandardReportingTaskDAO.java | 24 ++--
.../nifi/web/StandardNiFiServiceFacadeTest.java | 43 ++++++
.../cli/impl/client/nifi/ControllerClient.java | 5 +
.../client/nifi/impl/JerseyControllerClient.java | 19 +++
.../cli/impl/command/nifi/NiFiCommandGroup.java | 2 +
.../command/nifi/flow/ImportReportingTasks.java | 69 ++++++++++
.../result/nifi/ImportReportingTasksResult.java | 90 +++++++++++++
22 files changed, 928 insertions(+), 139 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedReportingTaskImportRequestEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedReportingTaskImportRequestEntity.java
new file mode 100644
index 0000000000..0b8e1dc605
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedReportingTaskImportRequestEntity.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import io.swagger.annotations.ApiModelProperty;
+import org.apache.nifi.flow.VersionedReportingTaskSnapshot;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "versionedReportingTaskImportRequestEntity")
+public class VersionedReportingTaskImportRequestEntity extends Entity {
+
+ private VersionedReportingTaskSnapshot reportingTaskSnapshot;
+ private Boolean disconnectedNodeAcknowledged;
+
+ @ApiModelProperty("The snapshot to import")
+ public VersionedReportingTaskSnapshot getReportingTaskSnapshot() {
+ return reportingTaskSnapshot;
+ }
+
+ public void setReportingTaskSnapshot(VersionedReportingTaskSnapshot reportingTaskSnapshot) {
+ this.reportingTaskSnapshot = reportingTaskSnapshot;
+ }
+
+ @ApiModelProperty("The disconnected node acknowledged flag")
+ public Boolean getDisconnectedNodeAcknowledged() {
+ return disconnectedNodeAcknowledged;
+ }
+
+ public void setDisconnectedNodeAcknowledged(Boolean disconnectedNodeAcknowledged) {
+ this.disconnectedNodeAcknowledged = disconnectedNodeAcknowledged;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedReportingTaskImportResponseEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedReportingTaskImportResponseEntity.java
new file mode 100644
index 0000000000..bf672a60c4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedReportingTaskImportResponseEntity.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Set;
+
+@XmlRootElement(name = "versionedReportingTaskImportResponseEntity")
+public class VersionedReportingTaskImportResponseEntity extends Entity {
+
+ private Set<ReportingTaskEntity> reportingTasks;
+ private Set<ControllerServiceEntity> controllerServices;
+
+ @ApiModelProperty("The reporting tasks created by the import")
+ public Set<ReportingTaskEntity> getReportingTasks() {
+ return reportingTasks;
+ }
+
+ public void setReportingTasks(Set<ReportingTaskEntity> reportingTasks) {
+ this.reportingTasks = reportingTasks;
+ }
+
+ @ApiModelProperty("The controller services created by the import")
+ public Set<ControllerServiceEntity> getControllerServices() {
+ return controllerServices;
+ }
+
+ public void setControllerServices(Set<ControllerServiceEntity> controllerServices) {
+ this.controllerServices = controllerServices;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java
index d7ad003f62..80708b2a64 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java
@@ -18,7 +18,9 @@ package org.apache.nifi.util;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.flow.VersionedConfigurableExtension;
import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTaskSnapshot;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarClassLoadersHolder;
import org.apache.nifi.nar.PythonBundle;
@@ -216,23 +218,11 @@ public final class BundleUtils {
*/
public static void discoverCompatibleBundles(final ExtensionManager extensionManager, final VersionedProcessGroup versionedGroup) {
if (versionedGroup.getProcessors() != null) {
- versionedGroup.getProcessors().forEach(processor -> {
- final BundleDTO dto = createBundleDto(processor.getBundle());
- final BundleCoordinate coordinate = BundleUtils.getOptionalCompatibleBundle(extensionManager, processor.getType(), dto).orElse(
- new BundleCoordinate(dto.getGroup(), dto.getArtifact(), dto.getVersion()));
- processor.setBundle(createBundle(coordinate));
- });
+ versionedGroup.getProcessors().forEach(processor -> discoverCompatibleBundle(extensionManager, processor));
}
if (versionedGroup.getControllerServices() != null) {
- versionedGroup.getControllerServices().forEach(controllerService -> {
- final BundleDTO dto = createBundleDto(controllerService.getBundle());
-
- final BundleCoordinate coordinate = BundleUtils.getOptionalCompatibleBundle(extensionManager, controllerService.getType(), createBundleDto(controllerService.getBundle())).orElse(
- new BundleCoordinate(dto.getGroup(), dto.getArtifact(), dto.getVersion()));
-
- controllerService.setBundle(createBundle(coordinate));
- });
+ versionedGroup.getControllerServices().forEach(controllerService -> discoverCompatibleBundle(extensionManager, controllerService));
}
if (versionedGroup.getProcessGroups() != null) {
@@ -240,6 +230,23 @@ public final class BundleUtils {
}
}
+ public static void discoverCompatibleBundles(final ExtensionManager extensionManager, final VersionedReportingTaskSnapshot reportingTaskSnapshot) {
+ if (reportingTaskSnapshot.getReportingTasks() != null) {
+ reportingTaskSnapshot.getReportingTasks().forEach(reportingTask -> discoverCompatibleBundle(extensionManager, reportingTask));
+ }
+
+ if (reportingTaskSnapshot.getControllerServices() != null) {
+ reportingTaskSnapshot.getControllerServices().forEach(controllerService -> discoverCompatibleBundle(extensionManager, controllerService));
+ }
+ }
+
+ public static void discoverCompatibleBundle(final ExtensionManager extensionManager, final VersionedConfigurableExtension extension) {
+ final BundleDTO dto = createBundleDto(extension.getBundle());
+ final BundleCoordinate coordinate = BundleUtils.getOptionalCompatibleBundle(extensionManager, extension.getType(), dto).orElse(
+ new BundleCoordinate(dto.getGroup(), dto.getArtifact(), dto.getVersion()));
+ extension.setBundle(createBundle(coordinate));
+ }
+
public static BundleCoordinate discoverCompatibleBundle(final ExtensionManager extensionManager, final String type, final org.apache.nifi.flow.Bundle bundle) {
return getCompatibleBundle(extensionManager, type, createBundleDto(bundle));
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
index 4e310b7535..ea035e053c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
@@ -16,12 +16,12 @@
*/
package org.apache.nifi.controller.reporting;
-import java.util.Set;
-
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.nar.ExtensionManager;
+import java.util.Set;
+
/**
* A ReportingTaskProvider is responsible for providing management of, and
* access to, Reporting Tasks
@@ -41,11 +41,8 @@ public interface ReportingTaskProvider {
* being restored after a restart of the software
*
* @return the ReportingTaskNode that is used to manage the reporting task
- *
- * @throws ReportingTaskInstantiationException if unable to create the
- * Reporting Task
*/
- ReportingTaskNode createReportingTask(String type, String id, BundleCoordinate bundleCoordinate, boolean firstTimeAdded) throws ReportingTaskInstantiationException;
+ ReportingTaskNode createReportingTask(String type, String id, BundleCoordinate bundleCoordinate, boolean firstTimeAdded);
/**
* @param identifier of node
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 7bb4eab8d5..a7ef827fa7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -18,8 +18,6 @@ package org.apache.nifi.controller;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
-import org.apache.nifi.flowanalysis.StandardFlowAnalyzer;
-import org.apache.nifi.flowanalysis.TriggerFlowAnalysisTask;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.authorization.Authorizer;
@@ -81,7 +79,6 @@ import org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceSe
import org.apache.nifi.controller.queue.clustered.server.LoadBalanceAuthorizer;
import org.apache.nifi.controller.queue.clustered.server.LoadBalanceProtocol;
import org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol;
-import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
@@ -148,6 +145,8 @@ import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flowanalysis.FlowAnalysisRule;
+import org.apache.nifi.flowanalysis.StandardFlowAnalyzer;
+import org.apache.nifi.flowanalysis.TriggerFlowAnalysisTask;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.BundleUpdateStrategy;
@@ -180,9 +179,9 @@ import org.apache.nifi.python.DisabledPythonBridge;
import org.apache.nifi.python.PythonBridge;
import org.apache.nifi.python.PythonBridgeInitializationContext;
import org.apache.nifi.python.PythonProcessConfig;
+import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
-import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RemoteResourceManager;
@@ -2296,7 +2295,7 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
}
@Override
- public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
+ public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) {
return flowManager.createReportingTask(type, id, bundleCoordinate, firstTimeAdded);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizationUtils.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizationUtils.java
new file mode 100644
index 0000000000..a090788818
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowSynchronizationUtils.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.serialization;
+
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.encrypt.EncryptionException;
+import org.apache.nifi.encrypt.PropertyEncryptor;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.VersionedConfigurableExtension;
+import org.apache.nifi.flow.VersionedPropertyDescriptor;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.util.BundleUtils;
+import org.apache.nifi.web.api.dto.BundleDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class FlowSynchronizationUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger(FlowSynchronizationUtils.class);
+
+ private FlowSynchronizationUtils() {
+ }
+
+ static BundleCoordinate createBundleCoordinate(final ExtensionManager extensionManager, final Bundle bundle, final String componentType) {
+ BundleCoordinate coordinate;
+ try {
+ final BundleDTO bundleDto = new BundleDTO(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
+ coordinate = BundleUtils.getCompatibleBundle(extensionManager, componentType, bundleDto);
+ } catch (final IllegalStateException e) {
+ coordinate = new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
+ }
+
+ return coordinate;
+ }
+
+ static Set<String> getSensitiveDynamicPropertyNames(final ComponentNode componentNode, final VersionedConfigurableExtension extension) {
+ final Set<String> versionedSensitivePropertyNames = new LinkedHashSet<>();
+
+ // Get Sensitive Property Names based on encrypted values including both supported and dynamic properties
+ extension.getProperties()
+ .entrySet()
+ .stream()
+ .filter(entry -> isValueSensitive(entry.getValue()))
+ .map(Map.Entry::getKey)
+ .forEach(versionedSensitivePropertyNames::add);
+
+ // Get Sensitive Property Names based on supported and dynamic property descriptors
+ extension.getPropertyDescriptors()
+ .values()
+ .stream()
+ .filter(VersionedPropertyDescriptor::isSensitive)
+ .map(VersionedPropertyDescriptor::getName)
+ .forEach(versionedSensitivePropertyNames::add);
+
+ // Filter combined Sensitive Property Names based on Component Property Descriptor status
+ return versionedSensitivePropertyNames.stream()
+ .map(componentNode::getPropertyDescriptor)
+ .filter(PropertyDescriptor::isDynamic)
+ .map(PropertyDescriptor::getName)
+ .collect(Collectors.toSet());
+ }
+
+ static boolean isValueSensitive(final String value) {
+ return value != null && value.startsWith(FlowSerializer.ENC_PREFIX) && value.endsWith(FlowSerializer.ENC_SUFFIX);
+ }
+
+ static Map<String, String> decryptProperties(final Map<String, String> encrypted, final PropertyEncryptor encryptor) {
+ final Map<String, String> decrypted = new HashMap<>(encrypted.size());
+ encrypted.forEach((key, value) -> decrypted.put(key, decrypt(value, encryptor)));
+ return decrypted;
+ }
+
+ static String decrypt(final String value, final PropertyEncryptor encryptor) {
+ if (isValueSensitive(value)) {
+ try {
+ return encryptor.decrypt(value.substring(FlowSerializer.ENC_PREFIX.length(), value.length() - FlowSerializer.ENC_SUFFIX.length()));
+ } catch (EncryptionException e) {
+ final String moreDescriptiveMessage = "There was a problem decrypting a sensitive flow configuration value. " +
+ "Check that the nifi.sensitive.props.key value in nifi.properties matches the value used to encrypt the flow.json.gz file";
+ logger.error(moreDescriptiveMessage, e);
+ throw new EncryptionException(moreDescriptiveMessage, e);
+ }
+ } else {
+ return value;
+ }
+ }
+
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardVersionedReportingTaskImporter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardVersionedReportingTaskImporter.java
new file mode 100644
index 0000000000..d48d77d856
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardVersionedReportingTaskImporter.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.serialization;
+
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.encrypt.PropertyEncryptor;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.flow.VersionedReportingTaskSnapshot;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.nifi.controller.serialization.FlowSynchronizationUtils.createBundleCoordinate;
+import static org.apache.nifi.controller.serialization.FlowSynchronizationUtils.decryptProperties;
+import static org.apache.nifi.controller.serialization.FlowSynchronizationUtils.getSensitiveDynamicPropertyNames;
+
+public class StandardVersionedReportingTaskImporter implements VersionedReportingTaskImporter {
+
+ final FlowController flowController;
+
+ public StandardVersionedReportingTaskImporter(final FlowController flowController) {
+ this.flowController = flowController;
+ }
+
+ @Override
+ public VersionedReportingTaskImportResult importSnapshot(final VersionedReportingTaskSnapshot reportingTaskSnapshot) {
+ final List<VersionedControllerService> controllerServices = Optional.ofNullable(reportingTaskSnapshot.getControllerServices()).orElse(Collections.emptyList());
+ final Set<ControllerServiceNode> controllerServiceNodes = importControllerServices(controllerServices);
+
+ final List<VersionedReportingTask> reportingTasks = Optional.ofNullable(reportingTaskSnapshot.getReportingTasks()).orElse(Collections.emptyList());
+ final Set<ReportingTaskNode> reportingTaskNodes = importReportingTasks(reportingTasks);
+
+ return new VersionedReportingTaskImportResult(reportingTaskNodes, controllerServiceNodes);
+ }
+
+ private Set<ReportingTaskNode> importReportingTasks(final List<VersionedReportingTask> reportingTasks) {
+ final Set<ReportingTaskNode> taskNodes = new HashSet<>();
+ for (final VersionedReportingTask reportingTask : reportingTasks) {
+ final ReportingTaskNode taskNode = addReportingTask(reportingTask);
+ taskNodes.add(taskNode);
+ }
+ return taskNodes;
+ }
+
+ private ReportingTaskNode addReportingTask(final VersionedReportingTask reportingTask) {
+ final ExtensionManager extensionManager = flowController.getExtensionManager();
+ final BundleCoordinate coordinate = createBundleCoordinate(extensionManager, reportingTask.getBundle(), reportingTask.getType());
+
+ final ReportingTaskNode taskNode = flowController.createReportingTask(reportingTask.getType(), reportingTask.getInstanceIdentifier(), coordinate, false);
+ taskNode.setName(reportingTask.getName());
+ taskNode.setComments(reportingTask.getComments());
+ taskNode.setSchedulingPeriod(reportingTask.getSchedulingPeriod());
+ taskNode.setSchedulingStrategy(SchedulingStrategy.valueOf(reportingTask.getSchedulingStrategy()));
+ taskNode.setAnnotationData(reportingTask.getAnnotationData());
+
+ final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(taskNode, reportingTask);
+ final Map<String, String> decryptedProperties = decryptProperties(reportingTask.getProperties(), flowController.getEncryptor());
+ taskNode.setProperties(decryptedProperties, false, sensitiveDynamicPropertyNames);
+ return taskNode;
+ }
+
+ private Set<ControllerServiceNode> importControllerServices(final List<VersionedControllerService> controllerServices) {
+ final Set<ControllerServiceNode> controllerServicesAdded = new HashSet<>();
+ for (final VersionedControllerService versionedControllerService : controllerServices) {
+ final ControllerServiceNode serviceNode = flowController.getFlowManager().getRootControllerService(versionedControllerService.getInstanceIdentifier());
+ if (serviceNode == null) {
+ final ControllerServiceNode added = addRootControllerService(versionedControllerService);
+ controllerServicesAdded.add(added);
+ }
+ }
+
+ for (final VersionedControllerService versionedControllerService : controllerServices) {
+ final ControllerServiceNode serviceNode = flowController.getFlowManager().getRootControllerService(versionedControllerService.getInstanceIdentifier());
+ if (controllerServicesAdded.contains(serviceNode)) {
+ updateRootControllerService(serviceNode, versionedControllerService, flowController.getEncryptor());
+ }
+ }
+
+ return controllerServicesAdded;
+ }
+
+ private ControllerServiceNode addRootControllerService(final VersionedControllerService controllerService) {
+ final FlowManager flowManager = flowController.getFlowManager();
+ final ExtensionManager extensionManager = flowController.getExtensionManager();
+
+ final BundleCoordinate bundleCoordinate = createBundleCoordinate(extensionManager, controllerService.getBundle(), controllerService.getType());
+ final ControllerServiceNode serviceNode = flowManager.createControllerService(controllerService.getType(), controllerService.getInstanceIdentifier(),
+ bundleCoordinate, Collections.emptySet(), true, true, null);
+ serviceNode.setVersionedComponentId(controllerService.getIdentifier());
+ flowManager.addRootControllerService(serviceNode);
+
+ return serviceNode;
+ }
+
+ private void updateRootControllerService(final ControllerServiceNode serviceNode, final VersionedControllerService controllerService,
+ final PropertyEncryptor encryptor) {
+ serviceNode.pauseValidationTrigger();
+ try {
+ serviceNode.setName(controllerService.getName());
+ serviceNode.setAnnotationData(controllerService.getAnnotationData());
+ serviceNode.setComments(controllerService.getComments());
+
+ if (controllerService.getBulletinLevel() != null) {
+ serviceNode.setBulletinLevel(LogLevel.valueOf(controllerService.getBulletinLevel()));
+ } else {
+ // this situation exists for backward compatibility with nifi 1.16 and earlier where controller services do not have bulletinLevels set in flow.xml/flow.json
+ // and bulletinLevels are at the WARN level by default
+ serviceNode.setBulletinLevel(LogLevel.WARN);
+ }
+
+ final Set<String> sensitiveDynamicPropertyNames = getSensitiveDynamicPropertyNames(serviceNode, controllerService);
+ final Map<String, String> decryptedProperties = decryptProperties(controllerService.getProperties(), encryptor);
+ serviceNode.setProperties(decryptedProperties, false, sensitiveDynamicPropertyNames);
+ } finally {
+ serviceNode.resumeValidationTrigger();
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 5db7190bf1..18341d5317 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -24,7 +24,6 @@ import org.apache.nifi.authorization.ManagedAuthorizer;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.AbstractComponentNode;
@@ -51,13 +50,11 @@ import org.apache.nifi.controller.inheritance.MissingComponentsCheck;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.StandardConfigurationContext;
-import org.apache.nifi.encrypt.EncryptionException;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.ExecutionEngine;
import org.apache.nifi.flow.ScheduledState;
import org.apache.nifi.flow.VersionedComponent;
-import org.apache.nifi.flow.VersionedConfigurableExtension;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedFlowAnalysisRule;
@@ -67,7 +64,6 @@ import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameterProvider;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
-import org.apache.nifi.flow.VersionedPropertyDescriptor;
import org.apache.nifi.flow.VersionedReportingTask;
import org.apache.nifi.groups.AbstractComponentScheduler;
import org.apache.nifi.groups.BundleUpdateStrategy;
@@ -117,7 +113,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -128,6 +123,11 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
+import static
org.apache.nifi.controller.serialization.FlowSynchronizationUtils.createBundleCoordinate;
+import static
org.apache.nifi.controller.serialization.FlowSynchronizationUtils.decrypt;
+import static
org.apache.nifi.controller.serialization.FlowSynchronizationUtils.decryptProperties;
+import static
org.apache.nifi.controller.serialization.FlowSynchronizationUtils.getSensitiveDynamicPropertyNames;
+
public class VersionedFlowSynchronizer implements FlowSynchronizer {
private static final Logger logger =
LoggerFactory.getLogger(VersionedFlowSynchronizer.class);
/**
@@ -575,7 +575,7 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
return;
}
- final BundleCoordinate coordinate =
createBundleCoordinate(versionedFlowRegistryClient.getBundle(),
versionedFlowRegistryClient.getType());
+ final BundleCoordinate coordinate =
createBundleCoordinate(extensionManager,
versionedFlowRegistryClient.getBundle(), versionedFlowRegistryClient.getType());
final FlowRegistryClientNode flowRegistryClient =
flowController.getFlowManager().createFlowRegistryClient(
versionedFlowRegistryClient.getType(),
versionedFlowRegistryClient.getIdentifier(), coordinate, Collections.emptySet()
, false, true, null);
@@ -638,7 +638,7 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
}
private void addReportingTask(final FlowController controller, final
VersionedReportingTask reportingTask) throws
ReportingTaskInstantiationException {
- final BundleCoordinate coordinate =
createBundleCoordinate(reportingTask.getBundle(), reportingTask.getType());
+ final BundleCoordinate coordinate =
createBundleCoordinate(extensionManager, reportingTask.getBundle(),
reportingTask.getType());
final ReportingTaskNode taskNode =
controller.createReportingTask(reportingTask.getType(),
reportingTask.getInstanceIdentifier(), coordinate, false);
updateReportingTask(taskNode, reportingTask, controller);
@@ -706,7 +706,7 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
}
private void addFlowAnalysisRule(final FlowController controller, final
VersionedFlowAnalysisRule flowAnalysisRule) throws
FlowAnalysisRuleInstantiationException {
- final BundleCoordinate coordinate =
createBundleCoordinate(flowAnalysisRule.getBundle(),
flowAnalysisRule.getType());
+ final BundleCoordinate coordinate =
createBundleCoordinate(extensionManager, flowAnalysisRule.getBundle(),
flowAnalysisRule.getType());
final FlowAnalysisRuleNode ruleNode =
controller.createFlowAnalysisRule(flowAnalysisRule.getType(),
flowAnalysisRule.getInstanceIdentifier(), coordinate, false);
updateFlowAnalysisRule(ruleNode, flowAnalysisRule, controller);
@@ -752,7 +752,7 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
}
private void addParameterProvider(final FlowController controller, final
VersionedParameterProvider parameterProvider, final PropertyEncryptor
encryptor) {
- final BundleCoordinate coordinate =
createBundleCoordinate(parameterProvider.getBundle(),
parameterProvider.getType());
+ final BundleCoordinate coordinate =
createBundleCoordinate(extensionManager, parameterProvider.getBundle(),
parameterProvider.getType());
final ParameterProviderNode parameterProviderNode =
controller.getFlowManager()
.createParameterProvider(parameterProvider.getType(),
parameterProvider.getInstanceIdentifier(), coordinate, false);
@@ -987,7 +987,7 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
}
private ControllerServiceNode addRootControllerService(final
FlowController controller, final VersionedControllerService
versionedControllerService) {
- final BundleCoordinate bundleCoordinate =
createBundleCoordinate(versionedControllerService.getBundle(),
versionedControllerService.getType());
+ final BundleCoordinate bundleCoordinate =
createBundleCoordinate(extensionManager,
versionedControllerService.getBundle(), versionedControllerService.getType());
final ControllerServiceNode serviceNode =
controller.getFlowManager().createControllerService(versionedControllerService.getType(),
versionedControllerService.getInstanceIdentifier(),
bundleCoordinate,Collections.emptySet(), true, true, null);
@@ -1019,70 +1019,6 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
}
}
- private Set<String> getSensitiveDynamicPropertyNames(final ComponentNode
componentNode, final VersionedConfigurableExtension extension) {
- final Set<String> versionedSensitivePropertyNames = new
LinkedHashSet<>();
-
- // Get Sensitive Property Names based on encrypted values including
both supported and dynamic properties
- extension.getProperties()
- .entrySet()
- .stream()
- .filter(entry -> isValueSensitive(entry.getValue()))
- .map(Map.Entry::getKey)
- .forEach(versionedSensitivePropertyNames::add);
-
- // Get Sensitive Property Names based on supported and dynamic
property descriptors
- extension.getPropertyDescriptors()
- .values()
- .stream()
- .filter(VersionedPropertyDescriptor::isSensitive)
- .map(VersionedPropertyDescriptor::getName)
- .forEach(versionedSensitivePropertyNames::add);
-
- // Filter combined Sensitive Property Names based on Component
Property Descriptor status
- return versionedSensitivePropertyNames.stream()
- .map(componentNode::getPropertyDescriptor)
- .filter(PropertyDescriptor::isDynamic)
- .map(PropertyDescriptor::getName)
- .collect(Collectors.toSet());
- }
-
- private Map<String, String> decryptProperties(final Map<String, String>
encrypted, final PropertyEncryptor encryptor) {
- final Map<String, String> decrypted = new HashMap<>(encrypted.size());
- encrypted.forEach((key, value) -> decrypted.put(key, decrypt(value,
encryptor)));
- return decrypted;
- }
-
- private String decrypt(final String value, final PropertyEncryptor
encryptor) {
- if (isValueSensitive(value)) {
- try {
- return
encryptor.decrypt(value.substring(FlowSerializer.ENC_PREFIX.length(),
value.length() - FlowSerializer.ENC_SUFFIX.length()));
- } catch (EncryptionException e) {
- final String moreDescriptiveMessage = "There was a problem
decrypting a sensitive flow configuration value. " +
- "Check that the nifi.sensitive.props.key value in
nifi.properties matches the value used to encrypt the flow.json.gz file";
- logger.error(moreDescriptiveMessage, e);
- throw new EncryptionException(moreDescriptiveMessage, e);
- }
- } else {
- return value;
- }
- }
-
- private boolean isValueSensitive(final String value) {
- return value != null && value.startsWith(FlowSerializer.ENC_PREFIX) &&
value.endsWith(FlowSerializer.ENC_SUFFIX);
- }
-
- private BundleCoordinate createBundleCoordinate(final Bundle bundle, final
String componentType) {
- BundleCoordinate coordinate;
- try {
- final BundleDTO bundleDto = new BundleDTO(bundle.getGroup(),
bundle.getArtifact(), bundle.getVersion());
- coordinate = BundleUtils.getCompatibleBundle(extensionManager,
componentType, bundleDto);
- } catch (final IllegalStateException e) {
- coordinate = new BundleCoordinate(bundle.getGroup(),
bundle.getArtifact(), bundle.getVersion());
- }
-
- return coordinate;
- }
-
private void inheritAuthorizations(final DataFlow existingFlow, final
DataFlow proposedFlow, final FlowController controller) {
final Authorizer authorizer = controller.getAuthorizer();
if (!(authorizer instanceof ManagedAuthorizer)) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedReportingTaskImportResult.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedReportingTaskImportResult.java
new file mode 100644
index 0000000000..7c3fa40247
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedReportingTaskImportResult.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.serialization;
+
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class VersionedReportingTaskImportResult {
+
+ private final Set<ReportingTaskNode> reportingTaskNodes;
+ private final Set<ControllerServiceNode> controllerServiceNodes;
+
+ public VersionedReportingTaskImportResult(final Set<ReportingTaskNode>
reportingTaskNodes,
+ final Set<ControllerServiceNode>
controllerServiceNodes) {
+ this.reportingTaskNodes =
Collections.unmodifiableSet(reportingTaskNodes == null ? Collections.emptySet()
: reportingTaskNodes);
+ this.controllerServiceNodes =
Collections.unmodifiableSet(controllerServiceNodes == null ?
Collections.emptySet() : controllerServiceNodes);
+ }
+
+ public Set<ReportingTaskNode> getReportingTaskNodes() {
+ return reportingTaskNodes;
+ }
+
+ public Set<ControllerServiceNode> getControllerServiceNodes() {
+ return controllerServiceNodes;
+ }
+}
+
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedReportingTaskImporter.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedReportingTaskImporter.java
new file mode 100644
index 0000000000..0587d732f0
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedReportingTaskImporter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.serialization;
+
+import org.apache.nifi.flow.VersionedReportingTaskSnapshot;
+
+/**
+ * Encapsulates logic for importing reporting tasks and controller services.
+ */
+public interface VersionedReportingTaskImporter {
+
+ /**
+ * Imports the given snapshot. A new instance of each reporting task and
controller service will be created.
+ *
+ * @param reportingTaskSnapshot the snapshot
+ * @return the result of the import containing any created reporting tasks
and controller services
+ */
+ VersionedReportingTaskImportResult
importSnapshot(VersionedReportingTaskSnapshot reportingTaskSnapshot);
+
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryUtils.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryUtils.java
index 0021f5755e..c64cb84c9c 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryUtils.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryUtils.java
@@ -22,20 +22,33 @@ import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTaskSnapshot;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.BundleDTO;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
public class FlowRegistryUtils {
public static Set<ConfigurableComponent> getRestrictedComponents(final
VersionedProcessGroup group, final NiFiServiceFacade serviceFacade) {
- final Set<ConfigurableComponent> restrictedComponents = new
HashSet<>();
-
final Set<Tuple<String, BundleCoordinate>> componentTypes = new
HashSet<>();
populateComponentTypes(group, componentTypes);
+ return getRestrictedComponents(serviceFacade, componentTypes);
+ }
+
+ public static Set<ConfigurableComponent> getRestrictedComponents(final
VersionedReportingTaskSnapshot reportingTaskSnapshot, final NiFiServiceFacade
serviceFacade) {
+ final Set<Tuple<String, BundleCoordinate>> componentTypes = new
HashSet<>();
+ populateComponentTypes(reportingTaskSnapshot, componentTypes);
+ return getRestrictedComponents(serviceFacade, componentTypes);
+
+ }
+
+ private static Set<ConfigurableComponent>
getRestrictedComponents(NiFiServiceFacade serviceFacade, Set<Tuple<String,
BundleCoordinate>> componentTypes) {
+ final Set<ConfigurableComponent> restrictedComponents = new
HashSet<>();
for (final Tuple<String, BundleCoordinate> tuple : componentTypes) {
final ConfigurableComponent component =
serviceFacade.getTempComponent(tuple.getKey(), tuple.getValue());
@@ -66,6 +79,16 @@ public class FlowRegistryUtils {
}
}
+ private static void populateComponentTypes(final
VersionedReportingTaskSnapshot reportingTaskSnapshot, final Set<Tuple<String,
BundleCoordinate>> componentTypes) {
+
Optional.ofNullable(reportingTaskSnapshot.getReportingTasks()).orElse(Collections.emptyList()).stream()
+ .map(versionedReportingTask -> new
Tuple<>(versionedReportingTask.getType(),
createBundleCoordinate(versionedReportingTask.getBundle())))
+ .forEach(componentTypes::add);
+
+
Optional.ofNullable(reportingTaskSnapshot.getControllerServices()).orElse(Collections.emptyList()).stream()
+ .map(versionedSvc -> new Tuple<>(versionedSvc.getType(),
createBundleCoordinate(versionedSvc.getBundle())))
+ .forEach(componentTypes::add);
+ }
+
public static BundleCoordinate createBundleCoordinate(final Bundle bundle)
{
return new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(),
bundle.getVersion());
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 4904fdef4d..5dfa1a15af 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -137,6 +137,7 @@ import
org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.api.entity.VersionedFlowEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
+import
org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity;
import org.apache.nifi.web.api.request.FlowMetricsRegistry;
import java.util.Collection;
@@ -145,6 +146,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
+import java.util.function.Supplier;
/**
* Defines the NiFiServiceFacade interface.
@@ -333,6 +335,22 @@ public interface NiFiServiceFacade {
*/
VersionedReportingTaskSnapshot getVersionedReportingTaskSnapshot();
+ /**
+ * Generates unique identifiers (obtained from the given idGenerator) for all components in the snapshot so that
multiple imports of the same snapshot
+ * create multiple components.
+ *
+ * @param reportingTaskSnapshot the snapshot
+ */
+ void generateIdentifiersForImport(VersionedReportingTaskSnapshot
reportingTaskSnapshot, Supplier<String> idGenerator);
+
+ /**
+ * Imports the reporting tasks and controller services in the given
snapshot.
+ *
+ * @param reportingTaskSnapshot the snapshot to import
+ * @return the response entity
+ */
+ VersionedReportingTaskImportResponseEntity
importReportingTasks(VersionedReportingTaskSnapshot reportingTaskSnapshot);
+
/**
* Gets the controller level bulletins.
*
@@ -2596,6 +2614,13 @@ public interface NiFiServiceFacade {
*/
void discoverCompatibleBundles(VersionedProcessGroup versionedGroup);
+ /**
+ * Discovers the compatible bundle details for the components in the
specified snapshot and updates the snapshot to reflect the appropriate bundles.
+ *
+ * @param reportingTaskSnapshot the snapshot
+ */
+ void discoverCompatibleBundles(VersionedReportingTaskSnapshot
reportingTaskSnapshot);
+
/**
* For any Controller Service that is found in the given Versioned Process
Group, if that Controller Service is not itself included in the Versioned
Process Groups,
* attempts to find an existing Controller Service that matches the
definition. If any is found, the component within the Versioned Process Group
is updated to point
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 774265236f..6ac784df7f 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -91,6 +91,8 @@ import
org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.claim.ContentDirection;
+import
org.apache.nifi.controller.serialization.VersionedReportingTaskImportResult;
+import org.apache.nifi.controller.serialization.VersionedReportingTaskImporter;
import
org.apache.nifi.controller.serialization.VersionedReportingTaskSnapshotMapper;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -109,6 +111,7 @@ import org.apache.nifi.flow.ExecutionEngine;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ParameterProviderReference;
import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConfigurableExtension;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
@@ -116,6 +119,8 @@ import org.apache.nifi.flow.VersionedExternalFlowMetadata;
import org.apache.nifi.flow.VersionedFlowCoordinates;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedPropertyDescriptor;
+import org.apache.nifi.flow.VersionedReportingTask;
import org.apache.nifi.flow.VersionedReportingTaskSnapshot;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
@@ -332,6 +337,7 @@ import
org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity;
import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
import org.apache.nifi.web.api.entity.VersionedFlowEntity;
import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
+import
org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity;
import org.apache.nifi.web.api.request.FlowMetricsRegistry;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.AccessPolicyDAO;
@@ -3983,6 +3989,11 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
BundleUtils.discoverCompatibleBundles(controllerFacade.getExtensionManager(),
versionedGroup);
}
+ @Override
+ public void discoverCompatibleBundles(final VersionedReportingTaskSnapshot
reportingTaskSnapshot) {
+
BundleUtils.discoverCompatibleBundles(controllerFacade.getExtensionManager(),
reportingTaskSnapshot);
+ }
+
@Override
public void resolveParameterProviders(final RegisteredFlowSnapshot
versionedFlowSnapshot, final NiFiUser user) {
final Map<String, ParameterProviderReference>
parameterProviderReferences = versionedFlowSnapshot.getParameterProviders();
@@ -4200,6 +4211,67 @@ public class StandardNiFiServiceFacade implements
NiFiServiceFacade {
});
}
+ @Override
+ public void generateIdentifiersForImport(final
VersionedReportingTaskSnapshot reportingTaskSnapshot, final Supplier<String>
idGenerator) {
+ final List<VersionedReportingTask> reportingTasks =
Optional.ofNullable(reportingTaskSnapshot.getReportingTasks()).orElse(Collections.emptyList());
+ final List<VersionedControllerService> controllerServices =
Optional.ofNullable(reportingTaskSnapshot.getControllerServices()).orElse(Collections.emptyList());
+
+ // First generate all new ids and instance ids for each component,
maintaining a mapping from old id to new instance id. The instance id
+ // will be used to create the component, so we want to update any CS
references to use the instance id
+ final Map<String, String> oldIdToNewInstanceIdMap = new HashMap<>();
+ controllerServices.forEach(controllerService ->
generateIdentifiersForImport(controllerService, idGenerator,
oldIdToNewInstanceIdMap));
+ reportingTasks.forEach(reportingTask ->
generateIdentifiersForImport(reportingTask, idGenerator,
oldIdToNewInstanceIdMap));
+
+ // Now go back through all components and update any property values
that referenced old CS ids
+ controllerServices.forEach(controllerService ->
updateControllerServiceReferences(controllerService, oldIdToNewInstanceIdMap));
+ reportingTasks.forEach(reportingTask ->
updateControllerServiceReferences(reportingTask, oldIdToNewInstanceIdMap));
+ }
+
+ private void generateIdentifiersForImport(final
VersionedConfigurableExtension extension, final Supplier<String> idGenerator,
+ final Map<String, String>
oldIdToNewIdMap) {
+ final String identifier = idGenerator.get();
+ final String instanceIdentifier = idGenerator.get();
+ oldIdToNewIdMap.put(extension.getIdentifier(), instanceIdentifier);
+ extension.setIdentifier(identifier);
+ extension.setInstanceIdentifier(instanceIdentifier);
+ }
+
+ private void updateControllerServiceReferences(final
VersionedConfigurableExtension extension, final Map<String, String>
oldIdToNewIdMap) {
+ final Map<String, String> propertyValues =
Optional.ofNullable(extension.getProperties()).orElse(Collections.emptyMap());
+ final Map<String, VersionedPropertyDescriptor> propertyDescriptors =
Optional.ofNullable(extension.getPropertyDescriptors()).orElse(Collections.emptyMap());
+
+ propertyDescriptors.forEach((propName, propDescriptor) -> {
+ if (propDescriptor.getIdentifiesControllerService()) {
+ final String oldServiceId = propertyValues.get(propName);
+ if (oldServiceId != null) {
+ final String newServiceId =
oldIdToNewIdMap.get(oldServiceId);
+ propertyValues.put(propName, newServiceId);
+ }
+ }
+ });
+ }
+
+ @Override
+ public VersionedReportingTaskImportResponseEntity
importReportingTasks(final VersionedReportingTaskSnapshot
reportingTaskSnapshot) {
+ final VersionedReportingTaskImporter reportingTaskImporter =
controllerFacade.createReportingTaskImporter();
+ final VersionedReportingTaskImportResult importResult =
reportingTaskImporter.importSnapshot(reportingTaskSnapshot);
+
+ controllerFacade.save();
+
+ final Set<ReportingTaskEntity> reportingTaskEntities =
importResult.getReportingTaskNodes().stream()
+ .map(this::createReportingTaskEntity)
+ .collect(Collectors.toSet());
+
+ final Set<ControllerServiceEntity> controllerServiceEntities =
importResult.getControllerServiceNodes().stream()
+ .map(serviceNode -> createControllerServiceEntity(serviceNode,
false))
+ .collect(Collectors.toSet());
+
+ final VersionedReportingTaskImportResponseEntity importResponseEntity
= new VersionedReportingTaskImportResponseEntity();
+ importResponseEntity.setReportingTasks(reportingTaskEntities);
+ importResponseEntity.setControllerServices(controllerServiceEntities);
+ return importResponseEntity;
+ }
+
@Override
public ControllerBulletinsEntity getControllerBulletins() {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
index bf888660b1..73846c8e9c 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java
@@ -24,30 +24,6 @@ import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SwaggerDefinition;
import io.swagger.annotations.Tag;
-import java.net.URI;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AuthorizeControllerServiceReference;
import org.apache.nifi.authorization.Authorizer;
@@ -56,7 +32,10 @@ import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
+import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.flow.VersionedReportingTaskSnapshot;
+import org.apache.nifi.registry.flow.FlowRegistryUtils;
import org.apache.nifi.web.IllegalClusterResourceRequestException;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.ResourceNotFoundException;
@@ -101,12 +80,39 @@ import
org.apache.nifi.web.api.entity.ParameterProviderEntity;
import org.apache.nifi.web.api.entity.PropertyDescriptorEntity;
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
import org.apache.nifi.web.api.entity.VerifyConfigRequestEntity;
+import
org.apache.nifi.web.api.entity.VersionedReportingTaskImportRequestEntity;
+import
org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.DateTimeParameter;
import org.apache.nifi.web.api.request.LongParameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
/**
* RESTful endpoint for managing a Flow Controller.
*/
@@ -449,6 +455,72 @@ public class ControllerResource extends
ApplicationResource {
);
}
+ /**
+ * Imports a reporting task snapshot.
+ */
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("reporting-tasks/import")
+ @ApiOperation(
+ value = "Imports a reporting task snapshot",
+ response = VersionedReportingTaskImportResponseEntity.class,
+ authorizations = {
+ @Authorization(value = "Write - /controller")
+ }
+ )
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 400, message = "NiFi was unable to
complete the request because it was invalid. The request should not be retried
without modification."),
+ @ApiResponse(code = 401, message = "Client could not be
authenticated."),
+ @ApiResponse(code = 403, message = "Client is not
authorized to make this request."),
+ @ApiResponse(code = 409, message = "The request was valid
but NiFi was not in the appropriate state to process it. Retrying the same
request later may be successful.")
+ }
+ )
+ public Response importReportingTaskSnapshot(
+ @Context final HttpServletRequest httpServletRequest,
+ @ApiParam(
+ value = "The import request containing the reporting task
snapshot to import.",
+ required = true
+ ) final VersionedReportingTaskImportRequestEntity
importRequestEntity) {
+
+ if (importRequestEntity == null ||
importRequestEntity.getReportingTaskSnapshot() == null) {
+ throw new IllegalArgumentException("Reporting task snapshot is
required");
+ }
+
+ final VersionedReportingTaskSnapshot requestSnapshot =
importRequestEntity.getReportingTaskSnapshot();
+ serviceFacade.discoverCompatibleBundles(requestSnapshot);
+ serviceFacade.generateIdentifiersForImport(requestSnapshot, () ->
generateUuid());
+
+ if (isReplicateRequest()) {
+ return replicate(HttpMethod.POST, importRequestEntity);
+ } else if (isDisconnectedFromCluster()) {
+
verifyDisconnectedNodeModification(importRequestEntity.getDisconnectedNodeAcknowledged());
+ }
+
+ return withWriteLock(
+ serviceFacade,
+ importRequestEntity,
+ lookup -> {
+ authorizeController(RequestAction.WRITE);
+
+ final Set<ConfigurableComponent> restrictedComponents =
FlowRegistryUtils.getRestrictedComponents(requestSnapshot, serviceFacade);
+ restrictedComponents.forEach(restrictedComponent -> {
+ final ComponentAuthorizable
restrictedComponentAuthorizable =
lookup.getConfigurableComponent(restrictedComponent);
+ authorizeRestrictions(authorizer,
restrictedComponentAuthorizable);
+ });
+ },
+ () -> {
+ // Nothing to verify
+ },
+ (importRequest) -> {
+ final VersionedReportingTaskSnapshot snapshot =
importRequest.getReportingTaskSnapshot();
+ final VersionedReportingTaskImportResponseEntity
responseEntity = serviceFacade.importReportingTasks(snapshot);
+ return generateOkResponse(responseEntity).build();
+ }
+ );
+ }
+
// -------------------
// flow-analysis-rules
// -------------------
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 1b1dc3e1b8..754b38cb04 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -51,6 +51,8 @@ import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.repository.ContentNotFoundException;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.claim.ContentDirection;
+import
org.apache.nifi.controller.serialization.StandardVersionedReportingTaskImporter;
+import org.apache.nifi.controller.serialization.VersionedReportingTaskImporter;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.ControllerServiceResolver;
@@ -1760,6 +1762,10 @@ public class ControllerFacade implements Authorizable {
return
flowController.getEventAccess().getFlowFileRepositoryStorageUsage();
}
+ /**
+ * Creates a reporting task importer backed by this facade's flow controller.
+ * A new importer instance is constructed on every call.
+ *
+ * @return a new StandardVersionedReportingTaskImporter wrapping flowController
+ */
+ public VersionedReportingTaskImporter createReportingTaskImporter() {
+ return new StandardVersionedReportingTaskImporter(flowController);
+ }
+
/*
* setters
*/
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
index f3ff093cb6..1de941b52e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardReportingTaskDAO.java
@@ -93,23 +93,19 @@ public class StandardReportingTaskDAO extends ComponentDAO
implements ReportingT
throw new IllegalArgumentException("The reporting task type must
be specified.");
}
- try {
- // create the reporting task
- final ExtensionManager extensionManager =
reportingTaskProvider.getExtensionManager();
- final BundleCoordinate bundleCoordinate =
BundleUtils.getBundle(extensionManager, reportingTaskDTO.getType(),
reportingTaskDTO.getBundle());
- final ReportingTaskNode reportingTask =
reportingTaskProvider.createReportingTask(
- reportingTaskDTO.getType(), reportingTaskDTO.getId(),
bundleCoordinate, true);
+ // create the reporting task
+ final ExtensionManager extensionManager =
reportingTaskProvider.getExtensionManager();
+ final BundleCoordinate bundleCoordinate =
BundleUtils.getBundle(extensionManager, reportingTaskDTO.getType(),
reportingTaskDTO.getBundle());
+ final ReportingTaskNode reportingTask =
reportingTaskProvider.createReportingTask(
+ reportingTaskDTO.getType(), reportingTaskDTO.getId(),
bundleCoordinate, true);
- // ensure we can perform the update
- verifyUpdate(reportingTask, reportingTaskDTO);
+ // ensure we can perform the update
+ verifyUpdate(reportingTask, reportingTaskDTO);
- // perform the update
- configureReportingTask(reportingTask, reportingTaskDTO);
+ // perform the update
+ configureReportingTask(reportingTask, reportingTaskDTO);
- return reportingTask;
- } catch (ReportingTaskInstantiationException rtie) {
- throw new NiFiCoreException(rtie.getMessage(), rtie);
- }
+ return reportingTask;
}
@Override
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
index 07ea707b08..01dd931258 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
@@ -44,6 +44,9 @@ import
org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.ParameterProviderReference;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedPropertyDescriptor;
+import org.apache.nifi.flow.VersionedReportingTask;
+import org.apache.nifi.flow.VersionedReportingTaskSnapshot;
import org.apache.nifi.flowanalysis.EnforcementPolicy;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
@@ -107,6 +110,7 @@ import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -942,4 +946,43 @@ public class StandardNiFiServiceFacadeTest {
return processGroup;
}
+
+    /**
+     * Builds a snapshot holding one controller service and one reporting task whose property
+     * references that service, runs identifier generation for import, and checks that both
+     * components received new identifiers and that the task's property now holds the
+     * service's instance identifier.
+     */
+    @Test
+    public void testGenerateIdsForImportingReportingTaskSnapshot() {
+        final String initialServiceId = "s1";
+        final String initialTaskId = "r1";
+
+        // Controller service that the reporting task will point at
+        final VersionedControllerService controllerService = new VersionedControllerService();
+        controllerService.setIdentifier(initialServiceId);
+
+        // Descriptor flagging the property as a controller service reference
+        final VersionedPropertyDescriptor referenceDescriptor = new VersionedPropertyDescriptor();
+        referenceDescriptor.setName("My Service");
+        referenceDescriptor.setIdentifiesControllerService(true);
+
+        final Map<String, VersionedPropertyDescriptor> descriptorsByName = new HashMap<>();
+        descriptorsByName.put(referenceDescriptor.getName(), referenceDescriptor);
+
+        final Map<String, String> valuesByName = new HashMap<>();
+        valuesByName.put(referenceDescriptor.getName(), controllerService.getIdentifier());
+
+        final VersionedReportingTask task = new VersionedReportingTask();
+        task.setIdentifier(initialTaskId);
+        task.setPropertyDescriptors(descriptorsByName);
+        task.setProperties(valuesByName);
+
+        final VersionedReportingTaskSnapshot snapshot = new VersionedReportingTaskSnapshot();
+        snapshot.setReportingTasks(Collections.singletonList(task));
+        snapshot.setControllerServices(Collections.singletonList(controllerService));
+
+        serviceFacade.generateIdentifiersForImport(snapshot, () -> UUID.randomUUID().toString());
+
+        // The service must have been re-identified
+        assertNotNull(controllerService.getIdentifier());
+        assertNotNull(controllerService.getInstanceIdentifier());
+        assertNotEquals(initialServiceId, controllerService.getIdentifier());
+
+        // The reporting task must have been re-identified
+        assertNotNull(task.getIdentifier());
+        assertNotNull(task.getInstanceIdentifier());
+        assertNotEquals(initialTaskId, task.getIdentifier());
+
+        // The property value must follow the service's new instance identifier
+        final String referencedServiceId = task.getProperties().get(referenceDescriptor.getName());
+        assertEquals(controllerService.getInstanceIdentifier(), referencedServiceId);
+    }
}
\ No newline at end of file
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java
index f2880c317f..34b3cdce29 100644
---
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ControllerClient.java
@@ -24,6 +24,8 @@ import
org.apache.nifi.web.api.entity.FlowRegistryClientsEntity;
import org.apache.nifi.web.api.entity.NodeEntity;
import org.apache.nifi.web.api.entity.ParameterProviderEntity;
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+import
org.apache.nifi.web.api.entity.VersionedReportingTaskImportRequestEntity;
+import
org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity;
import java.io.IOException;
@@ -56,6 +58,9 @@ public interface ControllerClient {
ReportingTaskEntity createReportingTask(ReportingTaskEntity reportingTask)
throws NiFiClientException, IOException;
+ /**
+ * Imports the reporting task snapshot carried by the given request entity.
+ * NOTE(review): the Jersey implementation added in this change POSTs the entity to the
+ * controller's "reporting-tasks/import" path; other implementations may differ.
+ *
+ * @param importRequestEntity the import request to submit
+ * @return the response entity produced by the import
+ * @throws NiFiClientException presumably when the NiFi API call fails - confirm against implementations
+ * @throws IOException presumably on communication errors - confirm against implementations
+ */
+ VersionedReportingTaskImportResponseEntity
importReportingTasks(VersionedReportingTaskImportRequestEntity
importRequestEntity)
+ throws NiFiClientException, IOException;
+
ParameterProviderEntity createParamProvider(ParameterProviderEntity
paramProvider) throws NiFiClientException, IOException;
ControllerConfigurationEntity getControllerConfiguration() throws
NiFiClientException, IOException;
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java
index b48381cbfd..fe15f9a2a5 100644
---
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyControllerClient.java
@@ -28,6 +28,8 @@ import
org.apache.nifi.web.api.entity.FlowRegistryClientsEntity;
import org.apache.nifi.web.api.entity.NodeEntity;
import org.apache.nifi.web.api.entity.ParameterProviderEntity;
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+import
org.apache.nifi.web.api.entity.VersionedReportingTaskImportRequestEntity;
+import
org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
@@ -227,6 +229,23 @@ public class JerseyControllerClient extends
AbstractJerseyClient implements Cont
});
}
+ @Override
+ public VersionedReportingTaskImportResponseEntity
importReportingTasks(VersionedReportingTaskImportRequestEntity
importRequestEntity)
+ throws NiFiClientException, IOException {
+ if (importRequestEntity == null) {
+ throw new IllegalArgumentException("Import request entity cannot
be null");
+ }
+
+ return executeAction("Error creating reporting task", () -> {
+ final WebTarget target =
controllerTarget.path("reporting-tasks/import");
+
+ return getRequestBuilder(target).post(
+ Entity.entity(importRequestEntity,
MediaType.APPLICATION_JSON),
+ VersionedReportingTaskImportResponseEntity.class
+ );
+ });
+ }
+
@Override
public ParameterProviderEntity createParamProvider(final
ParameterProviderEntity paramProvider) throws NiFiClientException, IOException {
if (paramProvider == null) {
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java
index 29feb8073d..c3df04d797 100644
---
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java
@@ -35,6 +35,7 @@ import
org.apache.nifi.toolkit.cli.impl.command.nifi.flow.GetControllerConfigura
import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.GetReportingTask;
import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.GetReportingTasks;
import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.GetRootId;
+import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.ImportReportingTasks;
import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.StartReportingTasks;
import org.apache.nifi.toolkit.cli.impl.command.nifi.flow.StopReportingTasks;
import
org.apache.nifi.toolkit.cli.impl.command.nifi.flow.UpdateControllerConfiguration;
@@ -154,6 +155,7 @@ public class NiFiCommandGroup extends AbstractCommandGroup {
commands.add(new StopReportingTasks());
commands.add(new ExportReportingTasks());
commands.add(new ExportReportingTask());
+ commands.add(new ImportReportingTasks());
commands.add(new ListUsers());
commands.add(new CreateUser());
commands.add(new ListUserGroups());
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/flow/ImportReportingTasks.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/flow/ImportReportingTasks.java
new file mode 100644
index 0000000000..a83536a46f
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/flow/ImportReportingTasks.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.command.nifi.flow;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.nifi.flow.VersionedReportingTaskSnapshot;
+import org.apache.nifi.toolkit.cli.api.CommandException;
+import org.apache.nifi.toolkit.cli.api.Context;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
+import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand;
+import org.apache.nifi.toolkit.cli.impl.result.nifi.ImportReportingTasksResult;
+import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils;
+import
org.apache.nifi.web.api.entity.VersionedReportingTaskImportRequestEntity;
+import
org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * CLI command "import-reporting-tasks": reads a reporting task snapshot from the required
+ * input source, wraps it in an import request and submits it through the controller client.
+ */
+public class ImportReportingTasks extends AbstractNiFiCommand<ImportReportingTasksResult> {
+
+    public ImportReportingTasks() {
+        super("import-reporting-tasks", ImportReportingTasksResult.class);
+    }
+
+    @Override
+    public String getDescription() {
+        return "Imports the contents of a reporting task snapshot produced from export-reporting-tasks or export-reporting-task.";
+    }
+
+    /**
+     * Registers the input source option, which is the only option this command adds.
+     */
+    @Override
+    public void doInitialize(final Context context) {
+        addOption(CommandOption.INPUT_SOURCE.createOption());
+    }
+
+    /**
+     * Loads the snapshot from the input source, sends the import request and wraps the response.
+     *
+     * @param client the NiFi client whose controller client performs the import
+     * @param properties the command properties holding the input source and result type
+     * @return the result wrapping the import response entity
+     * @throws IOException if the input source content deserializes to null
+     */
+    @Override
+    public ImportReportingTasksResult doExecute(final NiFiClient client, final Properties properties)
+            throws NiFiClientException, IOException, MissingOptionException, CommandException {
+        final String source = getRequiredArg(properties, CommandOption.INPUT_SOURCE);
+        final String json = getInputSourceContent(source);
+
+        final ObjectMapper mapper = JacksonUtils.getObjectMapper();
+        final VersionedReportingTaskSnapshot snapshot = mapper.readValue(json, VersionedReportingTaskSnapshot.class);
+        if (snapshot == null) {
+            throw new IOException("Unable to deserialize reporting task snapshot from " + source);
+        }
+
+        final VersionedReportingTaskImportRequestEntity request = new VersionedReportingTaskImportRequestEntity();
+        request.setReportingTaskSnapshot(snapshot);
+
+        final VersionedReportingTaskImportResponseEntity response = client.getControllerClient().importReportingTasks(request);
+        return new ImportReportingTasksResult(getResultType(properties), response);
+    }
+}
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/nifi/ImportReportingTasksResult.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/nifi/ImportReportingTasksResult.java
new file mode 100644
index 0000000000..183d1d19f5
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/nifi/ImportReportingTasksResult.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.result.nifi;
+
+import org.apache.nifi.toolkit.cli.api.ResultType;
+import org.apache.nifi.toolkit.cli.impl.result.AbstractWritableResult;
+import org.apache.nifi.toolkit.cli.impl.result.writer.DynamicTableWriter;
+import org.apache.nifi.toolkit.cli.impl.result.writer.Table;
+import org.apache.nifi.toolkit.cli.impl.result.writer.TableWriter;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ReportingTaskEntity;
+import
org.apache.nifi.web.api.entity.VersionedReportingTaskImportResponseEntity;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Result of importing a reporting task snapshot. The simple output is a table listing the
+ * reporting tasks followed by the controller services contained in the import response.
+ */
+public class ImportReportingTasksResult extends AbstractWritableResult<VersionedReportingTaskImportResponseEntity> {
+
+    private static final String REPORTING_TASK_TYPE = "REPORTING_TASK";
+    private static final String CONTROLLER_SERVICE_TYPE = "CONTROLLER_SERVICE";
+
+    private final VersionedReportingTaskImportResponseEntity responseEntity;
+
+    /**
+     * @param resultType the output format requested for this result
+     * @param responseEntity the import response to expose and render
+     */
+    public ImportReportingTasksResult(final ResultType resultType, final VersionedReportingTaskImportResponseEntity responseEntity) {
+        super(resultType);
+        this.responseEntity = responseEntity;
+    }
+
+    @Override
+    public VersionedReportingTaskImportResponseEntity getResult() {
+        return responseEntity;
+    }
+
+    /**
+     * Writes a four-column table (row number, name, id, type). Reporting tasks come first,
+     * then controller services; each group is sorted by name with null names last, and the
+     * row numbering runs continuously across both groups.
+     *
+     * <p>Nothing is written only when the response carries neither set. If just one of the
+     * two sets is missing, the other one is still rendered. Entities without a component
+     * are left out of the table.
+     *
+     * @param output the stream the table is written to
+     * @throws IOException declared by the overridden method
+     */
+    @Override
+    protected void writeSimpleResult(final PrintStream output) throws IOException {
+        final Set<ReportingTaskEntity> tasksEntities = responseEntity.getReportingTasks();
+        final Set<ControllerServiceEntity> serviceEntities = responseEntity.getControllerServices();
+        if (tasksEntities == null && serviceEntities == null) {
+            return;
+        }
+
+        final Table table = new Table.Builder()
+                .column("#", 3, 3, false)
+                .column("Name", 5, 40, true)
+                .column("ID", 36, 36, false)
+                .column("Type", 15, 20, true)
+                .build();
+
+        int componentCount = 0;
+
+        if (tasksEntities != null) {
+            final List<ReportingTaskDTO> taskDTOS = tasksEntities.stream()
+                    .map(ReportingTaskEntity::getComponent)
+                    // skip entities that carry no component instead of failing while sorting
+                    .filter(taskDTO -> taskDTO != null)
+                    .sorted(Comparator.comparing(ReportingTaskDTO::getName, Comparator.nullsLast(Comparator.<String>naturalOrder())))
+                    .collect(Collectors.toList());
+
+            for (final ReportingTaskDTO taskDTO : taskDTOS) {
+                table.addRow(String.valueOf(++componentCount), taskDTO.getName(), taskDTO.getId(), REPORTING_TASK_TYPE);
+            }
+        }
+
+        if (serviceEntities != null) {
+            final List<ControllerServiceDTO> serviceDTOS = serviceEntities.stream()
+                    .map(ControllerServiceEntity::getComponent)
+                    // skip entities that carry no component instead of failing while sorting
+                    .filter(serviceDTO -> serviceDTO != null)
+                    .sorted(Comparator.comparing(ControllerServiceDTO::getName, Comparator.nullsLast(Comparator.<String>naturalOrder())))
+                    .collect(Collectors.toList());
+
+            for (final ControllerServiceDTO serviceDTO : serviceDTOS) {
+                table.addRow(String.valueOf(++componentCount), serviceDTO.getName(), serviceDTO.getId(), CONTROLLER_SERVICE_TYPE);
+            }
+        }
+
+        final TableWriter tableWriter = new DynamicTableWriter();
+        tableWriter.write(table, output);
+    }
+}