This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c757de20f1b NIFI-15749 - Start newly added components when upgrading
versioned process group (#11061)
c757de20f1b is described below
commit c757de20f1b5ec39dc9c8e3b5913f9539badd1af
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Mar 27 20:16:56 2026 +0100
NIFI-15749 - Start newly added components when upgrading versioned process
group (#11061)
* NIFI-15749 - Start newly added components when upgrading versioned
process group
* active state based on processors only
---
.../RetainExistingStateComponentScheduler.java | 68 ++++-
.../RetainExistingStateComponentSchedulerTest.java | 304 +++++++++++++++++++++
.../tests/system/registry/RegistryClientIT.java | 186 +++++++++++++
3 files changed, 551 insertions(+), 7 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
index 913c586356e..393b9866bdf 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/RetainExistingStateComponentScheduler.java
@@ -18,6 +18,7 @@
package org.apache.nifi.groups;
import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceNode;
@@ -33,24 +34,46 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+/**
+ * A {@link ComponentScheduler} that retains the existing state of components
during a flow update. For components that already existed
+ * before the update, the scheduler only starts or enables them if they were
previously running or enabled. Components that were stopped
+ * or disabled remain in that state.
+ *
+ * <p>For components that are newly added by the update (i.e., not present
before the update), the scheduler determines whether to start
+ * or enable them based on whether the process group is currently "active".
The process group is considered active if at least one processor
+ * is running at the time this scheduler is constructed. During a version
upgrade, the REST layer stops only the affected (modified)
+ * processors before delegating to the synchronizer, so unaffected processors
remain in their original state and serve as the signal
+ * for whether the group is active.</p>
+ *
+ * <p>When the process group is active, newly added processors are started and
newly added controller services are enabled. When the
+ * process group is not active (e.g., fully stopped before the upgrade, or an
initial import into an empty group), newly added
+ * components remain stopped or disabled.</p>
+ */
public class RetainExistingStateComponentScheduler implements
ComponentScheduler {
private static final Logger logger =
LoggerFactory.getLogger(RetainExistingStateComponentScheduler.class);
private final ComponentScheduler delegate;
private final Map<String, ScheduledState> componentStates;
private final Map<String, ControllerServiceState> controllerServiceStates;
+ private final boolean processGroupActive;
public RetainExistingStateComponentScheduler(final ProcessGroup
processGroup, final ComponentScheduler delegate) {
this.delegate = delegate;
this.componentStates = mapComponentStates(processGroup);
this.controllerServiceStates =
mapControllerServiceStates(processGroup);
+ this.processGroupActive = hasActiveRuntimeState(processGroup);
}
@Override
public void startComponent(final Connectable component) {
final ScheduledState existingState =
componentStates.get(component.getIdentifier());
if (existingState == null) {
- logger.debug("Will not start {} because it was not previously
known in this Process Group", component);
+ if (processGroupActive) {
+ logger.debug("Starting new component {} because the Process
Group is active", component);
+ delegate.startComponent(component);
+ } else {
+ logger.debug("Will not start {} because it was not previously
known in this Process Group and the Process Group is not active", component);
+ }
return;
}
@@ -67,7 +90,12 @@ public class RetainExistingStateComponentScheduler
implements ComponentScheduler
public void startStatelessGroup(final ProcessGroup group) {
final ScheduledState existingState =
componentStates.get(group.getIdentifier());
if (existingState == null) {
- logger.debug("Will not start {} because it was not previously
known in this Process Group", group);
+ if (processGroupActive) {
+ logger.debug("Starting new stateless group {} because the
Process Group is active", group);
+ delegate.startStatelessGroup(group);
+ } else {
+ logger.debug("Will not start {} because it was not previously
known in this Process Group and the Process Group is not active", group);
+ }
return;
}
@@ -92,6 +120,12 @@ public class RetainExistingStateComponentScheduler
implements ComponentScheduler
@Override
public void transitionComponentState(final Connectable component, final
org.apache.nifi.flow.ScheduledState desiredState) {
+ final ScheduledState existingState =
componentStates.get(component.getIdentifier());
+ if (existingState == null && processGroupActive && desiredState !=
org.apache.nifi.flow.ScheduledState.DISABLED) {
+ logger.debug("Starting new component {} because the Process Group
is active and desired state is {}", component, desiredState);
+ delegate.startComponent(component);
+ return;
+ }
delegate.transitionComponentState(component, desiredState);
}
@@ -103,12 +137,17 @@ public class RetainExistingStateComponentScheduler
implements ComponentScheduler
final ControllerServiceState existingState =
controllerServiceStates.get(service.getIdentifier());
if (existingState == null) {
- logger.debug("Will not enable {} because it was not previously
known in this Process Group", service);
+ if (processGroupActive) {
+ logger.debug("Enabling new service {} because the Process
Group is active", service);
+ toEnable.add(service);
+ } else {
+ logger.debug("Will not enable {} because it was not
previously known in this Process Group and the Process Group is not active",
service);
+ }
continue;
}
if (existingState != ControllerServiceState.ENABLED &&
existingState != ControllerServiceState.ENABLING) {
- logger.debug("Will not enable {} because its previously state
was {}", service, existingState);
+ logger.debug("Will not enable {} because its previous state
was {}", service, existingState);
continue;
}
@@ -139,6 +178,10 @@ public class RetainExistingStateComponentScheduler
implements ComponentScheduler
delegate.resume();
}
+ boolean isProcessGroupActive() {
+ return processGroupActive;
+ }
+
private Map<String, ControllerServiceState>
mapControllerServiceStates(final ProcessGroup group) {
final Set<ControllerServiceNode> services =
group.findAllControllerServices();
final Map<String, ControllerServiceState> serviceStates =
services.stream()
@@ -167,7 +210,18 @@ public class RetainExistingStateComponentScheduler
implements ComponentScheduler
return componentStates;
}
- private void findAllConnectables(final ProcessGroup group, final
Set<Connectable> connectables) {
+ static boolean hasActiveRuntimeState(final ProcessGroup group) {
+ for (final ProcessorNode processor : group.findAllProcessors()) {
+ final ScheduledState state = processor.getScheduledState();
+ if (state == ScheduledState.RUNNING || state ==
ScheduledState.STARTING) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private static void findAllConnectables(final ProcessGroup group, final
Set<Connectable> connectables) {
connectables.addAll(group.getInputPorts());
connectables.addAll(group.getOutputPorts());
connectables.addAll(group.getFunnels());
@@ -182,10 +236,10 @@ public class RetainExistingStateComponentScheduler
implements ComponentScheduler
}
}
- private void findAllStatelessGroups(final ProcessGroup start, final
Set<ProcessGroup> statelessGroups) {
+ private static void findAllStatelessGroups(final ProcessGroup start, final
Set<ProcessGroup> statelessGroups) {
if (start.resolveExecutionEngine() == ExecutionEngine.STATELESS) {
statelessGroups.add(start);
- return; // No need to go further, as the top-level stateless group
is all we need.
+ return;
}
for (final ProcessGroup childGroup : start.getProcessGroups()) {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/RetainExistingStateComponentSchedulerTest.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/RetainExistingStateComponentSchedulerTest.java
new file mode 100644
index 00000000000..8f5add62a79
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/RetainExistingStateComponentSchedulerTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.groups;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.flow.ExecutionEngine;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class RetainExistingStateComponentSchedulerTest {
+
+ private ComponentScheduler delegate;
+
+ @BeforeEach
+ void setup() {
+ delegate = mock(ComponentScheduler.class);
+ }
+
+ @Test
+ void testEmptyProcessGroupIsNotActive() {
+ final ProcessGroup group = createProcessGroup(Collections.emptySet(),
Collections.emptySet(), Collections.emptySet());
+ final RetainExistingStateComponentScheduler scheduler = new
RetainExistingStateComponentScheduler(group, delegate);
+ assertFalse(scheduler.isProcessGroupActive());
+
assertFalse(RetainExistingStateComponentScheduler.hasActiveRuntimeState(group));
+ }
+
+ @Test
+ void testProcessGroupWithRunningProcessorIsActive() {
+ final ProcessorNode processor = createMockProcessor("proc-1",
ScheduledState.RUNNING);
+ final ProcessGroup group = createProcessGroup(Set.of(processor),
Collections.emptySet(), Collections.emptySet());
+
assertTrue(RetainExistingStateComponentScheduler.hasActiveRuntimeState(group));
+ assertTrue(new RetainExistingStateComponentScheduler(group,
delegate).isProcessGroupActive());
+ }
+
+ @Test
+ void testProcessGroupWithStartingProcessorIsActive() {
+ final ProcessorNode processor = createMockProcessor("proc-1",
ScheduledState.STARTING);
+ final ProcessGroup group = createProcessGroup(Set.of(processor),
Collections.emptySet(), Collections.emptySet());
+
assertTrue(RetainExistingStateComponentScheduler.hasActiveRuntimeState(group));
+ }
+
+ @Test
+ void testProcessGroupWithEnabledServiceOnlyIsNotActive() {
+ final ControllerServiceNode service = createMockService("svc-1",
ControllerServiceState.ENABLED);
+ final ProcessGroup group = createProcessGroup(Collections.emptySet(),
Collections.emptySet(), Set.of(service));
+
assertFalse(RetainExistingStateComponentScheduler.hasActiveRuntimeState(group));
+ assertFalse(new RetainExistingStateComponentScheduler(group,
delegate).isProcessGroupActive());
+ }
+
+ @Test
+ void testProcessGroupWithEnablingServiceOnlyIsNotActive() {
+ final ControllerServiceNode service = createMockService("svc-1",
ControllerServiceState.ENABLING);
+ final ProcessGroup group = createProcessGroup(Collections.emptySet(),
Collections.emptySet(), Set.of(service));
+
assertFalse(RetainExistingStateComponentScheduler.hasActiveRuntimeState(group));
+ }
+
+ @Test
+ void testProcessGroupWithStoppedComponentsIsNotActive() {
+ final ProcessorNode processor = createMockProcessor("proc-1",
ScheduledState.STOPPED);
+ final ControllerServiceNode service = createMockService("svc-1",
ControllerServiceState.DISABLED);
+ final ProcessGroup group = createProcessGroup(Set.of(processor),
Collections.emptySet(), Set.of(service));
+
assertFalse(RetainExistingStateComponentScheduler.hasActiveRuntimeState(group));
+ assertFalse(new RetainExistingStateComponentScheduler(group,
delegate).isProcessGroupActive());
+ }
+
+ @Test
+ void testProcessGroupWithRunningPortIsNotActive() {
+ final Port runningPort = mock(Port.class);
+ when(runningPort.getIdentifier()).thenReturn("port-1");
+
when(runningPort.getScheduledState()).thenReturn(ScheduledState.RUNNING);
+ final ProcessGroup group = createProcessGroup(Collections.emptySet(),
Set.of(runningPort), Collections.emptySet());
+
assertFalse(RetainExistingStateComponentScheduler.hasActiveRuntimeState(group));
+ }
+
+ @Test
+ void testNewComponentStartedWhenProcessGroupActive() {
+ final ProcessorNode runningProcessor =
createMockProcessor("running-proc", ScheduledState.RUNNING);
+ final ProcessGroup group =
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(),
Collections.emptySet());
+ final RetainExistingStateComponentScheduler scheduler = new
RetainExistingStateComponentScheduler(group, delegate);
+ assertTrue(scheduler.isProcessGroupActive());
+
+ final Connectable newComponent = mock(Connectable.class);
+ when(newComponent.getIdentifier()).thenReturn("new-component-id");
+
+ scheduler.startComponent(newComponent);
+ verify(delegate).startComponent(newComponent);
+ }
+
+ @Test
+ void testNewComponentNotStartedWhenProcessGroupInactive() {
+ final ProcessGroup group = createProcessGroup(Collections.emptySet(),
Collections.emptySet(), Collections.emptySet());
+ final RetainExistingStateComponentScheduler scheduler = new
RetainExistingStateComponentScheduler(group, delegate);
+ assertFalse(scheduler.isProcessGroupActive());
+
+ final Connectable newComponent = mock(Connectable.class);
+ when(newComponent.getIdentifier()).thenReturn("new-component-id");
+
+ scheduler.startComponent(newComponent);
+ verify(delegate, never()).startComponent(newComponent);
+ }
+
+ @Test
+ void testExistingRunningComponentStarted() {
+ final ProcessorNode runningProcessor =
createMockProcessor("running-proc", ScheduledState.RUNNING);
+ final ProcessGroup group =
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(),
Collections.emptySet());
+ final RetainExistingStateComponentScheduler scheduler = new
RetainExistingStateComponentScheduler(group, delegate);
+
+ scheduler.startComponent(runningProcessor);
+ verify(delegate).startComponent(runningProcessor);
+ }
+
+ @Test
+ void testExistingStoppedComponentNotStarted() {
+ final ProcessorNode stoppedProcessor =
createMockProcessor("stopped-proc", ScheduledState.STOPPED);
+ final ProcessorNode runningProcessor =
createMockProcessor("running-proc", ScheduledState.RUNNING);
+ final ProcessGroup group = createProcessGroup(Set.of(stoppedProcessor,
runningProcessor), Collections.emptySet(), Collections.emptySet());
+ final RetainExistingStateComponentScheduler scheduler = new
RetainExistingStateComponentScheduler(group, delegate);
+
+ scheduler.startComponent(stoppedProcessor);
+ verify(delegate, never()).startComponent(stoppedProcessor);
+ }
+
+ @Test
+ void testNewServiceEnabledWhenProcessGroupActive() {
+ final ProcessorNode runningProcessor =
createMockProcessor("running-proc", ScheduledState.RUNNING);
+ final ControllerServiceNode existingService =
createMockService("existing-svc", ControllerServiceState.ENABLED);
+ final ProcessGroup group =
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(),
Set.of(existingService));
+ final RetainExistingStateComponentScheduler scheduler = new
RetainExistingStateComponentScheduler(group, delegate);
+ assertTrue(scheduler.isProcessGroupActive());
+
+ final ControllerServiceNode newService = createMockService("new-svc",
ControllerServiceState.DISABLED);
+ scheduler.enableControllerServicesAsync(List.of(newService));
+ verify(delegate).enableControllerServicesAsync(Set.of(newService));
+ }
+
+ @Test
+ void testNewServiceNotEnabledWhenProcessGroupInactive() {
+ final ProcessGroup group = createProcessGroup(Collections.emptySet(),
Collections.emptySet(), Collections.emptySet());
+ final RetainExistingStateComponentScheduler scheduler = new
RetainExistingStateComponentScheduler(group, delegate);
+
+ final ControllerServiceNode newService = createMockService("new-svc",
ControllerServiceState.DISABLED);
+ scheduler.enableControllerServicesAsync(List.of(newService));
+ verify(delegate).enableControllerServicesAsync(Collections.emptySet());
+ }
+
+ @Test
+ void testExistingEnabledServiceReEnabled() {
+ final ControllerServiceNode enabledService =
createMockService("enabled-svc", ControllerServiceState.ENABLED);
+ final ProcessGroup group = createProcessGroup(Collections.emptySet(),
Collections.emptySet(), Set.of(enabledService));
+ final RetainExistingStateComponentScheduler scheduler = new
RetainExistingStateComponentScheduler(group, delegate);
+
+ scheduler.enableControllerServicesAsync(List.of(enabledService));
+ verify(delegate).enableControllerServicesAsync(Set.of(enabledService));
+ }
+
+ @Test
+ void testExistingDisabledServiceNotReEnabled() {
+ final ControllerServiceNode disabledService =
createMockService("disabled-svc", ControllerServiceState.DISABLED);
+ final ProcessorNode runningProcessor =
createMockProcessor("running-proc", ScheduledState.RUNNING);
+ final ProcessGroup group =
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(),
Set.of(disabledService));
+ final RetainExistingStateComponentScheduler scheduler = new
RetainExistingStateComponentScheduler(group, delegate);
+
+ scheduler.enableControllerServicesAsync(List.of(disabledService));
+ verify(delegate).enableControllerServicesAsync(Collections.emptySet());
+ }
+
+ @Test
+ void testNewStatelessGroupStartedWhenProcessGroupActive() {
+ final ProcessorNode runningProcessor =
createMockProcessor("running-proc", ScheduledState.RUNNING);
+ final ProcessGroup parentGroup =
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(),
Collections.emptySet());
+ final RetainExistingStateComponentScheduler scheduler = new
RetainExistingStateComponentScheduler(parentGroup, delegate);
+ assertTrue(scheduler.isProcessGroupActive());
+
+ final ProcessGroup newStatelessGroup = mock(ProcessGroup.class);
+
when(newStatelessGroup.getIdentifier()).thenReturn("new-stateless-group");
+
+ scheduler.startStatelessGroup(newStatelessGroup);
+ verify(delegate).startStatelessGroup(newStatelessGroup);
+ }
+
+ @Test
+ void testNewStatelessGroupNotStartedWhenProcessGroupInactive() {
+ final ProcessGroup parentGroup =
createProcessGroup(Collections.emptySet(), Collections.emptySet(),
Collections.emptySet());
+ final RetainExistingStateComponentScheduler scheduler = new
RetainExistingStateComponentScheduler(parentGroup, delegate);
+
+ final ProcessGroup newStatelessGroup = mock(ProcessGroup.class);
+
when(newStatelessGroup.getIdentifier()).thenReturn("new-stateless-group");
+
+ scheduler.startStatelessGroup(newStatelessGroup);
+ verify(delegate, never()).startStatelessGroup(newStatelessGroup);
+ }
+
+ @Test
+ void testTransitionComponentStateStartsNewComponentWhenActive() {
+ final ProcessorNode runningProcessor =
createMockProcessor("running-proc", ScheduledState.RUNNING);
+ final ProcessGroup group =
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(),
Collections.emptySet());
+ final RetainExistingStateComponentScheduler scheduler = new
RetainExistingStateComponentScheduler(group, delegate);
+
+ final Connectable newComponent = mock(Connectable.class);
+ when(newComponent.getIdentifier()).thenReturn("new-component-id");
+
+ scheduler.transitionComponentState(newComponent,
org.apache.nifi.flow.ScheduledState.ENABLED);
+ verify(delegate).startComponent(newComponent);
+ verify(delegate, never()).transitionComponentState(newComponent,
org.apache.nifi.flow.ScheduledState.ENABLED);
+ }
+
+ @Test
+ void testTransitionComponentStateDoesNotStartNewDisabledComponent() {
+ final ProcessorNode runningProcessor =
createMockProcessor("running-proc", ScheduledState.RUNNING);
+ final ProcessGroup group =
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(),
Collections.emptySet());
+ final RetainExistingStateComponentScheduler scheduler = new
RetainExistingStateComponentScheduler(group, delegate);
+
+ final Connectable newComponent = mock(Connectable.class);
+ when(newComponent.getIdentifier()).thenReturn("new-component-id");
+
+ scheduler.transitionComponentState(newComponent,
org.apache.nifi.flow.ScheduledState.DISABLED);
+ verify(delegate, never()).startComponent(newComponent);
+ verify(delegate).transitionComponentState(newComponent,
org.apache.nifi.flow.ScheduledState.DISABLED);
+ }
+
+ @Test
+ void testTransitionComponentStateDoesNotStartNewComponentWhenInactive() {
+ final ProcessGroup group = createProcessGroup(Collections.emptySet(),
Collections.emptySet(), Collections.emptySet());
+ final RetainExistingStateComponentScheduler scheduler = new
RetainExistingStateComponentScheduler(group, delegate);
+
+ final Connectable newComponent = mock(Connectable.class);
+ when(newComponent.getIdentifier()).thenReturn("new-component-id");
+
+ scheduler.transitionComponentState(newComponent,
org.apache.nifi.flow.ScheduledState.ENABLED);
+ verify(delegate, never()).startComponent(newComponent);
+ verify(delegate).transitionComponentState(newComponent,
org.apache.nifi.flow.ScheduledState.ENABLED);
+ }
+
+ @Test
+ void testTransitionComponentStateDelegatesForExistingComponents() {
+ final ProcessorNode runningProcessor =
createMockProcessor("running-proc", ScheduledState.RUNNING);
+ final ProcessGroup group =
createProcessGroup(Set.of(runningProcessor), Collections.emptySet(),
Collections.emptySet());
+ final RetainExistingStateComponentScheduler scheduler = new
RetainExistingStateComponentScheduler(group, delegate);
+
+ scheduler.transitionComponentState(runningProcessor,
org.apache.nifi.flow.ScheduledState.ENABLED);
+ verify(delegate, never()).startComponent(runningProcessor);
+ verify(delegate).transitionComponentState(runningProcessor,
org.apache.nifi.flow.ScheduledState.ENABLED);
+ }
+
+ private ProcessorNode createMockProcessor(final String identifier, final
ScheduledState state) {
+ final ProcessorNode processor = mock(ProcessorNode.class);
+ when(processor.getIdentifier()).thenReturn(identifier);
+ when(processor.getScheduledState()).thenReturn(state);
+ return processor;
+ }
+
+ private ControllerServiceNode createMockService(final String identifier,
final ControllerServiceState state) {
+ final ControllerServiceNode service =
mock(ControllerServiceNode.class);
+ when(service.getIdentifier()).thenReturn(identifier);
+ when(service.getState()).thenReturn(state);
+ return service;
+ }
+
+ private ProcessGroup createProcessGroup(final Set<ProcessorNode>
processors, final Set<Port> inputPorts, final Set<ControllerServiceNode>
services) {
+ final ProcessGroup group = Mockito.mock(ProcessGroup.class);
+ when(group.getProcessors()).thenReturn(processors);
+ when(group.findAllProcessors()).thenReturn(new
java.util.ArrayList<>(processors));
+ when(group.getInputPorts()).thenReturn(inputPorts);
+ when(group.getOutputPorts()).thenReturn(Collections.emptySet());
+ when(group.getFunnels()).thenReturn(Collections.emptySet());
+
when(group.getRemoteProcessGroups()).thenReturn(Collections.emptySet());
+ when(group.getProcessGroups()).thenReturn(Collections.emptySet());
+ when(group.findAllControllerServices()).thenReturn(new
HashSet<>(services));
+
when(group.resolveExecutionEngine()).thenReturn(ExecutionEngine.STANDARD);
+ return group;
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
index c3f6b9ec73d..d9876ceb725 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
@@ -364,6 +364,192 @@ public class RegistryClientIT extends NiFiSystemIT {
assertEquals("UP_TO_DATE", versionedFlowState);
}
+ /**
+ * Tests that when upgrading an active versioned process group, newly
added components are
+ * automatically started/enabled.
+ *
+ * A standalone "Heartbeat" GenerateFlowFile processor with
auto-terminated relationships is used as a sentinel
+ * that remains running throughout the upgrade. Because it has no
connection changes, property changes, or service
+ * references, it is not in the "affected" set and the REST layer does not
stop it. This guarantees the process
+ * group still has an active component when the synchronizer inspects the
group, triggering auto-start of new
+ * components.
+ *
+ * v1: GenerateFlowFile -> CountFlowFiles (uses CountService) ->
TerminateFlowFile, plus a standalone Heartbeat.
+ *
+ * v2: Same as v1 plus:
+ * - A new StandardCountService ("NewCountService") chained to the
existing service
+ * - A new SetAttribute processor connected to TerminateFlowFile
+ * - A new CountFlowFiles processor referencing NewCountService
connected to TerminateFlowFile
+ * - GenerateFlowFile text property changed to trigger restart
+ */
+ @Test
+ public void testNewComponentsStartedDuringVersionChange() throws
NiFiClientException, IOException, InterruptedException {
+ final FlowRegistryClientEntity clientEntity = registerClient();
+ final NiFiClientUtil util = getClientUtil();
+
+ final ProcessGroupEntity group = util.createProcessGroup("Parent",
"root");
+ final ControllerServiceEntity countService =
util.createControllerService("StandardCountService", group.getId());
+
+ final ProcessorEntity generate =
util.createProcessor("GenerateFlowFile", group.getId());
+ final ProcessorEntity countProcessor =
util.createProcessor("CountFlowFiles", group.getId());
+ util.updateProcessorProperties(countProcessor,
Collections.singletonMap("Count Service", countService.getComponent().getId()));
+
+ final ProcessorEntity terminate =
util.createProcessor("TerminateFlowFile", group.getId());
+ final ConnectionEntity connectionToTerminate =
util.createConnection(countProcessor, terminate, "success");
+ util.setFifoPrioritizer(connectionToTerminate);
+ util.createConnection(generate, countProcessor, "success");
+
+ // Standalone "Heartbeat" processor: present and unchanged in both v1
and v2, stays running during upgrade
+ final ProcessorEntity heartbeat =
util.createProcessor("GenerateFlowFile", group.getId());
+ util.setAutoTerminatedRelationships(heartbeat, "success");
+
+ // Save as v1
+ final VersionControlInformationEntity vci =
util.startVersionControl(group, clientEntity, TEST_FLOWS_BUCKET, "Parent");
+
+ // Build v2 additions
+
+ // New service chained to the existing service via "Dependent Service"
+ final ControllerServiceEntity newCountService =
util.createControllerService("StandardCountService", group.getId());
+ util.updateControllerServiceProperties(countService,
Collections.singletonMap("Dependent Service",
newCountService.getComponent().getId()));
+
+ // New processor with no service dependency, connected to
TerminateFlowFile
+ final ProcessorEntity setAttribute =
util.createProcessor("SetAttribute", group.getId());
+ util.updateProcessorProperties(setAttribute,
Collections.singletonMap("marker", "v2"));
+ util.createConnection(setAttribute, terminate, "success");
+ util.createConnection(generate, setAttribute, "success");
+
+ // New processor referencing the new service, connected to
TerminateFlowFile
+ final ProcessorEntity newCountProcessor =
util.createProcessor("CountFlowFiles", group.getId());
+ util.updateProcessorProperties(newCountProcessor,
Collections.singletonMap("Count Service",
newCountService.getComponent().getId()));
+ final ConnectionEntity newCountToTerminate =
util.createConnection(newCountProcessor, terminate, "success");
+ util.setFifoPrioritizer(newCountToTerminate);
+ util.createConnection(generate, newCountProcessor, "success");
+
+ // Change GenerateFlowFile property so it gets restarted during the
version change
+ util.updateProcessorProperties(generate,
Collections.singletonMap("Text", "Hello World"));
+
+ // Save as v2
+ util.saveFlowVersion(group, clientEntity, vci);
+
+ // Switch back to v1 and start the flow
+ util.changeFlowVersion(group.getId(), "1");
+ util.assertFlowStaleAndUnmodified(group.getId());
+
+ util.enableControllerService(countService);
+ util.waitForValidProcessor(generate.getId());
+ util.startProcessor(generate);
+ util.waitForValidProcessor(countProcessor.getId());
+ util.startProcessor(countProcessor);
+ util.waitForValidProcessor(heartbeat.getId());
+ util.startProcessor(heartbeat);
+
+ // Verify v1 flow works
+ waitForQueueCount(connectionToTerminate.getId(), getNumberOfNodes());
+ final Map<String, String> v1Attributes =
util.getQueueFlowFile(connectionToTerminate.getId(),
0).getFlowFile().getAttributes();
+ assertEquals("1", v1Attributes.get("count"));
+
+ // Upgrade to v2 while the flow is running.
+ // Heartbeat processor stays running (not affected), so the process
group is still "active".
+ util.changeFlowVersion(group.getId(), "2");
+ util.assertFlowUpToDate(group.getId());
+
+ // Verify existing CountFlowFiles still processes with the chained
service after upgrade.
+ // The exact count depends on timing (GenerateFlowFile may have
produced extra flow files before the upgrade stopped it),
+ // but the count must be greater than 1 to prove the chained service
(newCountService + countService) is working.
+ waitForQueueCount(connectionToTerminate.getId(), 2 *
getNumberOfNodes());
+ final Map<String, String> v2CountAttributes =
util.getQueueFlowFile(connectionToTerminate.getId(),
getNumberOfNodes()).getFlowFile().getAttributes();
+ final int count = Integer.parseInt(v2CountAttributes.get("count"));
+ assertTrue(count > 1, "Expected count > 1 with chained service but was
" + count);
+
+ // Query the current flow to find connections by their source
processor names
+ final FlowDTO v2Flow =
getNifiClient().getFlowClient().getProcessGroup(group.getId()).getProcessGroupFlow().getFlow();
+ final Set<ConnectionEntity> v2Connections = v2Flow.getConnections();
+
+ // Verify new SetAttribute processor is running and processing flow
files
+ final String setAttrConnectionId = v2Connections.stream()
+ .filter(c ->
"SetAttribute".equals(c.getComponent().getSource().getName()))
+ .map(ConnectionEntity::getId)
+ .findFirst().orElseThrow();
+ waitForQueueCount(setAttrConnectionId, getNumberOfNodes());
+ final Map<String, String> v2SetAttrAttributes =
util.getQueueFlowFile(setAttrConnectionId, 0).getFlowFile().getAttributes();
+ assertEquals("v2", v2SetAttrAttributes.get("marker"));
+
+ // Verify new CountFlowFiles processor referencing new service is
running
+ final String newCountConnectionId = v2Connections.stream()
+ .filter(c ->
"CountFlowFiles".equals(c.getComponent().getSource().getName()))
+ .filter(c -> !c.getId().equals(connectionToTerminate.getId()))
+ .filter(c ->
c.getComponent().getDestination().getName().equals("TerminateFlowFile"))
+ .map(ConnectionEntity::getId)
+ .findFirst().orElseThrow();
+ waitForQueueCount(newCountConnectionId, getNumberOfNodes());
+ final Map<String, String> v2NewCountAttributes =
util.getQueueFlowFile(newCountConnectionId, 0).getFlowFile().getAttributes();
+ assertNotNull(v2NewCountAttributes.get("count"));
+ }
+
+ /**
+ * Tests that when upgrading a fully stopped versioned process group,
newly added components
+ * are NOT automatically started or enabled.
+ *
+ * Uses the same v1/v2 flow structure as the positive test but does not
start any components
+ * before the upgrade.
+ */
+ @Test
+ public void testNewComponentsNotStartedWhenGroupStopped() throws
NiFiClientException, IOException, InterruptedException {
+ final FlowRegistryClientEntity clientEntity = registerClient();
+ final NiFiClientUtil util = getClientUtil();
+
+ final ProcessGroupEntity group = util.createProcessGroup("Parent",
"root");
+ final ControllerServiceEntity countService =
util.createControllerService("StandardCountService", group.getId());
+
+ final ProcessorEntity generate =
util.createProcessor("GenerateFlowFile", group.getId());
+ final ProcessorEntity countProcessor =
util.createProcessor("CountFlowFiles", group.getId());
+ util.updateProcessorProperties(countProcessor,
Collections.singletonMap("Count Service", countService.getComponent().getId()));
+
+ final ProcessorEntity terminate =
util.createProcessor("TerminateFlowFile", group.getId());
+ util.createConnection(countProcessor, terminate, "success");
+ util.createConnection(generate, countProcessor, "success");
+
+ // Save as v1
+ final VersionControlInformationEntity vci =
util.startVersionControl(group, clientEntity, TEST_FLOWS_BUCKET, "StoppedFlow");
+
+ // Build v2: add new service, new processor, modify GenerateFlowFile
+ final ControllerServiceEntity newCountService =
util.createControllerService("StandardCountService", group.getId());
+ util.updateControllerServiceProperties(countService,
Collections.singletonMap("Dependent Service",
newCountService.getComponent().getId()));
+
+ final ProcessorEntity setAttribute =
util.createProcessor("SetAttribute", group.getId());
+ util.updateProcessorProperties(setAttribute,
Collections.singletonMap("marker", "v2"));
+ util.createConnection(setAttribute, terminate, "success");
+ util.createConnection(generate, setAttribute, "success");
+
+ util.updateProcessorProperties(generate,
Collections.singletonMap("Text", "Hello World"));
+
+ // Save as v2
+ util.saveFlowVersion(group, clientEntity, vci);
+
+ // Switch to v1 -- do NOT start anything
+ util.changeFlowVersion(group.getId(), "1");
+ util.assertFlowStaleAndUnmodified(group.getId());
+
+ // Upgrade to v2 while the flow is fully stopped
+ util.changeFlowVersion(group.getId(), "2");
+ util.assertFlowUpToDate(group.getId());
+
+ // Verify all processors are STOPPED after upgrade
+ final FlowDTO v2Flow =
getNifiClient().getFlowClient().getProcessGroup(group.getId()).getProcessGroupFlow().getFlow();
+ for (final ProcessorEntity processor : v2Flow.getProcessors()) {
+ assertEquals("STOPPED", processor.getComponent().getState(),
+ "Processor " + processor.getComponent().getName() + " should
be STOPPED when group was not active before upgrade");
+ }
+
+ // Verify all controller services are DISABLED after upgrade
+ final Set<ControllerServiceEntity> v2Services =
getNifiClient().getFlowClient()
+ .getControllerServices(group.getId()).getControllerServices();
+ for (final ControllerServiceEntity service : v2Services) {
+ assertEquals("DISABLED", service.getComponent().getState(),
+ "Service " + service.getComponent().getName() + " should be
DISABLED when group was not active before upgrade");
+ }
+ }
+
@Test
public void testStopVersionControlThenSetVersionControlInfo() throws
NiFiClientException, IOException, InterruptedException {
final FlowRegistryClientEntity clientEntity = registerClient();