This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new fe6987d7793 NIFI-15347 Fixed property migration for Reporting Tasks 
(#10648)
fe6987d7793 is described below

commit fe6987d77935a01e1eb2d9e34e2c249aa031cdf1
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Dec 16 15:25:45 2025 +0100

    NIFI-15347 Fixed property migration for Reporting Tasks (#10648)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../serialization/VersionedFlowSynchronizer.java   | 25 +++++---
 .../VersionedFlowSynchronizerTest.java             | 71 ++++++++++++++++++++++
 2 files changed, 87 insertions(+), 9 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
index 925b33397e3..9465ba12037 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
@@ -619,18 +619,19 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
         final BundleCoordinate coordinate = 
createBundleCoordinate(extensionManager, reportingTask.getBundle(), 
reportingTask.getType());
 
         final ReportingTaskNode taskNode = 
controller.createReportingTask(reportingTask.getType(), 
reportingTask.getInstanceIdentifier(), coordinate, false);
-        updateReportingTask(taskNode, reportingTask, controller);
+
+        final Map<String, String> decryptedProperties = 
decryptProperties(reportingTask.getProperties(), controller.getEncryptor());
+        configureReportingTask(taskNode, reportingTask, decryptedProperties);
 
         final ControllerServiceFactory serviceFactory = new 
StandardControllerServiceFactory(controller.getExtensionManager(), 
controller.getFlowManager(),
             controller.getControllerServiceProvider(), taskNode);
-        Map<String, String> rawPropertyValues = 
taskNode.getRawPropertyValues().entrySet().stream()
-                .collect(HashMap::new,
-                        (m, e) -> m.put(e.getKey().getName(), e.getValue()),
-                        HashMap::putAll);
-        taskNode.migrateConfiguration(rawPropertyValues, serviceFactory);
+        taskNode.migrateConfiguration(decryptedProperties, serviceFactory);
+
+        // Start reporting task after migration is complete to avoid modifying 
running task
+        startReportingTask(taskNode, reportingTask, controller);
     }
 
-    private void updateReportingTask(final ReportingTaskNode taskNode, final 
VersionedReportingTask reportingTask, final FlowController controller) {
+    private void configureReportingTask(final ReportingTaskNode taskNode, 
final VersionedReportingTask reportingTask, final Map<String, String> 
decryptedProperties) {
         taskNode.setName(reportingTask.getName());
         taskNode.setComments(reportingTask.getComments());
         taskNode.setSchedulingPeriod(reportingTask.getSchedulingPeriod());
@@ -639,10 +640,10 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
         taskNode.setAnnotationData(reportingTask.getAnnotationData());
 
         final Set<String> sensitiveDynamicPropertyNames = 
getSensitiveDynamicPropertyNames(taskNode, reportingTask);
-        final Map<String, String> decryptedProperties = 
decryptProperties(reportingTask.getProperties(), controller.getEncryptor());
         taskNode.setProperties(decryptedProperties, false, 
sensitiveDynamicPropertyNames);
+    }
 
-        // enable/disable/start according to the ScheduledState
+    private void startReportingTask(final ReportingTaskNode taskNode, final 
VersionedReportingTask reportingTask, final FlowController controller) {
         switch (reportingTask.getScheduledState()) {
             case DISABLED:
                 if (taskNode.isRunning()) {
@@ -668,6 +669,12 @@ public class VersionedFlowSynchronizer implements 
FlowSynchronizer {
         }
     }
 
+    private void updateReportingTask(final ReportingTaskNode taskNode, final 
VersionedReportingTask reportingTask, final FlowController controller) {
+        final Map<String, String> decryptedProperties = 
decryptProperties(reportingTask.getProperties(), controller.getEncryptor());
+        configureReportingTask(taskNode, reportingTask, decryptedProperties);
+        startReportingTask(taskNode, reportingTask, controller);
+    }
+
     private void inheritFlowAnalysisRules(final FlowController controller, 
final VersionedDataflow dataflow, final AffectedComponentSet 
affectedComponentSet)
         throws FlowAnalysisRuleInstantiationException {
         // Guard state in order to be able to read flow.json from before 
adding the flow analysis rules
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizerTest.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizerTest.java
index 3779c25db3d..b01317c83ec 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizerTest.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizerTest.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.serialization;
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.SnippetManager;
 import org.apache.nifi.controller.UninheritableFlowException;
 import org.apache.nifi.controller.flow.FlowManager;
@@ -30,6 +31,7 @@ import org.apache.nifi.flow.Bundle;
 import org.apache.nifi.flow.ScheduledState;
 import org.apache.nifi.flow.VersionedControllerService;
 import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedReportingTask;
 import org.apache.nifi.groups.BundleUpdateStrategy;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.nar.ExtensionManager;
@@ -42,6 +44,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
@@ -56,6 +59,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -78,6 +82,10 @@ class VersionedFlowSynchronizerTest {
 
     private static final String DECRYPTED_PROPERTY_VALUE = "decoded";
 
+    private static final String REPORTING_TASK_INSTANCE_ID = 
"reporting-task-instance-id";
+
+    private static final String REPORTING_TASK_TYPE = 
"org.apache.nifi.reporting.TestReportingTask";
+
     @Mock
     private ExtensionManager extensionManager;
 
@@ -176,6 +184,69 @@ class VersionedFlowSynchronizerTest {
         assertEquals(expectedDecryptedProperties, originalProperties);
     }
 
+    @Test
+    void testSyncInheritReportingTasksMigrateConfigurationBeforeStart() {
+        setRootGroup();
+        setFlowController();
+
+        // Create Versioned Reporting Task with RUNNING state to verify 
migration happens before start
+        final VersionedReportingTask reportingTask = 
getVersionedReportingTask();
+        final List<VersionedReportingTask> reportingTasks = 
List.of(reportingTask);
+        when(versionedDataflow.getReportingTasks()).thenReturn(reportingTasks);
+
+        // Return null for initial Reporting Task Node lookup indicating it 
needs to be created
+        final ReportingTaskNode reportingTaskNode = 
mock(ReportingTaskNode.class);
+        
when(flowController.getReportingTaskNode(eq(REPORTING_TASK_INSTANCE_ID))).thenReturn(null);
+
+        // Mock Property Descriptor for sensitive Property with decrypted value
+        final PropertyDescriptor sensitivePropertyDescriptor = 
mock(PropertyDescriptor.class);
+        
when(reportingTaskNode.getPropertyDescriptor(eq(SENSITIVE_PROPERTY_NAME))).thenReturn(sensitivePropertyDescriptor);
+        when(encryptor.decrypt(any())).thenReturn(DECRYPTED_PROPERTY_VALUE);
+
+        // Return created Reporting Task Node
+        when(flowController.createReportingTask(any(), 
eq(REPORTING_TASK_INSTANCE_ID), any(), 
eq(false))).thenReturn(reportingTaskNode);
+
+        versionedFlowSynchronizer.sync(flowController, dataFlow, flowService, 
BundleUpdateStrategy.USE_SPECIFIED_OR_GHOST);
+
+        // Verify migrateConfiguration is called with decrypted properties
+        final ArgumentCaptor<Map<String, String>> originalPropertiesCaptor = 
ArgumentCaptor.captor();
+        
verify(reportingTaskNode).migrateConfiguration(originalPropertiesCaptor.capture(),
 any());
+
+        final Map<String, String> originalProperties = 
originalPropertiesCaptor.getValue();
+        final Map<String, String> expectedDecryptedProperties = Map.of(
+                PROPERTY_NAME, PROPERTY_VALUE,
+                SENSITIVE_PROPERTY_NAME, DECRYPTED_PROPERTY_VALUE
+        );
+        assertEquals(expectedDecryptedProperties, originalProperties);
+
+        // Verify that migrateConfiguration is called BEFORE startReportingTask
+        // This is the key assertion: migration must complete before task is 
started
+        final InOrder inOrder = inOrder(reportingTaskNode, flowController);
+        inOrder.verify(reportingTaskNode).migrateConfiguration(any(), any());
+        inOrder.verify(flowController).startReportingTask(reportingTaskNode);
+    }
+
+    private VersionedReportingTask getVersionedReportingTask() {
+        final VersionedReportingTask reportingTask = new 
VersionedReportingTask();
+        reportingTask.setInstanceIdentifier(REPORTING_TASK_INSTANCE_ID);
+        reportingTask.setBundle(CORE_BUNDLE);
+        reportingTask.setType(REPORTING_TASK_TYPE);
+        reportingTask.setName("Test Reporting Task");
+        reportingTask.setSchedulingStrategy("TIMER_DRIVEN");
+        reportingTask.setSchedulingPeriod("5 mins");
+
+        final Map<String, String> versionedProperties = Map.of(
+                PROPERTY_NAME, PROPERTY_VALUE,
+                SENSITIVE_PROPERTY_NAME, ENCRYPTED_PROPERTY_VALUE
+        );
+
+        reportingTask.setProperties(versionedProperties);
+        reportingTask.setPropertyDescriptors(Map.of());
+        // Set to RUNNING to verify migration happens before start
+        reportingTask.setScheduledState(ScheduledState.RUNNING);
+        return reportingTask;
+    }
+
     private VersionedControllerService getVersionedControllerService() {
         final VersionedControllerService controllerService = new 
VersionedControllerService();
         controllerService.setInstanceIdentifier(SERVICE_INSTANCE_ID);

Reply via email to