This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new fe6987d7793 NIFI-15347 Fixed property migration for Reporting Tasks
(#10648)
fe6987d7793 is described below
commit fe6987d77935a01e1eb2d9e34e2c249aa031cdf1
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Dec 16 15:25:45 2025 +0100
NIFI-15347 Fixed property migration for Reporting Tasks (#10648)
Signed-off-by: David Handermann <[email protected]>
---
.../serialization/VersionedFlowSynchronizer.java | 25 +++++---
.../VersionedFlowSynchronizerTest.java | 71 ++++++++++++++++++++++
2 files changed, 87 insertions(+), 9 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 925b33397e3..9465ba12037 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -619,18 +619,19 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
final BundleCoordinate coordinate =
createBundleCoordinate(extensionManager, reportingTask.getBundle(),
reportingTask.getType());
final ReportingTaskNode taskNode =
controller.createReportingTask(reportingTask.getType(),
reportingTask.getInstanceIdentifier(), coordinate, false);
- updateReportingTask(taskNode, reportingTask, controller);
+
+ final Map<String, String> decryptedProperties =
decryptProperties(reportingTask.getProperties(), controller.getEncryptor());
+ configureReportingTask(taskNode, reportingTask, decryptedProperties);
final ControllerServiceFactory serviceFactory = new
StandardControllerServiceFactory(controller.getExtensionManager(),
controller.getFlowManager(),
controller.getControllerServiceProvider(), taskNode);
- Map<String, String> rawPropertyValues =
taskNode.getRawPropertyValues().entrySet().stream()
- .collect(HashMap::new,
- (m, e) -> m.put(e.getKey().getName(), e.getValue()),
- HashMap::putAll);
- taskNode.migrateConfiguration(rawPropertyValues, serviceFactory);
+ taskNode.migrateConfiguration(decryptedProperties, serviceFactory);
+
+ // Start reporting task after migration is complete to avoid modifying
running task
+ startReportingTask(taskNode, reportingTask, controller);
}
- private void updateReportingTask(final ReportingTaskNode taskNode, final
VersionedReportingTask reportingTask, final FlowController controller) {
+ private void configureReportingTask(final ReportingTaskNode taskNode,
final VersionedReportingTask reportingTask, final Map<String, String>
decryptedProperties) {
taskNode.setName(reportingTask.getName());
taskNode.setComments(reportingTask.getComments());
taskNode.setSchedulingPeriod(reportingTask.getSchedulingPeriod());
@@ -639,10 +640,10 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
taskNode.setAnnotationData(reportingTask.getAnnotationData());
final Set<String> sensitiveDynamicPropertyNames =
getSensitiveDynamicPropertyNames(taskNode, reportingTask);
- final Map<String, String> decryptedProperties =
decryptProperties(reportingTask.getProperties(), controller.getEncryptor());
taskNode.setProperties(decryptedProperties, false,
sensitiveDynamicPropertyNames);
+ }
- // enable/disable/start according to the ScheduledState
+ private void startReportingTask(final ReportingTaskNode taskNode, final
VersionedReportingTask reportingTask, final FlowController controller) {
switch (reportingTask.getScheduledState()) {
case DISABLED:
if (taskNode.isRunning()) {
@@ -668,6 +669,12 @@ public class VersionedFlowSynchronizer implements
FlowSynchronizer {
}
}
+ private void updateReportingTask(final ReportingTaskNode taskNode, final
VersionedReportingTask reportingTask, final FlowController controller) {
+ final Map<String, String> decryptedProperties =
decryptProperties(reportingTask.getProperties(), controller.getEncryptor());
+ configureReportingTask(taskNode, reportingTask, decryptedProperties);
+ startReportingTask(taskNode, reportingTask, controller);
+ }
+
private void inheritFlowAnalysisRules(final FlowController controller,
final VersionedDataflow dataflow, final AffectedComponentSet
affectedComponentSet)
throws FlowAnalysisRuleInstantiationException {
// Guard state in order to be able to read flow.json from before
adding the flow analysis rules
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizerTest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizerTest.java
index 3779c25db3d..b01317c83ec 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizerTest.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizerTest.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.serialization;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.SnippetManager;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.flow.FlowManager;
@@ -30,6 +31,7 @@ import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.ScheduledState;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
import org.apache.nifi.groups.BundleUpdateStrategy;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
@@ -42,6 +44,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -56,6 +59,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -78,6 +82,10 @@ class VersionedFlowSynchronizerTest {
private static final String DECRYPTED_PROPERTY_VALUE = "decoded";
+ private static final String REPORTING_TASK_INSTANCE_ID =
"reporting-task-instance-id";
+
+ private static final String REPORTING_TASK_TYPE =
"org.apache.nifi.reporting.TestReportingTask";
+
@Mock
private ExtensionManager extensionManager;
@@ -176,6 +184,69 @@ class VersionedFlowSynchronizerTest {
assertEquals(expectedDecryptedProperties, originalProperties);
}
+ @Test
+ void testSyncInheritReportingTasksMigrateConfigurationBeforeStart() {
+ setRootGroup();
+ setFlowController();
+
+ // Create Versioned Reporting Task with RUNNING state to verify
migration happens before start
+ final VersionedReportingTask reportingTask =
getVersionedReportingTask();
+ final List<VersionedReportingTask> reportingTasks =
List.of(reportingTask);
+ when(versionedDataflow.getReportingTasks()).thenReturn(reportingTasks);
+
+ // Return null for initial Reporting Task Node lookup indicating it
needs to be created
+ final ReportingTaskNode reportingTaskNode =
mock(ReportingTaskNode.class);
+
when(flowController.getReportingTaskNode(eq(REPORTING_TASK_INSTANCE_ID))).thenReturn(null);
+
+ // Mock Property Descriptor for sensitive Property with decrypted value
+ final PropertyDescriptor sensitivePropertyDescriptor =
mock(PropertyDescriptor.class);
+
when(reportingTaskNode.getPropertyDescriptor(eq(SENSITIVE_PROPERTY_NAME))).thenReturn(sensitivePropertyDescriptor);
+ when(encryptor.decrypt(any())).thenReturn(DECRYPTED_PROPERTY_VALUE);
+
+ // Return created Reporting Task Node
+ when(flowController.createReportingTask(any(),
eq(REPORTING_TASK_INSTANCE_ID), any(),
eq(false))).thenReturn(reportingTaskNode);
+
+ versionedFlowSynchronizer.sync(flowController, dataFlow, flowService,
BundleUpdateStrategy.USE_SPECIFIED_OR_GHOST);
+
+ // Verify migrateConfiguration is called with decrypted properties
+ final ArgumentCaptor<Map<String, String>> originalPropertiesCaptor =
ArgumentCaptor.captor();
+
verify(reportingTaskNode).migrateConfiguration(originalPropertiesCaptor.capture(),
any());
+
+ final Map<String, String> originalProperties =
originalPropertiesCaptor.getValue();
+ final Map<String, String> expectedDecryptedProperties = Map.of(
+ PROPERTY_NAME, PROPERTY_VALUE,
+ SENSITIVE_PROPERTY_NAME, DECRYPTED_PROPERTY_VALUE
+ );
+ assertEquals(expectedDecryptedProperties, originalProperties);
+
+ // Verify that migrateConfiguration is called BEFORE startReportingTask
+ // This is the key assertion: migration must complete before task is
started
+ final InOrder inOrder = inOrder(reportingTaskNode, flowController);
+ inOrder.verify(reportingTaskNode).migrateConfiguration(any(), any());
+ inOrder.verify(flowController).startReportingTask(reportingTaskNode);
+ }
+
+ private VersionedReportingTask getVersionedReportingTask() {
+ final VersionedReportingTask reportingTask = new
VersionedReportingTask();
+ reportingTask.setInstanceIdentifier(REPORTING_TASK_INSTANCE_ID);
+ reportingTask.setBundle(CORE_BUNDLE);
+ reportingTask.setType(REPORTING_TASK_TYPE);
+ reportingTask.setName("Test Reporting Task");
+ reportingTask.setSchedulingStrategy("TIMER_DRIVEN");
+ reportingTask.setSchedulingPeriod("5 mins");
+
+ final Map<String, String> versionedProperties = Map.of(
+ PROPERTY_NAME, PROPERTY_VALUE,
+ SENSITIVE_PROPERTY_NAME, ENCRYPTED_PROPERTY_VALUE
+ );
+
+ reportingTask.setProperties(versionedProperties);
+ reportingTask.setPropertyDescriptors(Map.of());
+ // Set to RUNNING to verify migration happens before start
+ reportingTask.setScheduledState(ScheduledState.RUNNING);
+ return reportingTask;
+ }
+
private VersionedControllerService getVersionedControllerService() {
final VersionedControllerService controllerService = new
VersionedControllerService();
controllerService.setInstanceIdentifier(SERVICE_INSTANCE_ID);