This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c757de20f1b NIFI-15749 - Start newly added components when upgrading 
versioned process group (#11061)
c757de20f1b is described below

commit c757de20f1b5ec39dc9c8e3b5913f9539badd1af
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Mar 27 20:16:56 2026 +0100

    NIFI-15749 - Start newly added components when upgrading versioned process 
group (#11061)
    
    * NIFI-15749 - Start newly added components when upgrading versioned 
process group
    * active state based on processors only
---
 .../RetainExistingStateComponentScheduler.java     |  68 ++++-
 .../RetainExistingStateComponentSchedulerTest.java | 304 +++++++++++++++++++++
 .../tests/system/registry/RegistryClientIT.java    | 186 +++++++++++++
 3 files changed, 551 insertions(+), 7 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
index 913c586356e..393b9866bdf 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.groups;
 
 import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.service.ControllerServiceNode;
@@ -33,24 +34,46 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+/**
+ * A {@link ComponentScheduler} that retains the existing state of components 
during a flow update. For components that already existed
+ * before the update, the scheduler only starts or enables them if they were 
previously running or enabled. Components that were stopped
+ * or disabled remain in that state.
+ *
+ * <p>For components that are newly added by the update (i.e., not present 
before the update), the scheduler determines whether to start
+ * or enable them based on whether the process group is currently "active". 
The process group is considered active if at least one processor
+ * is running or starting at the time this scheduler is constructed. During a version 
upgrade, the REST layer stops only the affected (modified)
+ * processors before delegating to the synchronizer, so unaffected processors 
remain in their original state and serve as the signal
+ * for whether the group is active.</p>
+ *
+ * <p>When the process group is active, newly added processors are started and 
newly added controller services are enabled. When the
+ * process group is not active (e.g., fully stopped before the upgrade, or an 
initial import into an empty group), newly added
+ * components remain stopped or disabled.</p>
+ */
 public class RetainExistingStateComponentScheduler implements 
ComponentScheduler {
     private static final Logger logger = 
LoggerFactory.getLogger(RetainExistingStateComponentScheduler.class);
 
     private final ComponentScheduler delegate;
     private final Map<String, ScheduledState> componentStates;
     private final Map<String, ControllerServiceState> controllerServiceStates;
+    private final boolean processGroupActive;
 
     public RetainExistingStateComponentScheduler(final ProcessGroup 
processGroup, final ComponentScheduler delegate) {
         this.delegate = delegate;
         this.componentStates = mapComponentStates(processGroup);
         this.controllerServiceStates = 
mapControllerServiceStates(processGroup);
+        this.processGroupActive = hasActiveRuntimeState(processGroup);
     }
 
     @Override
     public void startComponent(final Connectable component) {
         final ScheduledState existingState = 
componentStates.get(component.getIdentifier());
         if (existingState == null) {
-            logger.debug("Will not start {} because it was not previously 
known in this Process Group", component);
+            if (processGroupActive) {
+                logger.debug("Starting new component {} because the Process 
Group is active", component);
+                delegate.startComponent(component);
+            } else {
+                logger.debug("Will not start {} because it was not previously 
known in this Process Group and the Process Group is not active", component);
+            }
             return;
         }
 
@@ -67,7 +90,12 @@ public class RetainExistingStateComponentScheduler 
implements ComponentScheduler
     public void startStatelessGroup(final ProcessGroup group) {
         final ScheduledState existingState = 
componentStates.get(group.getIdentifier());
         if (existingState == null) {
-            logger.debug("Will not start {} because it was not previously 
known in this Process Group", group);
+            if (processGroupActive) {
+                logger.debug("Starting new stateless group {} because the 
Process Group is active", group);
+                delegate.startStatelessGroup(group);
+            } else {
+                logger.debug("Will not start {} because it was not previously 
known in this Process Group and the Process Group is not active", group);
+            }
             return;
         }
 
@@ -92,6 +120,12 @@ public class RetainExistingStateComponentScheduler 
implements ComponentScheduler
 
     @Override
     public void transitionComponentState(final Connectable component, final 
org.apache.nifi.flow.ScheduledState desiredState) {
+        final ScheduledState existingState = 
componentStates.get(component.getIdentifier());
+        if (existingState == null && processGroupActive && desiredState != 
org.apache.nifi.flow.ScheduledState.DISABLED) {
+            logger.debug("Starting new component {} because the Process Group 
is active and desired state is {}", component, desiredState);
+            delegate.startComponent(component);
+            return;
+        }
         delegate.transitionComponentState(component, desiredState);
     }
 
@@ -103,12 +137,17 @@ public class RetainExistingStateComponentScheduler 
implements ComponentScheduler
             final ControllerServiceState existingState = 
controllerServiceStates.get(service.getIdentifier());
 
             if (existingState == null) {
-                logger.debug("Will not enable {} because it was not previously 
known in this Process Group", service);
+                if (processGroupActive) {
+                    logger.debug("Enabling new service {} because the Process 
Group is active", service);
+                    toEnable.add(service);
+                } else {
+                    logger.debug("Will not enable {} because it was not 
previously known in this Process Group and the Process Group is not active", 
service);
+                }
                 continue;
             }
 
             if (existingState != ControllerServiceState.ENABLED && 
existingState != ControllerServiceState.ENABLING) {
-                logger.debug("Will not enable {} because its previously state 
was {}", service, existingState);
+                logger.debug("Will not enable {} because its previous state 
was {}", service, existingState);
                 continue;
             }
 
@@ -139,6 +178,10 @@ public class RetainExistingStateComponentScheduler 
implements ComponentScheduler
         delegate.resume();
     }
 
+    boolean isProcessGroupActive() {
+        return processGroupActive;
+    }
+
     private Map<String, ControllerServiceState> 
mapControllerServiceStates(final ProcessGroup group) {
         final Set<ControllerServiceNode> services = 
group.findAllControllerServices();
         final Map<String, ControllerServiceState> serviceStates = 
services.stream()
@@ -167,7 +210,18 @@ public class RetainExistingStateComponentScheduler 
implements ComponentScheduler
         return componentStates;
     }
 
-    private void findAllConnectables(final ProcessGroup group, final 
Set<Connectable> connectables) {
+    static boolean hasActiveRuntimeState(final ProcessGroup group) {
+        for (final ProcessorNode processor : group.findAllProcessors()) {
+            final ScheduledState state = processor.getScheduledState();
+            if (state == ScheduledState.RUNNING || state == 
ScheduledState.STARTING) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private static void findAllConnectables(final ProcessGroup group, final 
Set<Connectable> connectables) {
         connectables.addAll(group.getInputPorts());
         connectables.addAll(group.getOutputPorts());
         connectables.addAll(group.getFunnels());
@@ -182,10 +236,10 @@ public class RetainExistingStateComponentScheduler 
implements ComponentScheduler
         }
     }
 
-    private void findAllStatelessGroups(final ProcessGroup start, final 
Set<ProcessGroup> statelessGroups) {
+    private static void findAllStatelessGroups(final ProcessGroup start, final 
Set<ProcessGroup> statelessGroups) {
         if (start.resolveExecutionEngine() == ExecutionEngine.STATELESS) {
             statelessGroups.add(start);
-            return; // No need to go further, as the top-level stateless group 
is all we need.
+            return;
         }
 
         for (final ProcessGroup childGroup : start.getProcessGroups()) {
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/RetainExistingStateComponentSchedulerTest.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/RetainExistingStateComponentSchedulerTest.java
new file mode 100644
index 00000000000..8f5add62a79
--- /dev/null
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/RetainExistingStateComponentSchedulerTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.groups;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.flow.ExecutionEngine;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class RetainExistingStateComponentSchedulerTest {
+
+    private ComponentScheduler delegate;
+
+    @BeforeEach
+    void setup() {
+        delegate = mock(ComponentScheduler.class);
+    }
+
+    @Test
+    void testEmptyProcessGroupIsNotActive() {
+        final ProcessGroup group = createProcessGroup(Collections.emptySet(), 
Collections.emptySet(), Collections.emptySet());
+        final RetainExistingStateComponentScheduler scheduler = new 
RetainExistingStateComponentScheduler(group, delegate);
+        assertFalse(scheduler.isProcessGroupActive());
+        
assertFalse(RetainExistingStateComponentScheduler.hasActiveRuntimeState(group));
+    }
+
+    @Test
+    void testProcessGroupWithRunningProcessorIsActive() {
+        final ProcessorNode processor = createMockProcessor("proc-1", 
ScheduledState.RUNNING);
+        final ProcessGroup group = createProcessGroup(Set.of(processor), 
Collections.emptySet(), Collections.emptySet());
+        
assertTrue(RetainExistingStateComponentScheduler.hasActiveRuntimeState(group));
+        assertTrue(new RetainExistingStateComponentScheduler(group, 
delegate).isProcessGroupActive());
+    }
+
+    @Test
+    void testProcessGroupWithStartingProcessorIsActive() {
+        final ProcessorNode processor = createMockProcessor("proc-1", 
ScheduledState.STARTING);
+        final ProcessGroup group = createProcessGroup(Set.of(processor), 
Collections.emptySet(), Collections.emptySet());
+        
assertTrue(RetainExistingStateComponentScheduler.hasActiveRuntimeState(group));
+    }
+
+    @Test
+    void testProcessGroupWithEnabledServiceOnlyIsNotActive() {
+        final ControllerServiceNode service = createMockService("svc-1", 
ControllerServiceState.ENABLED);
+        final ProcessGroup group = createProcessGroup(Collections.emptySet(), 
Collections.emptySet(), Set.of(service));
+        
assertFalse(RetainExistingStateComponentScheduler.hasActiveRuntimeState(group));
+        assertFalse(new RetainExistingStateComponentScheduler(group, 
delegate).isProcessGroupActive());
+    }
+
+    @Test
+    void testProcessGroupWithEnablingServiceOnlyIsNotActive() {
+        final ControllerServiceNode service = createMockService("svc-1", 
ControllerServiceState.ENABLING);
+        final ProcessGroup group = createProcessGroup(Collections.emptySet(), 
Collections.emptySet(), Set.of(service));
+        
assertFalse(RetainExistingStateComponentScheduler.hasActiveRuntimeState(group));
+    }
+
+    @Test
+    void testProcessGroupWithStoppedComponentsIsNotActive() {
+        final ProcessorNode processor = createMockProcessor("proc-1", 
ScheduledState.STOPPED);
+        final ControllerServiceNode service = createMockService("svc-1", 
ControllerServiceState.DISABLED);
+        final ProcessGroup group = createProcessGroup(Set.of(processor), 
Collections.emptySet(), Set.of(service));
+        
assertFalse(RetainExistingStateComponentScheduler.hasActiveRuntimeState(group));
+        assertFalse(new RetainExistingStateComponentScheduler(group, 
delegate).isProcessGroupActive());
+    }
+
+    @Test
+    void testProcessGroupWithRunningPortIsNotActive() {
+        final Port runningPort = mock(Port.class);
+        when(runningPort.getIdentifier()).thenReturn("port-1");
+        
when(runningPort.getScheduledState()).thenReturn(ScheduledState.RUNNING);
+        final ProcessGroup group = createProcessGroup(Collections.emptySet(), 
Set.of(runningPort), Collections.emptySet());
+        
assertFalse(RetainExistingStateComponentScheduler.hasActiveRuntimeState(group));
+    }
+
+    @Test
+    void testNewComponentStartedWhenProcessGroupActive() {
+        final ProcessorNode runningProcessor = 
createMockProcessor("running-proc", ScheduledState.RUNNING);
+        final ProcessGroup group = 
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(), 
Collections.emptySet());
+        final RetainExistingStateComponentScheduler scheduler = new 
RetainExistingStateComponentScheduler(group, delegate);
+        assertTrue(scheduler.isProcessGroupActive());
+
+        final Connectable newComponent = mock(Connectable.class);
+        when(newComponent.getIdentifier()).thenReturn("new-component-id");
+
+        scheduler.startComponent(newComponent);
+        verify(delegate).startComponent(newComponent);
+    }
+
+    @Test
+    void testNewComponentNotStartedWhenProcessGroupInactive() {
+        final ProcessGroup group = createProcessGroup(Collections.emptySet(), 
Collections.emptySet(), Collections.emptySet());
+        final RetainExistingStateComponentScheduler scheduler = new 
RetainExistingStateComponentScheduler(group, delegate);
+        assertFalse(scheduler.isProcessGroupActive());
+
+        final Connectable newComponent = mock(Connectable.class);
+        when(newComponent.getIdentifier()).thenReturn("new-component-id");
+
+        scheduler.startComponent(newComponent);
+        verify(delegate, never()).startComponent(newComponent);
+    }
+
+    @Test
+    void testExistingRunningComponentStarted() {
+        final ProcessorNode runningProcessor = 
createMockProcessor("running-proc", ScheduledState.RUNNING);
+        final ProcessGroup group = 
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(), 
Collections.emptySet());
+        final RetainExistingStateComponentScheduler scheduler = new 
RetainExistingStateComponentScheduler(group, delegate);
+
+        scheduler.startComponent(runningProcessor);
+        verify(delegate).startComponent(runningProcessor);
+    }
+
+    @Test
+    void testExistingStoppedComponentNotStarted() {
+        final ProcessorNode stoppedProcessor = 
createMockProcessor("stopped-proc", ScheduledState.STOPPED);
+        final ProcessorNode runningProcessor = 
createMockProcessor("running-proc", ScheduledState.RUNNING);
+        final ProcessGroup group = createProcessGroup(Set.of(stoppedProcessor, 
runningProcessor), Collections.emptySet(), Collections.emptySet());
+        final RetainExistingStateComponentScheduler scheduler = new 
RetainExistingStateComponentScheduler(group, delegate);
+
+        scheduler.startComponent(stoppedProcessor);
+        verify(delegate, never()).startComponent(stoppedProcessor);
+    }
+
+    @Test
+    void testNewServiceEnabledWhenProcessGroupActive() {
+        final ProcessorNode runningProcessor = 
createMockProcessor("running-proc", ScheduledState.RUNNING);
+        final ControllerServiceNode existingService = 
createMockService("existing-svc", ControllerServiceState.ENABLED);
+        final ProcessGroup group = 
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(), 
Set.of(existingService));
+        final RetainExistingStateComponentScheduler scheduler = new 
RetainExistingStateComponentScheduler(group, delegate);
+        assertTrue(scheduler.isProcessGroupActive());
+
+        final ControllerServiceNode newService = createMockService("new-svc", 
ControllerServiceState.DISABLED);
+        scheduler.enableControllerServicesAsync(List.of(newService));
+        verify(delegate).enableControllerServicesAsync(Set.of(newService));
+    }
+
+    @Test
+    void testNewServiceNotEnabledWhenProcessGroupInactive() {
+        final ProcessGroup group = createProcessGroup(Collections.emptySet(), 
Collections.emptySet(), Collections.emptySet());
+        final RetainExistingStateComponentScheduler scheduler = new 
RetainExistingStateComponentScheduler(group, delegate);
+
+        final ControllerServiceNode newService = createMockService("new-svc", 
ControllerServiceState.DISABLED);
+        scheduler.enableControllerServicesAsync(List.of(newService));
+        verify(delegate).enableControllerServicesAsync(Collections.emptySet());
+    }
+
+    @Test
+    void testExistingEnabledServiceReEnabled() {
+        final ControllerServiceNode enabledService = 
createMockService("enabled-svc", ControllerServiceState.ENABLED);
+        final ProcessGroup group = createProcessGroup(Collections.emptySet(), 
Collections.emptySet(), Set.of(enabledService));
+        final RetainExistingStateComponentScheduler scheduler = new 
RetainExistingStateComponentScheduler(group, delegate);
+
+        scheduler.enableControllerServicesAsync(List.of(enabledService));
+        verify(delegate).enableControllerServicesAsync(Set.of(enabledService));
+    }
+
+    @Test
+    void testExistingDisabledServiceNotReEnabled() {
+        final ControllerServiceNode disabledService = 
createMockService("disabled-svc", ControllerServiceState.DISABLED);
+        final ProcessorNode runningProcessor = 
createMockProcessor("running-proc", ScheduledState.RUNNING);
+        final ProcessGroup group = 
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(), 
Set.of(disabledService));
+        final RetainExistingStateComponentScheduler scheduler = new 
RetainExistingStateComponentScheduler(group, delegate);
+
+        scheduler.enableControllerServicesAsync(List.of(disabledService));
+        verify(delegate).enableControllerServicesAsync(Collections.emptySet());
+    }
+
+    @Test
+    void testNewStatelessGroupStartedWhenProcessGroupActive() {
+        final ProcessorNode runningProcessor = 
createMockProcessor("running-proc", ScheduledState.RUNNING);
+        final ProcessGroup parentGroup = 
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(), 
Collections.emptySet());
+        final RetainExistingStateComponentScheduler scheduler = new 
RetainExistingStateComponentScheduler(parentGroup, delegate);
+        assertTrue(scheduler.isProcessGroupActive());
+
+        final ProcessGroup newStatelessGroup = mock(ProcessGroup.class);
+        
when(newStatelessGroup.getIdentifier()).thenReturn("new-stateless-group");
+
+        scheduler.startStatelessGroup(newStatelessGroup);
+        verify(delegate).startStatelessGroup(newStatelessGroup);
+    }
+
+    @Test
+    void testNewStatelessGroupNotStartedWhenProcessGroupInactive() {
+        final ProcessGroup parentGroup = 
createProcessGroup(Collections.emptySet(), Collections.emptySet(), 
Collections.emptySet());
+        final RetainExistingStateComponentScheduler scheduler = new 
RetainExistingStateComponentScheduler(parentGroup, delegate);
+
+        final ProcessGroup newStatelessGroup = mock(ProcessGroup.class);
+        
when(newStatelessGroup.getIdentifier()).thenReturn("new-stateless-group");
+
+        scheduler.startStatelessGroup(newStatelessGroup);
+        verify(delegate, never()).startStatelessGroup(newStatelessGroup);
+    }
+
+    @Test
+    void testTransitionComponentStateStartsNewComponentWhenActive() {
+        final ProcessorNode runningProcessor = 
createMockProcessor("running-proc", ScheduledState.RUNNING);
+        final ProcessGroup group = 
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(), 
Collections.emptySet());
+        final RetainExistingStateComponentScheduler scheduler = new 
RetainExistingStateComponentScheduler(group, delegate);
+
+        final Connectable newComponent = mock(Connectable.class);
+        when(newComponent.getIdentifier()).thenReturn("new-component-id");
+
+        scheduler.transitionComponentState(newComponent, 
org.apache.nifi.flow.ScheduledState.ENABLED);
+        verify(delegate).startComponent(newComponent);
+        verify(delegate, never()).transitionComponentState(newComponent, 
org.apache.nifi.flow.ScheduledState.ENABLED);
+    }
+
+    @Test
+    void testTransitionComponentStateDoesNotStartNewDisabledComponent() {
+        final ProcessorNode runningProcessor = 
createMockProcessor("running-proc", ScheduledState.RUNNING);
+        final ProcessGroup group = 
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(), 
Collections.emptySet());
+        final RetainExistingStateComponentScheduler scheduler = new 
RetainExistingStateComponentScheduler(group, delegate);
+
+        final Connectable newComponent = mock(Connectable.class);
+        when(newComponent.getIdentifier()).thenReturn("new-component-id");
+
+        scheduler.transitionComponentState(newComponent, 
org.apache.nifi.flow.ScheduledState.DISABLED);
+        verify(delegate, never()).startComponent(newComponent);
+        verify(delegate).transitionComponentState(newComponent, 
org.apache.nifi.flow.ScheduledState.DISABLED);
+    }
+
+    @Test
+    void testTransitionComponentStateDoesNotStartNewComponentWhenInactive() {
+        final ProcessGroup group = createProcessGroup(Collections.emptySet(), 
Collections.emptySet(), Collections.emptySet());
+        final RetainExistingStateComponentScheduler scheduler = new 
RetainExistingStateComponentScheduler(group, delegate);
+
+        final Connectable newComponent = mock(Connectable.class);
+        when(newComponent.getIdentifier()).thenReturn("new-component-id");
+
+        scheduler.transitionComponentState(newComponent, 
org.apache.nifi.flow.ScheduledState.ENABLED);
+        verify(delegate, never()).startComponent(newComponent);
+        verify(delegate).transitionComponentState(newComponent, 
org.apache.nifi.flow.ScheduledState.ENABLED);
+    }
+
+    @Test
+    void testTransitionComponentStateDelegatesForExistingComponents() {
+        final ProcessorNode runningProcessor = 
createMockProcessor("running-proc", ScheduledState.RUNNING);
+        final ProcessGroup group = 
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(), 
Collections.emptySet());
+        final RetainExistingStateComponentScheduler scheduler = new 
RetainExistingStateComponentScheduler(group, delegate);
+
+        scheduler.transitionComponentState(runningProcessor, 
org.apache.nifi.flow.ScheduledState.ENABLED);
+        verify(delegate, never()).startComponent(runningProcessor);
+        verify(delegate).transitionComponentState(runningProcessor, 
org.apache.nifi.flow.ScheduledState.ENABLED);
+    }
+
+    private ProcessorNode createMockProcessor(final String identifier, final 
ScheduledState state) {
+        final ProcessorNode processor = mock(ProcessorNode.class);
+        when(processor.getIdentifier()).thenReturn(identifier);
+        when(processor.getScheduledState()).thenReturn(state);
+        return processor;
+    }
+
+    private ControllerServiceNode createMockService(final String identifier, 
final ControllerServiceState state) {
+        final ControllerServiceNode service = 
mock(ControllerServiceNode.class);
+        when(service.getIdentifier()).thenReturn(identifier);
+        when(service.getState()).thenReturn(state);
+        return service;
+    }
+
+    private ProcessGroup createProcessGroup(final Set<ProcessorNode> 
processors, final Set<Port> inputPorts, final Set<ControllerServiceNode> 
services) {
+        final ProcessGroup group = Mockito.mock(ProcessGroup.class);
+        when(group.getProcessors()).thenReturn(processors);
+        when(group.findAllProcessors()).thenReturn(new 
java.util.ArrayList<>(processors));
+        when(group.getInputPorts()).thenReturn(inputPorts);
+        when(group.getOutputPorts()).thenReturn(Collections.emptySet());
+        when(group.getFunnels()).thenReturn(Collections.emptySet());
+        
when(group.getRemoteProcessGroups()).thenReturn(Collections.emptySet());
+        when(group.getProcessGroups()).thenReturn(Collections.emptySet());
+        when(group.findAllControllerServices()).thenReturn(new 
HashSet<>(services));
+        
when(group.resolveExecutionEngine()).thenReturn(ExecutionEngine.STANDARD);
+        return group;
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
index c3f6b9ec73d..d9876ceb725 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
@@ -364,6 +364,192 @@ public class RegistryClientIT extends NiFiSystemIT {
         assertEquals("UP_TO_DATE", versionedFlowState);
     }
 
+    /**
+     * Tests that when upgrading an active versioned process group, newly added components are
+     * automatically started/enabled.
+     *
+     * A standalone "Heartbeat" GenerateFlowFile processor with auto-terminated relationships is used as a sentinel
+     * that remains running throughout the upgrade. Because it has no connection changes, property changes, or service
+     * references, it is not in the "affected" set and the REST layer does not stop it. This guarantees the process
+     * group still has an active component when the synchronizer inspects the group, triggering auto-start of new
+     * components.
+     *
+     * v1: GenerateFlowFile -> CountFlowFiles (uses CountService) -> TerminateFlowFile, plus a standalone Heartbeat.
+     *
+     * v2: Same as v1 plus:
+     *     - A new StandardCountService ("NewCountService") chained to the existing service via "Dependent Service"
+     *     - A new SetAttribute processor fed by GenerateFlowFile and connected to TerminateFlowFile
+     *     - A new CountFlowFiles processor referencing NewCountService, fed by GenerateFlowFile and connected
+     *       to TerminateFlowFile
+     *     - GenerateFlowFile text property changed to trigger restart
+     *
+     * Sequence: both versions are saved, the group is switched back to v1, the v1 service and processors
+     * (GenerateFlowFile, CountFlowFiles, Heartbeat) are started, and the group is then changed to v2 while running.
+     * The test never explicitly starts TerminateFlowFile; the assertions inspect FlowFiles queued on the connections
+     * that lead into it. The connections leaving the new processors are looked up by source processor name after the
+     * upgrade rather than by the ids captured while building v2.
+     */
+    @Test
+    public void testNewComponentsStartedDuringVersionChange() throws 
NiFiClientException, IOException, InterruptedException {
+        final FlowRegistryClientEntity clientEntity = registerClient();
+        final NiFiClientUtil util = getClientUtil();
+
+        final ProcessGroupEntity group = util.createProcessGroup("Parent", 
"root");
+        final ControllerServiceEntity countService = 
util.createControllerService("StandardCountService", group.getId());
+
+        final ProcessorEntity generate = 
util.createProcessor("GenerateFlowFile", group.getId());
+        final ProcessorEntity countProcessor = 
util.createProcessor("CountFlowFiles", group.getId());
+        util.updateProcessorProperties(countProcessor, 
Collections.singletonMap("Count Service", countService.getComponent().getId()));
+
+        final ProcessorEntity terminate = 
util.createProcessor("TerminateFlowFile", group.getId());
+        final ConnectionEntity connectionToTerminate = 
util.createConnection(countProcessor, terminate, "success");
+        util.setFifoPrioritizer(connectionToTerminate);
+        util.createConnection(generate, countProcessor, "success");
+
+        // Standalone "Heartbeat" processor: present and unchanged in both v1 
and v2, stays running during upgrade
+        final ProcessorEntity heartbeat = 
util.createProcessor("GenerateFlowFile", group.getId());
+        util.setAutoTerminatedRelationships(heartbeat, "success");
+
+        // Save as v1
+        final VersionControlInformationEntity vci = 
util.startVersionControl(group, clientEntity, TEST_FLOWS_BUCKET, "Parent");
+
+        // Build v2 additions
+
+        // New service chained to the existing service via "Dependent Service"
+        final ControllerServiceEntity newCountService = 
util.createControllerService("StandardCountService", group.getId());
+        util.updateControllerServiceProperties(countService, 
Collections.singletonMap("Dependent Service", 
newCountService.getComponent().getId()));
+
+        // New processor with no service dependency, connected to 
TerminateFlowFile
+        final ProcessorEntity setAttribute = 
util.createProcessor("SetAttribute", group.getId());
+        util.updateProcessorProperties(setAttribute, 
Collections.singletonMap("marker", "v2"));
+        util.createConnection(setAttribute, terminate, "success");
+        util.createConnection(generate, setAttribute, "success");
+
+        // New processor referencing the new service, connected to 
TerminateFlowFile
+        final ProcessorEntity newCountProcessor = 
util.createProcessor("CountFlowFiles", group.getId());
+        util.updateProcessorProperties(newCountProcessor, 
Collections.singletonMap("Count Service", 
newCountService.getComponent().getId()));
+        final ConnectionEntity newCountToTerminate = 
util.createConnection(newCountProcessor, terminate, "success");
+        util.setFifoPrioritizer(newCountToTerminate);
+        util.createConnection(generate, newCountProcessor, "success");
+
+        // Change GenerateFlowFile property so it gets restarted during the 
version change
+        util.updateProcessorProperties(generate, 
Collections.singletonMap("Text", "Hello World"));
+
+        // Save as v2
+        util.saveFlowVersion(group, clientEntity, vci);
+
+        // Switch back to v1 and start the flow
+        util.changeFlowVersion(group.getId(), "1");
+        util.assertFlowStaleAndUnmodified(group.getId());
+
+        util.enableControllerService(countService);
+        util.waitForValidProcessor(generate.getId());
+        util.startProcessor(generate);
+        util.waitForValidProcessor(countProcessor.getId());
+        util.startProcessor(countProcessor);
+        util.waitForValidProcessor(heartbeat.getId());
+        util.startProcessor(heartbeat);
+
+        // Verify v1 flow works
+        waitForQueueCount(connectionToTerminate.getId(), getNumberOfNodes());
+        final Map<String, String> v1Attributes = 
util.getQueueFlowFile(connectionToTerminate.getId(), 
0).getFlowFile().getAttributes();
+        assertEquals("1", v1Attributes.get("count"));
+
+        // Upgrade to v2 while the flow is running.
+        // Heartbeat processor stays running (not affected), so the process 
group is still "active".
+        util.changeFlowVersion(group.getId(), "2");
+        util.assertFlowUpToDate(group.getId());
+
+        // Verify existing CountFlowFiles still processes with the chained 
service after upgrade.
+        // The exact count depends on timing (GenerateFlowFile may have 
produced extra flow files before the upgrade stopped it),
+        // but the count must be greater than 1 to prove the chained service 
(newCountService + countService) is working.
+        waitForQueueCount(connectionToTerminate.getId(), 2 * 
getNumberOfNodes());
+        final Map<String, String> v2CountAttributes = 
util.getQueueFlowFile(connectionToTerminate.getId(), 
getNumberOfNodes()).getFlowFile().getAttributes();
+        final int count = Integer.parseInt(v2CountAttributes.get("count"));
+        assertTrue(count > 1, "Expected count > 1 with chained service but was 
" + count);
+
+        // Query the current flow to find connections by their source 
processor names
+        final FlowDTO v2Flow = 
getNifiClient().getFlowClient().getProcessGroup(group.getId()).getProcessGroupFlow().getFlow();
+        final Set<ConnectionEntity> v2Connections = v2Flow.getConnections();
+
+        // Verify new SetAttribute processor is running and processing flow 
files
+        final String setAttrConnectionId = v2Connections.stream()
+            .filter(c -> 
"SetAttribute".equals(c.getComponent().getSource().getName()))
+            .map(ConnectionEntity::getId)
+            .findFirst().orElseThrow();
+        waitForQueueCount(setAttrConnectionId, getNumberOfNodes());
+        final Map<String, String> v2SetAttrAttributes = 
util.getQueueFlowFile(setAttrConnectionId, 0).getFlowFile().getAttributes();
+        assertEquals("v2", v2SetAttrAttributes.get("marker"));
+
+        // Verify new CountFlowFiles processor referencing new service is 
running
+        final String newCountConnectionId = v2Connections.stream()
+            .filter(c -> 
"CountFlowFiles".equals(c.getComponent().getSource().getName()))
+            .filter(c -> !c.getId().equals(connectionToTerminate.getId()))
+            .filter(c -> 
c.getComponent().getDestination().getName().equals("TerminateFlowFile"))
+            .map(ConnectionEntity::getId)
+            .findFirst().orElseThrow();
+        waitForQueueCount(newCountConnectionId, getNumberOfNodes());
+        final Map<String, String> v2NewCountAttributes = 
util.getQueueFlowFile(newCountConnectionId, 0).getFlowFile().getAttributes();
+        assertNotNull(v2NewCountAttributes.get("count"));
+    }
+
+    /**
+     * Tests that when upgrading a fully stopped versioned process group, newly added components
+     * are NOT automatically started or enabled.
+     *
+     * Uses a reduced variant of the flow built in testNewComponentsStartedDuringVersionChange and does not
+     * start any components before the upgrade. There is no standalone Heartbeat processor here.
+     *
+     * v1: GenerateFlowFile -> CountFlowFiles (uses a StandardCountService) -> TerminateFlowFile.
+     *
+     * v2: Same as v1 plus:
+     *     - A new StandardCountService set as the "Dependent Service" of the existing service
+     *     - A new SetAttribute processor fed by GenerateFlowFile and connected to TerminateFlowFile
+     *     - GenerateFlowFile "Text" property changed
+     *
+     * After changing from v1 to v2, every processor in the group is expected to report STOPPED and every
+     * controller service returned for the group is expected to report DISABLED.
+     */
+    @Test
+    public void testNewComponentsNotStartedWhenGroupStopped() throws 
NiFiClientException, IOException, InterruptedException {
+        final FlowRegistryClientEntity clientEntity = registerClient();
+        final NiFiClientUtil util = getClientUtil();
+
+        final ProcessGroupEntity group = util.createProcessGroup("Parent", 
"root");
+        final ControllerServiceEntity countService = 
util.createControllerService("StandardCountService", group.getId());
+
+        final ProcessorEntity generate = 
util.createProcessor("GenerateFlowFile", group.getId());
+        final ProcessorEntity countProcessor = 
util.createProcessor("CountFlowFiles", group.getId());
+        util.updateProcessorProperties(countProcessor, 
Collections.singletonMap("Count Service", countService.getComponent().getId()));
+
+        final ProcessorEntity terminate = 
util.createProcessor("TerminateFlowFile", group.getId());
+        util.createConnection(countProcessor, terminate, "success");
+        util.createConnection(generate, countProcessor, "success");
+
+        // Save as v1
+        final VersionControlInformationEntity vci = 
util.startVersionControl(group, clientEntity, TEST_FLOWS_BUCKET, "StoppedFlow");
+
+        // Build v2: add new service, new processor, modify GenerateFlowFile
+        final ControllerServiceEntity newCountService = 
util.createControllerService("StandardCountService", group.getId());
+        util.updateControllerServiceProperties(countService, 
Collections.singletonMap("Dependent Service", 
newCountService.getComponent().getId()));
+
+        final ProcessorEntity setAttribute = 
util.createProcessor("SetAttribute", group.getId());
+        util.updateProcessorProperties(setAttribute, 
Collections.singletonMap("marker", "v2"));
+        util.createConnection(setAttribute, terminate, "success");
+        util.createConnection(generate, setAttribute, "success");
+
+        util.updateProcessorProperties(generate, 
Collections.singletonMap("Text", "Hello World"));
+
+        // Save as v2
+        util.saveFlowVersion(group, clientEntity, vci);
+
+        // Switch to v1 -- do NOT start anything
+        util.changeFlowVersion(group.getId(), "1");
+        util.assertFlowStaleAndUnmodified(group.getId());
+
+        // Upgrade to v2 while the flow is fully stopped
+        util.changeFlowVersion(group.getId(), "2");
+        util.assertFlowUpToDate(group.getId());
+
+        // Verify all processors are STOPPED after upgrade
+        final FlowDTO v2Flow = 
getNifiClient().getFlowClient().getProcessGroup(group.getId()).getProcessGroupFlow().getFlow();
+        for (final ProcessorEntity processor : v2Flow.getProcessors()) {
+            assertEquals("STOPPED", processor.getComponent().getState(),
+                "Processor " + processor.getComponent().getName() + " should 
be STOPPED when group was not active before upgrade");
+        }
+
+        // Verify all controller services are DISABLED after upgrade
+        final Set<ControllerServiceEntity> v2Services = 
getNifiClient().getFlowClient()
+            .getControllerServices(group.getId()).getControllerServices();
+        for (final ControllerServiceEntity service : v2Services) {
+            assertEquals("DISABLED", service.getComponent().getState(),
+                "Service " + service.getComponent().getName() + " should be 
DISABLED when group was not active before upgrade");
+        }
+    }
+
     @Test
     public void testStopVersionControlThenSetVersionControlInfo() throws 
NiFiClientException, IOException, InterruptedException {
         final FlowRegistryClientEntity clientEntity = registerClient();


Reply via email to