This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new 1348bb6fa3 NIFI-15467: Ensure that Connectors' versions are
automatically resolv… (#10811)
1348bb6fa3 is described below
commit 1348bb6fa307f98decfa2b3a0fac47e56e266968
Author: Mark Payne <[email protected]>
AuthorDate: Tue Jan 27 09:04:29 2026 -0500
NIFI-15467: Ensure that Connectors' versions are automatically resolv…
(#10811)
* NIFI-15467: Ensure that Connectors' versions are automatically resolved
on startup
- Allow connectors to specify Bundle Compatibility when updating flow
- Default to BundleCompatibility.RESOLVE_BUNDLE when connectors are
updating flows
- Bug fixes; updated MockConnectorInitializationContext to extend from
StandardConnectorInitializationContext to cut down on code duplication
* NIFI-15467: Addressed review feedback
---
.../connector/util/VersionedFlowUtils.java | 188 ++-------------
.../connector/util/TestVersionedFlowUtils.java | 263 +++------------------
.../server/MockConnectorInitializationContext.java | 124 +---------
.../connector/StandardComponentBundleLookup.java | 127 ++++++++++
.../StandardConnectorInitializationContext.java | 67 +++++-
.../connector/StandardConnectorNode.java | 14 +-
.../TestStandardComponentBundleLookup.java | 246 +++++++++++++++++++
...TestStandardConnectorInitializationContext.java | 243 +++++++++++++++++++
.../org/apache/nifi/web/api/ConnectorResource.java | 6 +-
.../tests/system/BundleResolutionConnector.java | 218 +++++++++++++++++
.../tests/system/GatedDataQueuingConnector.java | 5 +-
.../org.apache.nifi.components.connector.Connector | 5 +-
.../tests/system/connectors/ConnectorCrudIT.java | 66 ++++++
.../tests/system/connectors/ConnectorDrainIT.java | 2 +-
.../connectors/ConnectorVersionResolutionIT.java | 97 ++++++++
.../flows/connector-version-mismatch/flow.json | 52 ++++
16 files changed, 1195 insertions(+), 528 deletions(-)
diff --git
a/nifi-commons/nifi-connector-utils/src/main/java/org/apache/nifi/components/connector/util/VersionedFlowUtils.java
b/nifi-commons/nifi-connector-utils/src/main/java/org/apache/nifi/components/connector/util/VersionedFlowUtils.java
index 62c42e107b..d4a06b8658 100644
---
a/nifi-commons/nifi-connector-utils/src/main/java/org/apache/nifi/components/connector/util/VersionedFlowUtils.java
+++
b/nifi-commons/nifi-connector-utils/src/main/java/org/apache/nifi/components/connector/util/VersionedFlowUtils.java
@@ -26,7 +26,6 @@ import org.apache.nifi.flow.ConnectableComponent;
import org.apache.nifi.flow.ConnectableComponentType;
import org.apache.nifi.flow.Position;
import org.apache.nifi.flow.ScheduledState;
-import org.apache.nifi.flow.VersionedConfigurableExtension;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
@@ -368,7 +367,7 @@ public class VersionedFlowUtils {
/**
* Updates all processors and controller services in the given process
group (and its child groups)
- * to use the latest available bundle version. See {@link
#getLatest(List)} for details on how
+ * to use the latest available bundle version. See {@link
ComponentBundleLookup#getLatestBundle(String)} for details on how
* version comparison is performed.
*
* @param processGroup the process group containing components to update
@@ -390,189 +389,32 @@ public class VersionedFlowUtils {
/**
* Updates the given processor to use the latest available bundle version.
- * See {@link #getLatest(List)} for details on how version comparison is
performed.
+ * If no bundle is available, the processor's bundle is left unchanged.
+ * See {@link ComponentBundleLookup#getLatestBundle(String)} for details
on how version comparison is performed.
*
* @param processor the processor to update
* @param componentBundleLookup the lookup used to find available bundles
for the processor type
+ * @return true if the bundle was updated, false if no bundle was available
*/
- public static void updateToLatestBundle(final VersionedProcessor
processor, final ComponentBundleLookup componentBundleLookup) {
- final Bundle latest = getLatestBundle(processor,
componentBundleLookup);
- processor.setBundle(latest);
+ public static boolean updateToLatestBundle(final VersionedProcessor
processor, final ComponentBundleLookup componentBundleLookup) {
+ final Optional<Bundle> latestBundle =
componentBundleLookup.getLatestBundle(processor.getType());
+ latestBundle.ifPresent(processor::setBundle);
+ return latestBundle.isPresent();
}
/**
* Updates the given controller service to use the latest available bundle
version.
- * See {@link #getLatest(List)} for details on how version comparison is
performed.
+ * If no bundle is available, the service's bundle is left unchanged.
+ * See {@link ComponentBundleLookup#getLatestBundle(String)} for details
on how version comparison is performed.
*
* @param service the controller service to update
* @param componentBundleLookup the lookup used to find available bundles
for the service type
+ * @return true if the bundle was updated, false if no bundle was available
*/
- public static void updateToLatestBundle(final VersionedControllerService
service, final ComponentBundleLookup componentBundleLookup) {
- final Bundle latest = getLatestBundle(service, componentBundleLookup);
- service.setBundle(latest);
+ public static boolean updateToLatestBundle(final
VersionedControllerService service, final ComponentBundleLookup
componentBundleLookup) {
+ final Optional<Bundle> latestBundle =
componentBundleLookup.getLatestBundle(service.getType());
+ latestBundle.ifPresent(service::setBundle);
+ return latestBundle.isPresent();
}
- private static Bundle getLatestBundle(final VersionedConfigurableExtension
versionedComponent, final ComponentBundleLookup componentBundleLookup) {
- final String type = versionedComponent.getType();
- final List<Bundle> bundles =
componentBundleLookup.getAvailableBundles(type);
- final List<Bundle> includingExisting = new ArrayList<>(bundles);
- if (versionedComponent.getBundle() != null &&
!includingExisting.contains(versionedComponent.getBundle())) {
- includingExisting.add(versionedComponent.getBundle());
- }
-
- return getLatest(includingExisting);
- }
-
- /**
- * Returns the bundle with the latest version from the given list.
- *
- * <p>Version comparison follows these rules:</p>
- * <ol>
- * <li>Versions are split by dots (e.g., "2.1.0" becomes ["2", "1",
"0"]).
- * This also supports calendar-date formats (e.g.,
"2026.01.01").</li>
- * <li>Each part is compared numerically when possible; numeric parts
are considered
- * newer than non-numeric parts (e.g., "2.0.0" > "2.0.next")</li>
- * <li>When base versions are equal, qualifiers are compared with the
following precedence
- * (highest to lowest):
- * <ul>
- * <li>Release (no qualifier)</li>
- * <li>RC (Release Candidate) - e.g., "-RC1", "-RC2"</li>
- * <li>M (Milestone) - e.g., "-M1", "-M2"</li>
- * <li>Other/unknown qualifiers</li>
- * <li>SNAPSHOT</li>
- * </ul>
- * </li>
- * <li>Within the same qualifier type, numeric suffixes are compared
- * (e.g., "2.0.0-RC2" > "2.0.0-RC1", "2.0.0-M4" >
"2.0.0-M1")</li>
- * </ol>
- *
- * <p>Examples of version ordering (highest to lowest):</p>
- * <ul>
- * <li>2.0.0 > 2.0.0-RC2 > 2.0.0-RC1 > 2.0.0-M4 > 2.0.0-M1
> 2.0.0-SNAPSHOT</li>
- * <li>2.1.0-SNAPSHOT > 2.0.0 (higher base version wins)</li>
- * <li>2026.01.01 > 2025.12.31 (calendar-date format)</li>
- * </ul>
- *
- * @param bundles the list of bundles to compare
- * @return the bundle with the latest version, or null if the list is empty
- */
- public static Bundle getLatest(final List<Bundle> bundles) {
- Bundle latest = null;
- for (final Bundle bundle : bundles) {
- if (latest == null || compareVersion(bundle.getVersion(),
latest.getVersion()) > 0) {
- latest = bundle;
- }
- }
-
- return latest;
- }
-
- private static int compareVersion(final String v1, final String v2) {
- final String baseVersion1 = getBaseVersion(v1);
- final String baseVersion2 = getBaseVersion(v2);
-
- final String[] parts1 = baseVersion1.split("\\.");
- final String[] parts2 = baseVersion2.split("\\.");
-
- final int length = Math.max(parts1.length, parts2.length);
- for (int i = 0; i < length; i++) {
- final String part1Str = i < parts1.length ? parts1[i] : "0";
- final String part2Str = i < parts2.length ? parts2[i] : "0";
-
- final int comparison = compareVersionPart(part1Str, part2Str);
- if (comparison != 0) {
- return comparison;
- }
- }
-
- // Base versions are equal; compare qualifiers
- final String qualifier1 = getQualifier(v1);
- final String qualifier2 = getQualifier(v2);
- return compareQualifiers(qualifier1, qualifier2);
- }
-
- private static int compareQualifiers(final String qualifier1, final String
qualifier2) {
- final int rank1 = getQualifierRank(qualifier1);
- final int rank2 = getQualifierRank(qualifier2);
-
- if (rank1 != rank2) {
- return Integer.compare(rank1, rank2);
- }
-
- // Same qualifier type; compare numeric suffixes (e.g., RC2 > RC1, M4
> M3)
- final int num1 = getQualifierNumber(qualifier1);
- final int num2 = getQualifierNumber(qualifier2);
- return Integer.compare(num1, num2);
- }
-
- private static int getQualifierRank(final String qualifier) {
- if (qualifier == null || qualifier.isEmpty()) {
- return 4;
- } else if (qualifier.startsWith("RC")) {
- return 3;
- } else if (qualifier.startsWith("M")) {
- return 2;
- } else if (qualifier.equals("SNAPSHOT")) {
- return 0;
- } else {
- return 1;
- }
- }
-
- private static int getQualifierNumber(final String qualifier) {
- if (qualifier == null || qualifier.isEmpty()) {
- return 0;
- }
-
- final StringBuilder digits = new StringBuilder();
- for (int i = 0; i < qualifier.length(); i++) {
- final char c = qualifier.charAt(i);
- if (Character.isDigit(c)) {
- digits.append(c);
- }
- }
-
- if (digits.isEmpty()) {
- return 0;
- }
-
- try {
- return Integer.parseInt(digits.toString());
- } catch (final NumberFormatException e) {
- return 0;
- }
- }
-
- private static String getQualifier(final String version) {
- final int qualifierIndex = version.indexOf('-');
- return qualifierIndex > 0 ? version.substring(qualifierIndex + 1) :
null;
- }
-
- private static int compareVersionPart(final String part1, final String
part2) {
- final Integer num1 = parseVersionPart(part1);
- final Integer num2 = parseVersionPart(part2);
-
- if (num1 != null && num2 != null) {
- return Integer.compare(num1, num2);
- } else if (num1 != null) {
- return 1;
- } else if (num2 != null) {
- return -1;
- } else {
- return part1.compareTo(part2);
- }
- }
-
- private static Integer parseVersionPart(final String part) {
- try {
- return Integer.parseInt(part);
- } catch (final NumberFormatException e) {
- return null;
- }
- }
-
- private static String getBaseVersion(final String version) {
- final int qualifierIndex = version.indexOf('-');
- return qualifierIndex > 0 ? version.substring(0, qualifierIndex) :
version;
- }
}
diff --git
a/nifi-commons/nifi-connector-utils/src/test/java/org/apache/nifi/components/connector/util/TestVersionedFlowUtils.java
b/nifi-commons/nifi-connector-utils/src/test/java/org/apache/nifi/components/connector/util/TestVersionedFlowUtils.java
index 5ff0149d5a..f3c9741d8e 100644
---
a/nifi-commons/nifi-connector-utils/src/test/java/org/apache/nifi/components/connector/util/TestVersionedFlowUtils.java
+++
b/nifi-commons/nifi-connector-utils/src/test/java/org/apache/nifi/components/connector/util/TestVersionedFlowUtils.java
@@ -28,305 +28,112 @@ import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import java.util.HashSet;
-import java.util.List;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestVersionedFlowUtils {
- @Nested
- class GetLatest {
- @Test
- void testMinorVersion() {
- final List<Bundle> bundles = List.of(
- new Bundle("group", "artifact", "1.0.0"),
- new Bundle("group", "artifact", "1.2.0")
- );
-
- final Bundle latestBundle = VersionedFlowUtils.getLatest(bundles);
- assertNotNull(latestBundle);
- assertEquals("1.2.0", latestBundle.getVersion());
- }
-
- @Test
- void testMajorVersion() {
- final List<Bundle> bundles = List.of(
- new Bundle("group", "artifact", "1.0.0"),
- new Bundle("group", "artifact", "2.0.0")
- );
-
- final Bundle latestBundle = VersionedFlowUtils.getLatest(bundles);
- assertNotNull(latestBundle);
- assertEquals("2.0.0", latestBundle.getVersion());
- }
-
- @Test
- void testMoreDigitsLater() {
- final List<Bundle> bundles = List.of(
- new Bundle("group", "artifact", "1.0"),
- new Bundle("group", "artifact", "1.0.1")
- );
-
- final Bundle latestBundle = VersionedFlowUtils.getLatest(bundles);
- assertNotNull(latestBundle);
- assertEquals("1.0.1", latestBundle.getVersion());
- }
-
- @Test
- void testMoreDigitsFirst() {
- final List<Bundle> bundles = List.of(
- new Bundle("group", "artifact", "1.0.1"),
- new Bundle("group", "artifact", "1.0")
- );
-
- final Bundle latestBundle = VersionedFlowUtils.getLatest(bundles);
- assertNotNull(latestBundle);
- assertEquals("1.0.1", latestBundle.getVersion());
- }
-
- @Test
- void testWithSnapshotAndSameVersion() {
- final List<Bundle> bundles = List.of(
- new Bundle("group", "artifact", "1.0.0-SNAPSHOT"),
- new Bundle("group", "artifact", "1.0.0")
- );
-
- final Bundle latestBundle = VersionedFlowUtils.getLatest(bundles);
- assertNotNull(latestBundle);
- assertEquals("1.0.0", latestBundle.getVersion());
- }
-
- @Test
- void testWithSnapshotAndDifferentVersion() {
- final List<Bundle> bundles = List.of(
- new Bundle("group", "artifact", "1.0.1-SNAPSHOT"),
- new Bundle("group", "artifact", "1.0.0")
- );
-
- final Bundle latestBundle = VersionedFlowUtils.getLatest(bundles);
- assertNotNull(latestBundle);
- assertEquals("1.0.1-SNAPSHOT", latestBundle.getVersion());
- }
-
- @Test
- void testEmptyList() {
- final List<Bundle> bundles = List.of();
- assertNull(VersionedFlowUtils.getLatest(bundles));
- }
-
- @Test
- void testNonNumericVersionPart() {
- final List<Bundle> bundles = List.of(
- new Bundle("group", "artifact", "4.0.0"),
- new Bundle("group", "artifact", "4.0.next")
- );
-
- final Bundle latestBundle = VersionedFlowUtils.getLatest(bundles);
- assertNotNull(latestBundle);
- assertEquals("4.0.0", latestBundle.getVersion());
- }
-
- @Test
- void testFullyNonNumericVersionVsNumeric() {
- final List<Bundle> bundles = List.of(
- new Bundle("group", "artifact", "1.0.0"),
- new Bundle("group", "artifact", "undefined")
- );
-
- final Bundle latestBundle = VersionedFlowUtils.getLatest(bundles);
- assertNotNull(latestBundle);
- assertEquals("1.0.0", latestBundle.getVersion());
- }
-
- @Test
- void testTwoFullyNonNumericVersions() {
- final List<Bundle> bundles = List.of(
- new Bundle("group", "artifact", "undefined"),
- new Bundle("group", "artifact", "unknown")
- );
-
- final Bundle latestBundle = VersionedFlowUtils.getLatest(bundles);
- assertNotNull(latestBundle);
- assertEquals("unknown", latestBundle.getVersion());
- }
-
- @Test
- void testQualifierOrdering() {
- final List<Bundle> bundles = List.of(
- new Bundle("group", "artifact", "2.0.0-SNAPSHOT"),
- new Bundle("group", "artifact", "2.0.0-M1"),
- new Bundle("group", "artifact", "2.0.0-M4"),
- new Bundle("group", "artifact", "2.0.0-RC1"),
- new Bundle("group", "artifact", "2.0.0-RC2"),
- new Bundle("group", "artifact", "2.0.0")
- );
-
- final Bundle latestBundle = VersionedFlowUtils.getLatest(bundles);
- assertNotNull(latestBundle);
- assertEquals("2.0.0", latestBundle.getVersion());
- }
-
- @Test
- void testQualifierOrderingWithoutRelease() {
- final List<Bundle> bundles = List.of(
- new Bundle("group", "artifact", "2.0.0-SNAPSHOT"),
- new Bundle("group", "artifact", "2.0.0-M1"),
- new Bundle("group", "artifact", "2.0.0-M4"),
- new Bundle("group", "artifact", "2.0.0-RC1"),
- new Bundle("group", "artifact", "2.0.0-RC2")
- );
-
- final Bundle latestBundle = VersionedFlowUtils.getLatest(bundles);
- assertNotNull(latestBundle);
- assertEquals("2.0.0-RC2", latestBundle.getVersion());
- }
-
- @Test
- void testMilestoneVersionsOrdering() {
- final List<Bundle> bundles = List.of(
- new Bundle("group", "artifact", "2.0.0-M1"),
- new Bundle("group", "artifact", "2.0.0-M4"),
- new Bundle("group", "artifact", "2.0.0-SNAPSHOT")
- );
-
- final Bundle latestBundle = VersionedFlowUtils.getLatest(bundles);
- assertNotNull(latestBundle);
- assertEquals("2.0.0-M4", latestBundle.getVersion());
- }
-
- @Test
- void testCalendarDateFormat() {
- final List<Bundle> bundles = List.of(
- new Bundle("group", "artifact", "2025.12.31"),
- new Bundle("group", "artifact", "2026.01.01")
- );
-
- final Bundle latestBundle = VersionedFlowUtils.getLatest(bundles);
- assertNotNull(latestBundle);
- assertEquals("2026.01.01", latestBundle.getVersion());
- }
-
- @Test
- void testCalendarDateFormatWithBuildNumber() {
- final List<Bundle> bundles = List.of(
- new Bundle("group", "artifact", "2025.12.31.999"),
- new Bundle("group", "artifact", "2026.01.01.451")
- );
-
- final Bundle latestBundle = VersionedFlowUtils.getLatest(bundles);
- assertNotNull(latestBundle);
- assertEquals("2026.01.01.451", latestBundle.getVersion());
- }
- }
-
@Nested
class UpdateToLatestBundles {
private static final String PROCESSOR_TYPE =
"org.apache.nifi.processors.TestProcessor";
private static final String SERVICE_TYPE =
"org.apache.nifi.services.TestService";
@Test
- void testLookupReturnsOneOlderBundle() {
+ void testLookupReturnsNewerBundle() {
final VersionedProcessGroup group = createProcessGroup();
final VersionedProcessor processor =
VersionedFlowUtils.addProcessor(group, PROCESSOR_TYPE, new Bundle("group",
"artifact", "2.0.0"), "Test Processor", new Position(0, 0));
final ComponentBundleLookup lookup =
mock(ComponentBundleLookup.class);
-
when(lookup.getAvailableBundles(PROCESSOR_TYPE)).thenReturn(List.of(new
Bundle("group", "artifact", "1.0.0")));
+
when(lookup.getLatestBundle(PROCESSOR_TYPE)).thenReturn(Optional.of(new
Bundle("group", "artifact", "3.0.0")));
VersionedFlowUtils.updateToLatestBundles(group, lookup);
- assertEquals("2.0.0", processor.getBundle().getVersion());
+ assertEquals("3.0.0", processor.getBundle().getVersion());
}
@Test
- void testLookupReturnsOneNewerBundle() {
+ void testLookupReturnsLatestBundle() {
final VersionedProcessGroup group = createProcessGroup();
final VersionedProcessor processor =
VersionedFlowUtils.addProcessor(group, PROCESSOR_TYPE, new Bundle("group",
"artifact", "2.0.0"), "Test Processor", new Position(0, 0));
final ComponentBundleLookup lookup =
mock(ComponentBundleLookup.class);
-
when(lookup.getAvailableBundles(PROCESSOR_TYPE)).thenReturn(List.of(new
Bundle("group", "artifact", "3.0.0")));
+
when(lookup.getLatestBundle(PROCESSOR_TYPE)).thenReturn(Optional.of(new
Bundle("group", "artifact", "4.0.0")));
VersionedFlowUtils.updateToLatestBundles(group, lookup);
- assertEquals("3.0.0", processor.getBundle().getVersion());
+ assertEquals("4.0.0", processor.getBundle().getVersion());
}
@Test
- void testLookupReturnsMultipleBundlesIncludingSameOlderAndNewer() {
+ void testControllerServiceUpdatedToNewerBundle() {
final VersionedProcessGroup group = createProcessGroup();
- final VersionedProcessor processor =
VersionedFlowUtils.addProcessor(group, PROCESSOR_TYPE, new Bundle("group",
"artifact", "2.0.0"), "Test Processor", new Position(0, 0));
+ final VersionedControllerService service =
VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, new
Bundle("group", "artifact", "1.5.0"), "Test Service");
final ComponentBundleLookup lookup =
mock(ComponentBundleLookup.class);
-
when(lookup.getAvailableBundles(PROCESSOR_TYPE)).thenReturn(List.of(
- new Bundle("group", "artifact", "1.0.0"),
- new Bundle("group", "artifact", "2.0.0"),
- new Bundle("group", "artifact", "3.0.0"),
- new Bundle("group", "artifact", "4.0.0")
- ));
+
when(lookup.getLatestBundle(SERVICE_TYPE)).thenReturn(Optional.of(new
Bundle("group", "artifact", "2.5.0")));
VersionedFlowUtils.updateToLatestBundles(group, lookup);
- assertEquals("4.0.0", processor.getBundle().getVersion());
+ assertEquals("2.5.0", service.getBundle().getVersion());
}
@Test
- void testLookupReturnsNoBundles() {
- final VersionedProcessGroup group = createProcessGroup();
- final VersionedProcessor processor =
VersionedFlowUtils.addProcessor(group, PROCESSOR_TYPE, new Bundle("group",
"artifact", "2.0.0"), "Test Processor", new Position(0, 0));
+ void testNestedProcessorUpdatedToNewerBundle() {
+ final VersionedProcessGroup rootGroup = createProcessGroup();
+ final VersionedProcessGroup childGroup =
createChildProcessGroup(rootGroup, "child-group-id");
+ final VersionedProcessor nestedProcessor =
VersionedFlowUtils.addProcessor(childGroup, PROCESSOR_TYPE, new Bundle("group",
"artifact", "1.0.0"), "Nested Processor", new Position(0, 0));
final ComponentBundleLookup lookup =
mock(ComponentBundleLookup.class);
-
when(lookup.getAvailableBundles(PROCESSOR_TYPE)).thenReturn(List.of());
+
when(lookup.getLatestBundle(PROCESSOR_TYPE)).thenReturn(Optional.of(new
Bundle("group", "artifact", "5.0.0")));
- VersionedFlowUtils.updateToLatestBundles(group, lookup);
+ VersionedFlowUtils.updateToLatestBundles(rootGroup, lookup);
- assertEquals("2.0.0", processor.getBundle().getVersion());
+ assertEquals("5.0.0", nestedProcessor.getBundle().getVersion());
}
@Test
- void testControllerServiceUpdatedToNewerBundle() {
+ void testEmptyOptionalDoesNotChangeBundle() {
final VersionedProcessGroup group = createProcessGroup();
- final VersionedControllerService service =
VersionedFlowUtils.addControllerService(group, SERVICE_TYPE, new
Bundle("group", "artifact", "1.5.0"), "Test Service");
+ final VersionedProcessor processor =
VersionedFlowUtils.addProcessor(group, PROCESSOR_TYPE, new Bundle("group",
"artifact", "2.0.0"), "Test Processor", new Position(0, 0));
final ComponentBundleLookup lookup =
mock(ComponentBundleLookup.class);
-
when(lookup.getAvailableBundles(SERVICE_TYPE)).thenReturn(List.of(new
Bundle("group", "artifact", "2.5.0")));
+
when(lookup.getLatestBundle(PROCESSOR_TYPE)).thenReturn(Optional.empty());
VersionedFlowUtils.updateToLatestBundles(group, lookup);
- assertEquals("2.5.0", service.getBundle().getVersion());
+ assertEquals("2.0.0", processor.getBundle().getVersion());
}
@Test
- void testNestedProcessorUpdatedToNewerBundle() {
- final VersionedProcessGroup rootGroup = createProcessGroup();
- final VersionedProcessGroup childGroup =
createChildProcessGroup(rootGroup, "child-group-id");
- final VersionedProcessor nestedProcessor =
VersionedFlowUtils.addProcessor(childGroup, PROCESSOR_TYPE, new Bundle("group",
"artifact", "1.0.0"), "Nested Processor", new Position(0, 0));
+ void testUpdateToLatestBundleReturnsTrueWhenUpdated() {
+ final VersionedProcessGroup group = createProcessGroup();
+ final VersionedProcessor processor =
VersionedFlowUtils.addProcessor(group, PROCESSOR_TYPE, new Bundle("group",
"artifact", "2.0.0"), "Test Processor", new Position(0, 0));
final ComponentBundleLookup lookup =
mock(ComponentBundleLookup.class);
-
when(lookup.getAvailableBundles(PROCESSOR_TYPE)).thenReturn(List.of(new
Bundle("group", "artifact", "5.0.0")));
+
when(lookup.getLatestBundle(PROCESSOR_TYPE)).thenReturn(Optional.of(new
Bundle("group", "artifact", "3.0.0")));
- VersionedFlowUtils.updateToLatestBundles(rootGroup, lookup);
+ final boolean updated =
VersionedFlowUtils.updateToLatestBundle(processor, lookup);
- assertEquals("5.0.0", nestedProcessor.getBundle().getVersion());
+ assertTrue(updated);
+ assertEquals("3.0.0", processor.getBundle().getVersion());
}
@Test
- void testLookupReturnsVersionsWithQualifiers() {
+ void testUpdateToLatestBundleReturnsFalseWhenNotUpdated() {
final VersionedProcessGroup group = createProcessGroup();
final VersionedProcessor processor =
VersionedFlowUtils.addProcessor(group, PROCESSOR_TYPE, new Bundle("group",
"artifact", "2.0.0"), "Test Processor", new Position(0, 0));
final ComponentBundleLookup lookup =
mock(ComponentBundleLookup.class);
-
when(lookup.getAvailableBundles(PROCESSOR_TYPE)).thenReturn(List.of(
- new Bundle("group", "artifact", "2.0.0"),
- new Bundle("group", "artifact", "2.0.0-SNAPSHOT"),
- new Bundle("group", "artifact", "2.0.0-M1"),
- new Bundle("group", "artifact", "2.0.0-RC1")
- ));
+
when(lookup.getLatestBundle(PROCESSOR_TYPE)).thenReturn(Optional.empty());
- VersionedFlowUtils.updateToLatestBundles(group, lookup);
+ final boolean updated =
VersionedFlowUtils.updateToLatestBundle(processor, lookup);
+ assertFalse(updated);
assertEquals("2.0.0", processor.getBundle().getVersion());
}
diff --git
a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorInitializationContext.java
b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorInitializationContext.java
index d4af4dcbb2..6390fad284 100644
---
a/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorInitializationContext.java
+++
b/nifi-connector-mock-bundle/nifi-connector-mock-server/src/main/java/org/apache/nifi/mock/connector/server/MockConnectorInitializationContext.java
@@ -17,82 +17,28 @@
package org.apache.nifi.mock.connector.server;
-import org.apache.nifi.asset.AssetManager;
-import org.apache.nifi.components.connector.ComponentBundleLookup;
+import org.apache.nifi.components.connector.BundleCompatibility;
import org.apache.nifi.components.connector.FlowUpdateException;
-import
org.apache.nifi.components.connector.FrameworkConnectorInitializationContext;
-import
org.apache.nifi.components.connector.FrameworkConnectorInitializationContextBuilder;
-import org.apache.nifi.components.connector.FrameworkFlowContext;
+import
org.apache.nifi.components.connector.StandardConnectorInitializationContext;
import org.apache.nifi.components.connector.components.FlowContext;
-import org.apache.nifi.components.connector.secrets.SecretsManager;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
-import org.apache.nifi.logging.ComponentLog;
-public class MockConnectorInitializationContext implements
FrameworkConnectorInitializationContext {
-
- private final String identifier;
- private final String name;
- private final ComponentLog componentLog;
- private final SecretsManager secretsManager;
- private final AssetManager assetManager;
- private final ComponentBundleLookup componentBundleLookup;
+public class MockConnectorInitializationContext extends
StandardConnectorInitializationContext {
private final MockExtensionMapper mockExtensionMapper;
- private MockConnectorInitializationContext(final Builder builder) {
- this.identifier = builder.identifier;
- this.name = builder.name;
- this.componentLog = builder.componentLog;
- this.secretsManager = builder.secretsManager;
- this.assetManager = builder.assetManager;
+ protected MockConnectorInitializationContext(final Builder builder) {
+ super(builder);
this.mockExtensionMapper = builder.mockExtensionMapper;
- this.componentBundleLookup = builder.componentBundleLookup;
- }
-
-
- @Override
- public String getIdentifier() {
- return identifier;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public ComponentLog getLogger() {
- return componentLog;
- }
-
- @Override
- public ComponentBundleLookup getComponentBundleLookup() {
- return componentBundleLookup;
- }
-
- @Override
- public SecretsManager getSecretsManager() {
- return secretsManager;
- }
-
- @Override
- public AssetManager getAssetManager() {
- return assetManager;
}
@Override
- public void updateFlow(final FlowContext flowContext, final
VersionedExternalFlow versionedExternalFlow) throws FlowUpdateException {
- if (!(flowContext instanceof final FrameworkFlowContext
frameworkFlowContext)) {
- throw new IllegalArgumentException("FlowContext is not an instance
provided by the framework");
- }
-
+ public void updateFlow(final FlowContext flowContext, final
VersionedExternalFlow versionedExternalFlow,
+ final BundleCompatibility bundleCompatability)
throws FlowUpdateException {
replaceMocks(versionedExternalFlow.getFlowContents());
-
- // TODO: Probably should eliminate this method and instead move
AssetManager to the FlowContext and add a method there
- // to update the flow.
- frameworkFlowContext.updateFlow(versionedExternalFlow, assetManager);
+ super.updateFlow(flowContext, versionedExternalFlow,
bundleCompatability);
}
private void replaceMocks(final VersionedProcessGroup group) {
@@ -109,69 +55,17 @@ public class MockConnectorInitializationContext implements
FrameworkConnectorIni
}
}
- private void updateParameterContext(final VersionedProcessGroup group,
final String parameterContextName) {
- group.setParameterContextName(parameterContextName);
- if (group.getProcessGroups() != null) {
- for (final VersionedProcessGroup childGroup :
group.getProcessGroups()) {
- updateParameterContext(childGroup, parameterContextName);
- }
- }
- }
-
- public static class Builder implements
FrameworkConnectorInitializationContextBuilder {
+ public static class Builder extends
StandardConnectorInitializationContext.Builder {
private final MockExtensionMapper mockExtensionMapper;
- private String identifier;
- private String name;
- private ComponentLog componentLog;
- private SecretsManager secretsManager;
- private AssetManager assetManager;
- private ComponentBundleLookup componentBundleLookup;
public Builder(final MockExtensionMapper mockExtensionMapper) {
this.mockExtensionMapper = mockExtensionMapper;
}
- @Override
- public Builder identifier(final String identifier) {
- this.identifier = identifier;
- return this;
- }
-
- @Override
- public Builder name(final String name) {
- this.name = name;
- return this;
- }
-
- @Override
- public Builder componentLog(final ComponentLog componentLog) {
- this.componentLog = componentLog;
- return this;
- }
-
- @Override
- public Builder secretsManager(final SecretsManager secretsManager) {
- this.secretsManager = secretsManager;
- return this;
- }
-
- @Override
- public Builder componentBundleLookup(final ComponentBundleLookup
bundleLookup) {
- this.componentBundleLookup = bundleLookup;
- return this;
- }
-
- @Override
- public Builder assetManager(final AssetManager assetManager) {
- this.assetManager = assetManager;
- return this;
- }
-
@Override
public MockConnectorInitializationContext build() {
return new MockConnectorInitializationContext(this);
}
-
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardComponentBundleLookup.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardComponentBundleLookup.java
index 84d664d80b..cc91698827 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardComponentBundleLookup.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardComponentBundleLookup.java
@@ -22,6 +22,7 @@ import org.apache.nifi.flow.Bundle;
import org.apache.nifi.nar.ExtensionManager;
import java.util.List;
+import java.util.Optional;
public class StandardComponentBundleLookup implements ComponentBundleLookup {
private final ExtensionManager extensionManager;
@@ -38,6 +39,132 @@ public class StandardComponentBundleLookup implements
ComponentBundleLookup {
.toList();
}
+    /**
+     * Finds the bundle with the highest version among those available for the given component type.
+     *
+     * @param componentType the component type to look up
+     * @return the available bundle whose version compares highest, or an empty Optional if no bundle
+     *         is available; when several bundles compare equal, the one listed first is returned
+     */
+    @Override
+    public Optional<Bundle> getLatestBundle(final String componentType) {
+        Bundle latest = null;
+        for (final Bundle candidate : getAvailableBundles(componentType)) {
+            // Strictly-greater comparison keeps the earliest bundle when versions tie
+            final boolean newer = latest == null || compareVersion(candidate.getVersion(), latest.getVersion()) > 0;
+            if (newer) {
+                latest = candidate;
+            }
+        }
+
+        return Optional.ofNullable(latest);
+    }
+
+    /**
+     * Orders two version strings. The base versions (the text before the first '-') are split on '.'
+     * and compared part by part, with a missing part treated as "0". If every part ties, the
+     * qualifiers (the text after the first '-') decide.
+     *
+     * @param v1 the first version
+     * @param v2 the second version
+     * @return a negative value, zero, or a positive value if v1 orders before, equal to, or after v2
+     */
+    private int compareVersion(final String v1, final String v2) {
+        final String[] leftParts = getBaseVersion(v1).split("\\.");
+        final String[] rightParts = getBaseVersion(v2).split("\\.");
+        final int partCount = Math.max(leftParts.length, rightParts.length);
+
+        int index = 0;
+        while (index < partCount) {
+            final String left = index < leftParts.length ? leftParts[index] : "0";
+            final String right = index < rightParts.length ? rightParts[index] : "0";
+
+            final int partResult = compareVersionPart(left, right);
+            if (partResult != 0) {
+                return partResult;
+            }
+            index++;
+        }
+
+        // Base versions are equal; the qualifiers break the tie
+        return compareQualifiers(getQualifier(v1), getQualifier(v2));
+    }
+
+    /**
+     * Orders two qualifiers, first by qualifier rank and then, for equal ranks, by the number
+     * embedded in the qualifier (e.g., RC2 > RC1, M4 > M3).
+     *
+     * @param qualifier1 the first qualifier, may be null
+     * @param qualifier2 the second qualifier, may be null
+     * @return a negative value, zero, or a positive value if qualifier1 orders before, equal to, or after qualifier2
+     */
+    private int compareQualifiers(final String qualifier1, final String qualifier2) {
+        final int rankResult = Integer.compare(getQualifierRank(qualifier1), getQualifierRank(qualifier2));
+        if (rankResult != 0) {
+            return rankResult;
+        }
+
+        // Same kind of qualifier; fall back to the numeric suffix
+        return Integer.compare(getQualifierNumber(qualifier1), getQualifierNumber(qualifier2));
+    }
+
+    /**
+     * Ranks a version qualifier so that a higher rank means a newer version for the same base version:
+     * no qualifier (4) > release candidate "RC" (3) > milestone "M" (2) > any other qualifier (1) > "SNAPSHOT" (0).
+     *
+     * @param qualifier the qualifier to rank, may be null or empty for an unqualified version
+     * @return the rank of the qualifier, from 0 to 4
+     */
+    private int getQualifierRank(final String qualifier) {
+        if (qualifier == null || qualifier.isEmpty()) {
+            return 4;
+        }
+        if (isNumberedQualifier(qualifier, "RC")) {
+            return 3;
+        }
+        if (isNumberedQualifier(qualifier, "M")) {
+            return 2;
+        }
+        return qualifier.equals("SNAPSHOT") ? 0 : 1;
+    }
+
+    /**
+     * Tells whether the qualifier is the given prefix on its own or the prefix followed by a digit.
+     * A plain startsWith check would also accept unrelated qualifiers that merely share the first
+     * letters, such as "MAINT" for the milestone prefix "M".
+     */
+    private boolean isNumberedQualifier(final String qualifier, final String prefix) {
+        if (!qualifier.startsWith(prefix)) {
+            return false;
+        }
+
+        final int nextIndex = prefix.length();
+        return qualifier.length() == nextIndex || Character.isDigit(qualifier.charAt(nextIndex));
+    }
+
+    /**
+     * Extracts the number embedded in a qualifier by concatenating every digit character it contains,
+     * in order (e.g., "RC2" yields 2).
+     *
+     * @param qualifier the qualifier to inspect, may be null
+     * @return the parsed number, or 0 if the qualifier is null, empty, has no digits, or the digits
+     *         cannot be parsed as an int
+     */
+    private int getQualifierNumber(final String qualifier) {
+        if (qualifier == null || qualifier.isEmpty()) {
+            return 0;
+        }
+
+        final String digitsOnly = qualifier.chars()
+            .filter(Character::isDigit)
+            .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
+            .toString();
+        if (digitsOnly.isEmpty()) {
+            return 0;
+        }
+
+        try {
+            return Integer.parseInt(digitsOnly);
+        } catch (final NumberFormatException e) {
+            // Digits that do not fit in an int are treated the same as no number at all
+            return 0;
+        }
+    }
+
+    /**
+     * Returns the qualifier of a version: everything after the first '-'.
+     *
+     * @param version the full version string
+     * @return the qualifier, or null if the version has no '-' or starts with one
+     */
+    private String getQualifier(final String version) {
+        final int dashPosition = version.indexOf('-');
+        if (dashPosition <= 0) {
+            return null;
+        }
+        return version.substring(dashPosition + 1);
+    }
+
+    /**
+     * Compares a single dot-separated part of two versions. Two numeric parts compare numerically,
+     * a numeric part orders after a non-numeric one, and two non-numeric parts compare as strings.
+     *
+     * @param part1 the part from the first version
+     * @param part2 the part from the second version
+     * @return a negative value, zero, or a positive value if part1 orders before, equal to, or after part2
+     */
+    private int compareVersionPart(final String part1, final String part2) {
+        final Integer left = parseVersionPart(part1);
+        final Integer right = parseVersionPart(part2);
+
+        if (left == null && right == null) {
+            return part1.compareTo(part2);
+        }
+        if (left == null) {
+            // Only part2 is numeric, so it wins
+            return -1;
+        }
+        if (right == null) {
+            // Only part1 is numeric, so it wins
+            return 1;
+        }
+        return Integer.compare(left, right);
+    }
+
+    /**
+     * Parses a version part as an integer.
+     *
+     * @param part the version part to parse
+     * @return the parsed value, or null if the part is not a valid int
+     */
+    private Integer parseVersionPart(final String part) {
+        Integer parsed;
+        try {
+            parsed = Integer.valueOf(part);
+        } catch (final NumberFormatException e) {
+            parsed = null;
+        }
+        return parsed;
+    }
+
+    /**
+     * Returns the base of a version: everything before the first '-'.
+     *
+     * @param version the full version string
+     * @return the base version, or the whole string if the version has no '-' or starts with one
+     */
+    private String getBaseVersion(final String version) {
+        final int dashPosition = version.indexOf('-');
+        if (dashPosition <= 0) {
+            return version;
+        }
+        return version.substring(0, dashPosition);
+    }
+
private Bundle convertBundle(final org.apache.nifi.bundle.Bundle bundle) {
final BundleCoordinate coordinate =
bundle.getBundleDetails().getCoordinate();
return new Bundle(coordinate.getGroup(), coordinate.getId(),
coordinate.getVersion());
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java
index ab2dcbf399..002f2a8aa2 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java
@@ -20,10 +20,16 @@ package org.apache.nifi.components.connector;
import org.apache.nifi.asset.AssetManager;
import org.apache.nifi.components.connector.components.FlowContext;
import org.apache.nifi.components.connector.secrets.SecretsManager;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.logging.ComponentLog;
+import java.util.List;
+import java.util.function.Consumer;
+
public class StandardConnectorInitializationContext implements
FrameworkConnectorInitializationContext {
private final String identifier;
private final String name;
@@ -33,7 +39,7 @@ public class StandardConnectorInitializationContext
implements FrameworkConnecto
private final ComponentBundleLookup componentBundleLookup;
- private StandardConnectorInitializationContext(final Builder builder) {
+ protected StandardConnectorInitializationContext(final Builder builder) {
this.identifier = builder.identifier;
this.name = builder.name;
this.componentLog = builder.componentLog;
@@ -73,14 +79,71 @@ public class StandardConnectorInitializationContext
implements FrameworkConnecto
}
@Override
- public void updateFlow(final FlowContext flowContext, final
VersionedExternalFlow versionedExternalFlow) throws FlowUpdateException {
+ public void updateFlow(final FlowContext flowContext, final
VersionedExternalFlow versionedExternalFlow,
+ final BundleCompatibility bundleCompatability)
throws FlowUpdateException {
if (!(flowContext instanceof final FrameworkFlowContext
frameworkFlowContext)) {
throw new IllegalArgumentException("FlowContext is not an instance
provided by the framework");
}
+ resolveBundles(versionedExternalFlow.getFlowContents(),
bundleCompatability);
frameworkFlowContext.updateFlow(versionedExternalFlow, assetManager);
}
+    /**
+     * Walks the given group and all of its descendant groups, resolving the bundle of every processor
+     * and controller service according to the requested compatibility mode. Nothing is touched when
+     * the mode is REQUIRE_EXACT_BUNDLE.
+     *
+     * @param group the process group whose components should be resolved
+     * @param bundleCompatability how a bundle that is not available should be resolved
+     */
+    protected void resolveBundles(final VersionedProcessGroup group, final BundleCompatibility bundleCompatability) {
+        if (bundleCompatability == BundleCompatibility.REQUIRE_EXACT_BUNDLE) {
+            return;
+        }
+
+        if (group.getProcessors() != null) {
+            group.getProcessors().forEach(
+                processor -> resolveBundle(processor.getType(), processor.getBundle(), bundleCompatability, processor::setBundle));
+        }
+
+        if (group.getControllerServices() != null) {
+            group.getControllerServices().forEach(
+                service -> resolveBundle(service.getType(), service.getBundle(), bundleCompatability, service::setBundle));
+        }
+
+        if (group.getProcessGroups() != null) {
+            // Recurse so that components in nested groups are resolved as well
+            group.getProcessGroups().forEach(childGroup -> resolveBundles(childGroup, bundleCompatability));
+        }
+    }
+
+    /**
+     * Replaces a component's bundle with an available one when the bundle it currently references is
+     * not available. The bundle is left alone if it is already available or if no bundle at all is
+     * available for the component type.
+     *
+     * @param componentType the type of the component whose bundle is being resolved
+     * @param currentBundle the bundle the component currently references, may be null
+     * @param bundleCompatibility RESOLVE_BUNDLE switches only when exactly one bundle is available;
+     *                            RESOLVE_NEWEST_BUNDLE switches to the latest available bundle
+     * @param bundleSetter receives the resolved bundle when a replacement is chosen
+     */
+    private void resolveBundle(final String componentType, final Bundle currentBundle,
+                               final BundleCompatibility bundleCompatibility, final Consumer<Bundle> bundleSetter) {
+        final List<Bundle> availableBundles = componentBundleLookup.getAvailableBundles(componentType);
+        if (availableBundles.isEmpty()) {
+            return;
+        }
+
+        // Check for null explicitly: List implementations that reject nulls (such as List.of) throw a
+        // NullPointerException from contains(null), and a missing bundle can never already be available.
+        if (currentBundle != null && availableBundles.contains(currentBundle)) {
+            return;
+        }
+
+        switch (bundleCompatibility) {
+            case RESOLVE_BUNDLE -> {
+                if (availableBundles.size() == 1) {
+                    final Bundle resolvedBundle = availableBundles.getFirst();
+                    componentLog.debug("Resolved bundle for {} from {} to {}", componentType, currentBundle, resolvedBundle);
+                    bundleSetter.accept(resolvedBundle);
+                } else {
+                    // Ambiguous choice: leave the bundle as-is, but leave a trace for troubleshooting
+                    componentLog.debug("Did not resolve bundle for {} from {} because {} bundles are available",
+                        componentType, currentBundle, availableBundles.size());
+                }
+            }
+            case RESOLVE_NEWEST_BUNDLE -> componentBundleLookup.getLatestBundle(componentType).ifPresent(latestBundle -> {
+                componentLog.debug("Resolved bundle for {} from {} to {}", componentType, currentBundle, latestBundle);
+                bundleSetter.accept(latestBundle);
+            });
+            default -> {
+                // Other modes do not alter the bundle
+            }
+        }
+    }
+
private void updateParameterContext(final VersionedProcessGroup group,
final String parameterContextName) {
group.setParameterContextName(parameterContextName);
if (group.getProcessGroups() != null) {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index a2501c883b..ae5c73f4cb 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -269,7 +269,6 @@ public class StandardConnectorNode implements ConnectorNode
{
getComponentLog().info("Working Context has been applied to Active
Context");
// The update has been completed. Tear down and recreate the
working flow context to ensure it is in a clean state.
-
resetValidationState();
recreateWorkingFlowContext();
} catch (final Throwable t) {
@@ -666,7 +665,7 @@ public class StandardConnectorNode implements ConnectorNode
{
// Update all RUNNING components to ENABLED before applying the
initial flow so that components
// are not started before being configured.
stopComponents(initialFlow.getFlowContents());
- initializationContext.updateFlow(activeFlowContext, initialFlow);
+ initializationContext.updateFlow(activeFlowContext, initialFlow,
BundleCompatibility.RESOLVE_BUNDLE);
}
resetValidationState();
@@ -1156,6 +1155,7 @@ public class StandardConnectorNode implements
ConnectorNode {
private void resetValidationState() {
validationState.set(new ValidationState(ValidationStatus.VALIDATING,
Collections.emptyList()));
validationTrigger.triggerAsync(this);
+ logger.debug("Validation state has been reset for {}", this);
}
@Override
@@ -1216,6 +1216,16 @@ public class StandardConnectorNode implements
ConnectorNode {
}
validationState.set(resultState);
+
+ if (resultState.getStatus() == ValidationStatus.VALID) {
+ logger.info("Validation completed for {}. Connector is
valid.", this);
+ } else {
+ logger.info("Validation completed for {}. Connector is
invalid: {}", this,
+ resultState.getValidationErrors().stream()
+ .map(ValidationResult::getExplanation)
+ .collect(Collectors.joining("; ")));
+ }
+
return resultState;
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardComponentBundleLookup.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardComponentBundleLookup.java
new file mode 100644
index 0000000000..c0916d4335
--- /dev/null
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardComponentBundleLookup.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.bundle.BundleDetails;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.nar.ExtensionManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for StandardComponentBundleLookup, backed by a mocked ExtensionManager that reports the
+ * bundles each test registers for a single component type.
+ */
+public class TestStandardComponentBundleLookup {
+    private static final String COMPONENT_TYPE = "org.apache.nifi.processors.TestProcessor";
+
+    private ExtensionManager extensionManager;
+    private StandardComponentBundleLookup lookup;
+
+    @BeforeEach
+    void setup() {
+        extensionManager = mock(ExtensionManager.class);
+        lookup = new StandardComponentBundleLookup(extensionManager);
+    }
+
+    /**
+     * Verifies which version getLatestBundle picks for various sets of available versions.
+     */
+    @Nested
+    class GetLatestBundle {
+        @Test
+        void testMinorVersion() {
+            assertLatestVersion("1.2.0", "1.0.0", "1.2.0");
+        }
+
+        @Test
+        void testMajorVersion() {
+            assertLatestVersion("2.0.0", "1.0.0", "2.0.0");
+        }
+
+        @Test
+        void testMoreDigitsLater() {
+            assertLatestVersion("1.0.1", "1.0", "1.0.1");
+        }
+
+        @Test
+        void testMoreDigitsFirst() {
+            assertLatestVersion("1.0.1", "1.0.1", "1.0");
+        }
+
+        @Test
+        void testWithSnapshotAndSameVersion() {
+            assertLatestVersion("1.0.0", "1.0.0-SNAPSHOT", "1.0.0");
+        }
+
+        @Test
+        void testWithSnapshotAndDifferentVersion() {
+            assertLatestVersion("1.0.1-SNAPSHOT", "1.0.1-SNAPSHOT", "1.0.0");
+        }
+
+        @Test
+        void testEmptyList() {
+            setupBundles();
+            assertTrue(lookup.getLatestBundle(COMPONENT_TYPE).isEmpty());
+        }
+
+        @Test
+        void testNonNumericVersionPart() {
+            assertLatestVersion("4.0.0", "4.0.0", "4.0.next");
+        }
+
+        @Test
+        void testFullyNonNumericVersionVsNumeric() {
+            assertLatestVersion("1.0.0", "1.0.0", "undefined");
+        }
+
+        @Test
+        void testTwoFullyNonNumericVersions() {
+            assertLatestVersion("unknown", "undefined", "unknown");
+        }
+
+        @Test
+        void testQualifierOrdering() {
+            assertLatestVersion("2.0.0", "2.0.0-SNAPSHOT", "2.0.0-M1", "2.0.0-M4", "2.0.0-RC1", "2.0.0-RC2", "2.0.0");
+        }
+
+        @Test
+        void testQualifierOrderingWithoutRelease() {
+            assertLatestVersion("2.0.0-RC2", "2.0.0-SNAPSHOT", "2.0.0-M1", "2.0.0-M4", "2.0.0-RC1", "2.0.0-RC2");
+        }
+
+        @Test
+        void testMilestoneVersionsOrdering() {
+            assertLatestVersion("2.0.0-M4", "2.0.0-M1", "2.0.0-M4", "2.0.0-SNAPSHOT");
+        }
+
+        @Test
+        void testCalendarDateFormat() {
+            assertLatestVersion("2026.01.01", "2025.12.31", "2026.01.01");
+        }
+
+        @Test
+        void testCalendarDateFormatWithBuildNumber() {
+            assertLatestVersion("2026.01.01.451", "2025.12.31.999", "2026.01.01.451");
+        }
+
+        /**
+         * Registers one bundle per given version, in the given order, and asserts that the lookup
+         * reports the expected version as the latest.
+         *
+         * @param expectedVersion the version that must be selected
+         * @param availableVersions the versions to make available, in registration order
+         */
+        private void assertLatestVersion(final String expectedVersion, final String... availableVersions) {
+            setupBundles(availableVersions);
+
+            final Optional<Bundle> latestBundle = lookup.getLatestBundle(COMPONENT_TYPE);
+            assertTrue(latestBundle.isPresent());
+            assertEquals(expectedVersion, latestBundle.get().getVersion());
+        }
+
+        /**
+         * Makes the mocked ExtensionManager report a "group"/"artifact" bundle for each given version.
+         */
+        private void setupBundles(final String... versions) {
+            final List<org.apache.nifi.bundle.Bundle> bundles = List.of(versions).stream()
+                .map(version -> createBundle("group", "artifact", version))
+                .toList();
+            when(extensionManager.getBundles(COMPONENT_TYPE)).thenReturn(bundles);
+        }
+    }
+
+    /**
+     * Builds a framework bundle with the given coordinate, using the current directory as working
+     * directory and this test's class loader.
+     */
+    private org.apache.nifi.bundle.Bundle createBundle(final String group, final String artifact, final String version) {
+        final BundleDetails details = new BundleDetails.Builder()
+            .coordinate(new BundleCoordinate(group, artifact, version))
+            .workingDir(new File("."))
+            .build();
+        return new org.apache.nifi.bundle.Bundle(details, this.getClass().getClassLoader());
+    }
+}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java
index ae15efa509..565ddb05f9 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java
@@ -16,9 +16,18 @@
*/
package org.apache.nifi.components.connector;
+import org.apache.nifi.asset.AssetManager;
import org.apache.nifi.components.connector.components.ParameterValue;
+import org.apache.nifi.components.connector.secrets.SecretsManager;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.logging.ComponentLog;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -27,11 +36,14 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestStandardConnectorInitializationContext {
@@ -141,4 +153,235 @@ public class TestStandardConnectorInitializationContext {
final ConnectorParameterLookup parameterLookup = new
ConnectorParameterLookup(contexts, null);
return parameterLookup.getParameterValues();
}
+
+    /**
+     * REQUIRE_EXACT_BUNDLE must leave a processor's bundle untouched even though that bundle is not
+     * available and a different version is.
+     */
+    @Test
+    public void testResolveBundlesRequireExactBundle() {
+        final String processorType = "org.apache.nifi.processors.standard.GenerateFlowFile";
+        final Bundle missingBundle = createBundle("org.apache.nifi", "nifi-standard-nar", "0.0.0-NONEXISTENT");
+        final Bundle installedBundle = createBundle("org.apache.nifi", "nifi-standard-nar", "2.0.0");
+
+        final ComponentBundleLookup bundleLookup = mock(ComponentBundleLookup.class);
+        when(bundleLookup.getAvailableBundles(processorType)).thenReturn(List.of(installedBundle));
+
+        final VersionedProcessGroup group = createProcessGroupWithProcessor("GenerateFlowFile", processorType, missingBundle);
+
+        final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
+        externalFlow.setFlowContents(group);
+        externalFlow.setParameterContexts(Map.of());
+
+        createContext(bundleLookup).resolveBundles(group, BundleCompatibility.REQUIRE_EXACT_BUNDLE);
+
+        final String resultingVersion = group.getProcessors().iterator().next().getBundle().getVersion();
+        assertEquals("0.0.0-NONEXISTENT", resultingVersion, "REQUIRE_EXACT_BUNDLE should not change the bundle");
+    }
+
+    /**
+     * RESOLVE_BUNDLE must switch an unavailable bundle to the only bundle that is available.
+     */
+    @Test
+    public void testResolveBundlesResolveBundleWithSingleAvailable() {
+        final String processorType = "org.apache.nifi.processors.standard.GenerateFlowFile";
+        final Bundle missingBundle = createBundle("org.apache.nifi", "nifi-standard-nar", "0.0.0-NONEXISTENT");
+        final Bundle installedBundle = createBundle("org.apache.nifi", "nifi-standard-nar", "2.0.0");
+
+        final ComponentBundleLookup bundleLookup = mock(ComponentBundleLookup.class);
+        when(bundleLookup.getAvailableBundles(processorType)).thenReturn(List.of(installedBundle));
+
+        final VersionedProcessGroup group = createProcessGroupWithProcessor("GenerateFlowFile", processorType, missingBundle);
+
+        createContext(bundleLookup).resolveBundles(group, BundleCompatibility.RESOLVE_BUNDLE);
+
+        final String resultingVersion = group.getProcessors().iterator().next().getBundle().getVersion();
+        assertEquals("2.0.0", resultingVersion, "RESOLVE_BUNDLE should resolve to the single available bundle");
+    }
+
+    /**
+     * RESOLVE_BUNDLE must not pick a bundle when more than one is available.
+     */
+    @Test
+    public void testResolveBundlesResolveBundleWithMultipleAvailable() {
+        final String processorType = "org.apache.nifi.processors.standard.GenerateFlowFile";
+        final Bundle missingBundle = createBundle("org.apache.nifi", "nifi-standard-nar", "0.0.0-NONEXISTENT");
+        final Bundle olderInstalled = createBundle("org.apache.nifi", "nifi-standard-nar", "1.0.0");
+        final Bundle newerInstalled = createBundle("org.apache.nifi", "nifi-standard-nar", "2.0.0");
+
+        final ComponentBundleLookup bundleLookup = mock(ComponentBundleLookup.class);
+        when(bundleLookup.getAvailableBundles(processorType)).thenReturn(List.of(olderInstalled, newerInstalled));
+
+        final VersionedProcessGroup group = createProcessGroupWithProcessor("GenerateFlowFile", processorType, missingBundle);
+
+        createContext(bundleLookup).resolveBundles(group, BundleCompatibility.RESOLVE_BUNDLE);
+
+        final String resultingVersion = group.getProcessors().iterator().next().getBundle().getVersion();
+        assertEquals("0.0.0-NONEXISTENT", resultingVersion, "RESOLVE_BUNDLE should not change bundle when multiple are available");
+    }
+
+    /**
+     * RESOLVE_BUNDLE must leave the bundle untouched when no bundle is available for the type.
+     */
+    @Test
+    public void testResolveBundlesResolveBundleWithNoneAvailable() {
+        final String processorType = "org.apache.nifi.processors.standard.GenerateFlowFile";
+        final Bundle missingBundle = createBundle("org.apache.nifi", "nifi-standard-nar", "0.0.0-NONEXISTENT");
+
+        final ComponentBundleLookup bundleLookup = mock(ComponentBundleLookup.class);
+        when(bundleLookup.getAvailableBundles(processorType)).thenReturn(List.of());
+
+        final VersionedProcessGroup group = createProcessGroupWithProcessor("GenerateFlowFile", processorType, missingBundle);
+
+        createContext(bundleLookup).resolveBundles(group, BundleCompatibility.RESOLVE_BUNDLE);
+
+        final String resultingVersion = group.getProcessors().iterator().next().getBundle().getVersion();
+        assertEquals("0.0.0-NONEXISTENT", resultingVersion, "RESOLVE_BUNDLE should not change bundle when none are available");
+    }
+
+    /**
+     * RESOLVE_NEWEST_BUNDLE must switch an unavailable bundle to the bundle the lookup reports as latest.
+     */
+    @Test
+    public void testResolveBundlesResolveNewestBundle() {
+        final String processorType = "org.apache.nifi.processors.standard.GenerateFlowFile";
+        final Bundle missingBundle = createBundle("org.apache.nifi", "nifi-standard-nar", "0.0.0-NONEXISTENT");
+        final Bundle latestInstalled = createBundle("org.apache.nifi", "nifi-standard-nar", "2.0.0");
+        final Bundle olderInstalled = createBundle("org.apache.nifi", "nifi-standard-nar", "1.0.0");
+
+        final ComponentBundleLookup bundleLookup = mock(ComponentBundleLookup.class);
+        when(bundleLookup.getAvailableBundles(processorType)).thenReturn(List.of(olderInstalled, latestInstalled));
+        when(bundleLookup.getLatestBundle(processorType)).thenReturn(Optional.of(latestInstalled));
+
+        final VersionedProcessGroup group = createProcessGroupWithProcessor("GenerateFlowFile", processorType, missingBundle);
+
+        createContext(bundleLookup).resolveBundles(group, BundleCompatibility.RESOLVE_NEWEST_BUNDLE);
+
+        final String resultingVersion = group.getProcessors().iterator().next().getBundle().getVersion();
+        assertEquals("2.0.0", resultingVersion, "RESOLVE_NEWEST_BUNDLE should resolve to the newest available bundle");
+    }
+
+    /**
+     * RESOLVE_NEWEST_BUNDLE must leave the bundle untouched when no bundle is available for the type.
+     */
+    @Test
+    public void testResolveBundlesResolveNewestBundleWithNoneAvailable() {
+        final String processorType = "org.apache.nifi.processors.standard.GenerateFlowFile";
+        final Bundle missingBundle = createBundle("org.apache.nifi", "nifi-standard-nar", "0.0.0-NONEXISTENT");
+
+        final ComponentBundleLookup bundleLookup = mock(ComponentBundleLookup.class);
+        when(bundleLookup.getAvailableBundles(processorType)).thenReturn(List.of());
+
+        final VersionedProcessGroup group = createProcessGroupWithProcessor("GenerateFlowFile", processorType, missingBundle);
+
+        createContext(bundleLookup).resolveBundles(group, BundleCompatibility.RESOLVE_NEWEST_BUNDLE);
+
+        final String resultingVersion = group.getProcessors().iterator().next().getBundle().getVersion();
+        assertEquals("0.0.0-NONEXISTENT", resultingVersion, "RESOLVE_NEWEST_BUNDLE should not change bundle when none are available");
+    }
+
+    /**
+     * Bundle resolution must also be applied to controller services, not only to processors.
+     */
+    @Test
+    public void testResolveBundlesWithControllerService() {
+        final String serviceType = "org.apache.nifi.ssl.StandardSSLContextService";
+        final Bundle missingBundle = createBundle("org.apache.nifi", "nifi-standard-nar", "0.0.0-NONEXISTENT");
+        final Bundle installedBundle = createBundle("org.apache.nifi", "nifi-standard-nar", "2.0.0");
+
+        final ComponentBundleLookup bundleLookup = mock(ComponentBundleLookup.class);
+        when(bundleLookup.getAvailableBundles(serviceType)).thenReturn(List.of(installedBundle));
+
+        final VersionedProcessGroup group = createProcessGroupWithControllerService("SSLContextService", serviceType, missingBundle);
+
+        createContext(bundleLookup).resolveBundles(group, BundleCompatibility.RESOLVE_BUNDLE);
+
+        final String resultingVersion = group.getControllerServices().iterator().next().getBundle().getVersion();
+        assertEquals("2.0.0", resultingVersion, "RESOLVE_BUNDLE should resolve controller service bundle");
+    }
+
+    /**
+     * Bundle resolution must descend into child process groups of the group it is given.
+     */
+    @Test
+    public void testResolveBundlesWithNestedProcessGroups() {
+        final String processorType = "org.apache.nifi.processors.standard.GenerateFlowFile";
+        final Bundle missingBundle = createBundle("org.apache.nifi", "nifi-standard-nar", "0.0.0-NONEXISTENT");
+        final Bundle installedBundle = createBundle("org.apache.nifi", "nifi-standard-nar", "2.0.0");
+
+        final ComponentBundleLookup bundleLookup = mock(ComponentBundleLookup.class);
+        when(bundleLookup.getAvailableBundles(processorType)).thenReturn(List.of(installedBundle));
+
+        // The only processor lives in the child group; the root group itself holds no components
+        final VersionedProcessGroup childGroup = createProcessGroupWithProcessor("GenerateFlowFile", processorType, missingBundle);
+        childGroup.setIdentifier("child-group");
+
+        final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
+        rootGroup.setIdentifier("root-group");
+        rootGroup.setName("Root");
+        rootGroup.setProcessors(new HashSet<>());
+        rootGroup.setControllerServices(new HashSet<>());
+        rootGroup.setProcessGroups(Set.of(childGroup));
+
+        createContext(bundleLookup).resolveBundles(rootGroup, BundleCompatibility.RESOLVE_BUNDLE);
+
+        final String resultingVersion = childGroup.getProcessors().iterator().next().getBundle().getVersion();
+        assertEquals("2.0.0", resultingVersion, "RESOLVE_BUNDLE should resolve bundles in nested process groups");
+    }
+
+    /**
+     * A bundle that is already available must be kept, even if a newer one is available and
+     * RESOLVE_NEWEST_BUNDLE is requested.
+     */
+    @Test
+    public void testResolveBundlesDoesNotChangeAvailableBundle() {
+        final String processorType = "org.apache.nifi.processors.standard.GenerateFlowFile";
+        final Bundle referencedBundle = createBundle("org.apache.nifi", "nifi-standard-nar", "1.0.0");
+        final Bundle newerInstalled = createBundle("org.apache.nifi", "nifi-standard-nar", "2.0.0");
+
+        final ComponentBundleLookup bundleLookup = mock(ComponentBundleLookup.class);
+        when(bundleLookup.getAvailableBundles(processorType)).thenReturn(List.of(referencedBundle, newerInstalled));
+
+        final VersionedProcessGroup group = createProcessGroupWithProcessor("GenerateFlowFile", processorType, referencedBundle);
+
+        createContext(bundleLookup).resolveBundles(group, BundleCompatibility.RESOLVE_NEWEST_BUNDLE);
+
+        final String resultingVersion = group.getProcessors().iterator().next().getBundle().getVersion();
+        assertEquals("1.0.0", resultingVersion, "Should not change bundle when it is already available");
+    }
+
+    /**
+     * Creates a flow Bundle with the given group, artifact and version.
+     */
+    private Bundle createBundle(final String group, final String artifact, final String version) {
+        return new Bundle(group, artifact, version);
+    }
+
+    /**
+     * Creates a process group named "Test Group" that contains exactly one processor with the given
+     * name, type and bundle, and no controller services or child groups.
+     */
+    private VersionedProcessGroup createProcessGroupWithProcessor(final String name, final String type, final Bundle bundle) {
+        final VersionedProcessor testProcessor = new VersionedProcessor();
+        testProcessor.setComponentType(ComponentType.PROCESSOR);
+        testProcessor.setIdentifier("processor-1");
+        testProcessor.setName(name);
+        testProcessor.setType(type);
+        testProcessor.setBundle(bundle);
+        testProcessor.setProperties(new HashMap<>());
+        testProcessor.setPropertyDescriptors(new HashMap<>());
+
+        final VersionedProcessGroup testGroup = new VersionedProcessGroup();
+        testGroup.setIdentifier("test-group");
+        testGroup.setName("Test Group");
+        testGroup.setControllerServices(new HashSet<>());
+        testGroup.setProcessGroups(new HashSet<>());
+        testGroup.setProcessors(new HashSet<>(Set.of(testProcessor)));
+        return testGroup;
+    }
+
+    /**
+     * Creates a process group named "Test Group" that contains exactly one controller service with
+     * the given name, type and bundle, and no processors or child groups.
+     */
+    private VersionedProcessGroup createProcessGroupWithControllerService(final String name, final String type, final Bundle bundle) {
+        final VersionedControllerService testService = new VersionedControllerService();
+        testService.setComponentType(ComponentType.CONTROLLER_SERVICE);
+        testService.setIdentifier("service-1");
+        testService.setName(name);
+        testService.setType(type);
+        testService.setBundle(bundle);
+        testService.setProperties(new HashMap<>());
+        testService.setPropertyDescriptors(new HashMap<>());
+
+        final VersionedProcessGroup testGroup = new VersionedProcessGroup();
+        testGroup.setIdentifier("test-group");
+        testGroup.setName("Test Group");
+        testGroup.setProcessors(new HashSet<>());
+        testGroup.setProcessGroups(new HashSet<>());
+        testGroup.setControllerServices(new HashSet<>(Set.of(testService)));
+        return testGroup;
+    }
+
+    /**
+     * Builds an initialization context around the given bundle lookup; every other collaborator is
+     * a plain Mockito mock.
+     */
+    private StandardConnectorInitializationContext createContext(final ComponentBundleLookup bundleLookup) {
+        final ComponentLog componentLog = mock(ComponentLog.class);
+        final SecretsManager secretsManager = mock(SecretsManager.class);
+        final AssetManager assetManager = mock(AssetManager.class);
+
+        return new StandardConnectorInitializationContext.Builder()
+            .identifier("test-connector")
+            .name("Test Connector")
+            .componentLog(componentLog)
+            .secretsManager(secretsManager)
+            .assetManager(assetManager)
+            .componentBundleLookup(bundleLookup)
+            .build();
+    }
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
index 52ee7867f0..02f115b966 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java
@@ -528,7 +528,7 @@ public class ConnectorResource extends ApplicationResource {
@Parameter(
description = "If the client id is not specified, new one
will be generated. This value (whether specified or generated) is included in
the response."
)
- @QueryParam(CLIENT_ID) final ClientIdParameter clientId,
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final
ClientIdParameter clientId,
@Parameter(
description = "Acknowledges that this node is disconnected
to allow for mutable requests to proceed."
)
@@ -752,7 +752,7 @@ public class ConnectorResource extends ApplicationResource {
@Parameter(
description = "If the client id is not specified, new one
will be generated. This value (whether specified or generated) is included in
the response."
)
- @QueryParam(CLIENT_ID) final ClientIdParameter clientId,
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final
ClientIdParameter clientId,
@Parameter(
description = "Acknowledges that this node is disconnected
to allow for mutable requests to proceed."
)
@@ -1479,7 +1479,7 @@ public class ConnectorResource extends
ApplicationResource {
@Parameter(
description = "If the client id is not specified, new one
will be generated. This value (whether specified or generated) is included in
the response."
)
- @QueryParam(CLIENT_ID) final ClientIdParameter clientId,
+ @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) final
ClientIdParameter clientId,
@Parameter(
description = "Acknowledges that this node is disconnected
to allow for mutable requests to proceed."
)
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/BundleResolutionConnector.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/BundleResolutionConnector.java
new file mode 100644
index 0000000000..e1f6be0cf0
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/BundleResolutionConnector.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.connectors.tests.system;
+
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.components.connector.AbstractConnector;
+import org.apache.nifi.components.connector.BundleCompatibility;
+import org.apache.nifi.components.connector.ConfigurationStep;
+import org.apache.nifi.components.connector.ConnectorPropertyDescriptor;
+import org.apache.nifi.components.connector.ConnectorPropertyGroup;
+import org.apache.nifi.components.connector.FlowUpdateException;
+import org.apache.nifi.components.connector.PropertyType;
+import org.apache.nifi.components.connector.components.FlowContext;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * A test connector that exercises bundle resolution capabilities.
+ * It creates a flow with processors using different bundle specifications to
test
+ * how the framework handles unavailable bundles based on the configured
BundleCompatibility strategy.
+ */
+public class BundleResolutionConnector extends AbstractConnector {
+
+ private static final String BUNDLE_COMPATABILITY_STEP = "Bundle
Resolution";
+
+ private static final ConnectorPropertyDescriptor
BUNDLE_COMPATABILITY_PROPERTY = new ConnectorPropertyDescriptor.Builder()
+ .name("Bundle Compatability")
+ .description("Specifies how bundle resolution should be handled when
the specified bundle is not available.")
+ .required(true)
+ .type(PropertyType.STRING)
+ .allowableValues(
+ BundleCompatibility.REQUIRE_EXACT_BUNDLE.name(),
+ BundleCompatibility.RESOLVE_BUNDLE.name(),
+ BundleCompatibility.RESOLVE_NEWEST_BUNDLE.name()
+ )
+ .defaultValue(BundleCompatibility.REQUIRE_EXACT_BUNDLE.name())
+ .build();
+
+ private static final ConnectorPropertyGroup BUNDLE_PROPERTY_GROUP = new
ConnectorPropertyGroup.Builder()
+ .name("Bundle Settings")
+ .description("Settings for bundle resolution behavior.")
+ .properties(List.of(BUNDLE_COMPATABILITY_PROPERTY))
+ .build();
+
+ private static final ConfigurationStep BUNDLE_STEP = new
ConfigurationStep.Builder()
+ .name(BUNDLE_COMPATABILITY_STEP)
+ .propertyGroups(List.of(BUNDLE_PROPERTY_GROUP))
+ .build();
+
+ private final List<ConfigurationStep> configurationSteps =
List.of(BUNDLE_STEP);
+
+ @Override
+ protected void onStepConfigured(final String stepName, final FlowContext
workingContext) throws FlowUpdateException {
+ if (BUNDLE_COMPATABILITY_STEP.equals(stepName)) {
+ final String compatabilityValue =
workingContext.getConfigurationContext()
+ .getProperty(BUNDLE_STEP, BUNDLE_COMPATABILITY_PROPERTY)
+ .getValue();
+ final BundleCompatibility bundleCompatability =
BundleCompatibility.valueOf(compatabilityValue);
+
+ final VersionedExternalFlow flow = createFlowWithBundleScenarios();
+ getInitializationContext().updateFlow(workingContext, flow,
bundleCompatability);
+ }
+ }
+
+ @Override
+ public VersionedExternalFlow getInitialFlow() {
+ final VersionedProcessGroup group = new VersionedProcessGroup();
+ group.setIdentifier(UUID.randomUUID().toString());
+ group.setName("Bundle Resolution Flow");
+ group.setProcessors(new HashSet<>());
+ group.setProcessGroups(new HashSet<>());
+ group.setConnections(new HashSet<>());
+ group.setControllerServices(new HashSet<>());
+
+ final VersionedParameter compatabilityParam = new VersionedParameter();
+ compatabilityParam.setName("BUNDLE_COMPATABILITY");
+
compatabilityParam.setValue(BundleCompatibility.REQUIRE_EXACT_BUNDLE.name());
+ compatabilityParam.setSensitive(false);
+ compatabilityParam.setProvided(false);
+ compatabilityParam.setReferencedAssets(List.of());
+
+ final VersionedParameterContext parameterContext = new
VersionedParameterContext();
+ parameterContext.setName("Bundle Resolution Parameter Context");
+ parameterContext.setParameters(Set.of(compatabilityParam));
+
+ final VersionedExternalFlow flow = new VersionedExternalFlow();
+ flow.setParameterContexts(Map.of(parameterContext.getName(),
parameterContext));
+ flow.setFlowContents(group);
+ return flow;
+ }
+
+ private VersionedExternalFlow createFlowWithBundleScenarios() {
+ final VersionedProcessGroup group = new VersionedProcessGroup();
+ group.setIdentifier(UUID.randomUUID().toString());
+ group.setName("Bundle Resolution Flow");
+ group.setProcessors(new HashSet<>());
+ group.setProcessGroups(new HashSet<>());
+ group.setConnections(new HashSet<>());
+ group.setControllerServices(new HashSet<>());
+
+ // Add a processor with an unavailable bundle (fake version) that
should be resolved based on BundleCompatibility
+ // Uses the system test GenerateFlowFile processor which is available
in the system test extensions bundle
+ final VersionedProcessor testProcessor = createProcessor(
+ "test-processor",
+ "GenerateFlowFile for Bundle Resolution Test",
+ "org.apache.nifi.processors.tests.system.GenerateFlowFile",
+ "org.apache.nifi",
+ "nifi-system-test-extensions-nar",
+ "0.0.0-NONEXISTENT",
+ new Position(100, 100)
+ );
+ group.getProcessors().add(testProcessor);
+
+ final VersionedParameterContext parameterContext = new
VersionedParameterContext();
+ parameterContext.setName("Bundle Resolution Parameter Context");
+ parameterContext.setParameters(Set.of());
+
+ final VersionedExternalFlow flow = new VersionedExternalFlow();
+ flow.setParameterContexts(Map.of(parameterContext.getName(),
parameterContext));
+ flow.setFlowContents(group);
+ return flow;
+ }
+
+ private VersionedProcessor createProcessor(final String id, final String
name, final String type,
+ final String bundleGroup, final
String bundleArtifact,
+ final String bundleVersion,
final Position position) {
+ final VersionedProcessor processor = new VersionedProcessor();
+ processor.setIdentifier(id);
+ processor.setName(name);
+ processor.setType(type);
+ processor.setPosition(position);
+
+ final Bundle bundle = new Bundle();
+ bundle.setGroup(bundleGroup);
+ bundle.setArtifact(bundleArtifact);
+ bundle.setVersion(bundleVersion);
+ processor.setBundle(bundle);
+
+ processor.setProperties(new HashMap<>());
+ processor.setPropertyDescriptors(new HashMap<>());
+ processor.setStyle(new HashMap<>());
+ processor.setSchedulingPeriod("1 sec");
+ processor.setSchedulingStrategy("TIMER_DRIVEN");
+ processor.setExecutionNode("ALL");
+ processor.setPenaltyDuration("30 sec");
+ processor.setYieldDuration("1 sec");
+ processor.setBulletinLevel("WARN");
+ processor.setRunDurationMillis(0L);
+ processor.setConcurrentlySchedulableTaskCount(1);
+ processor.setAutoTerminatedRelationships(Set.of("success"));
+ processor.setScheduledState(ScheduledState.ENABLED);
+ processor.setRetryCount(10);
+ processor.setRetriedRelationships(new HashSet<>());
+ processor.setBackoffMechanism("PENALIZE_FLOWFILE");
+ processor.setMaxBackoffPeriod("10 mins");
+ processor.setComponentType(ComponentType.PROCESSOR);
+
+ return processor;
+ }
+
+ @Override
+ public List<ConfigVerificationResult> verifyConfigurationStep(final String
stepName,
+ final
Map<String, String> propertyValueOverrides,
+ final
FlowContext flowContext) {
+ return List.of(new ConfigVerificationResult.Builder()
+ .outcome(Outcome.SUCCESSFUL)
+ .subject(stepName)
+ .verificationStepName("Bundle Resolution Verification")
+ .explanation("Bundle resolution configuration verified
successfully.")
+ .build());
+ }
+
+ @Override
+ public List<ConfigurationStep> getConfigurationSteps() {
+ return configurationSteps;
+ }
+
+ @Override
+ public void applyUpdate(final FlowContext workingFlowContext, final
FlowContext activeFlowContext) throws FlowUpdateException {
+ final String compatabilityValue =
workingFlowContext.getConfigurationContext()
+ .getProperty(BUNDLE_STEP, BUNDLE_COMPATABILITY_PROPERTY)
+ .getValue();
+ final BundleCompatibility bundleCompatability =
BundleCompatibility.valueOf(compatabilityValue);
+
+ final VersionedExternalFlow flow = createFlowWithBundleScenarios();
+ getInitializationContext().updateFlow(activeFlowContext, flow,
bundleCompatability);
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java
index 17537997b6..9d8b9a9508 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java
@@ -19,6 +19,7 @@ package org.apache.nifi.connectors.tests.system;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.connector.AbstractConnector;
+import org.apache.nifi.components.connector.BundleCompatibility;
import org.apache.nifi.components.connector.ConfigurationStep;
import org.apache.nifi.components.connector.ConnectorPropertyDescriptor;
import org.apache.nifi.components.connector.ConnectorPropertyGroup;
@@ -82,7 +83,7 @@ public class GatedDataQueuingConnector extends
AbstractConnector {
final Bundle bundle = new Bundle();
bundle.setGroup("org.apache.nifi");
bundle.setArtifact("nifi-system-test-extensions-nar");
- bundle.setVersion("2.8.0-SNAPSHOT");
+ bundle.setVersion("2.8.0");
final VersionedProcessor generate = createVersionedProcessor("gen-1",
"1234", "GenerateFlowFile",
"org.apache.nifi.processors.tests.system.GenerateFlowFile", bundle,
@@ -157,7 +158,7 @@ public class GatedDataQueuingConnector extends
AbstractConnector {
}
}
- getInitializationContext().updateFlow(activeFlowContext, flow);
+ getInitializationContext().updateFlow(activeFlowContext, flow,
BundleCompatibility.RESOLVE_BUNDLE);
}
private VersionedProcessor createVersionedProcessor(final String
identifier, final String groupIdentifier, final String name,
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
index 3885e4c079..ab00ff7275 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
@@ -13,9 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.connectors.tests.system.NopConnector
org.apache.nifi.connectors.tests.system.AssetConnector
+org.apache.nifi.connectors.tests.system.BundleResolutionConnector
+org.apache.nifi.connectors.tests.system.CalculateConnector
org.apache.nifi.connectors.tests.system.DataQueuingConnector
org.apache.nifi.connectors.tests.system.GatedDataQueuingConnector
org.apache.nifi.connectors.tests.system.NestedProcessGroupConnector
-org.apache.nifi.connectors.tests.system.CalculateConnector
+org.apache.nifi.connectors.tests.system.NopConnector
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java
index 1053a4018c..3dec256e24 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java
@@ -19,27 +19,35 @@ package org.apache.nifi.tests.system.connectors;
import jakarta.ws.rs.NotFoundException;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.components.connector.BundleCompatibility;
import org.apache.nifi.components.connector.ConnectorState;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.ConnectorConfigurationDTO;
import org.apache.nifi.web.api.dto.ConnectorValueReferenceDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
import org.apache.nifi.web.api.entity.ConnectorEntity;
import org.apache.nifi.web.api.entity.ParameterContextsEntity;
import org.apache.nifi.web.api.entity.ParameterProviderEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -211,4 +219,62 @@ public class ConnectorCrudIT extends NiFiSystemIT {
assertInstanceOf(NotFoundException.class, e.getCause());
}
}
+
+ @Test
+ public void testBundleResolutionRequireExactBundle() throws
NiFiClientException, IOException, InterruptedException {
+ final ConnectorAndProcessor connectorAndProcessor =
createBundleResolutionConnector(BundleCompatibility.REQUIRE_EXACT_BUNDLE);
+ final ProcessorDTO processor = connectorAndProcessor.processor();
+ assertTrue(processor.getExtensionMissing());
+
+ final BundleDTO bundle = processor.getBundle();
+ assertEquals("0.0.0-NONEXISTENT", bundle.getVersion());
+ assertEquals(Boolean.TRUE, processor.getExtensionMissing());
+
+ final ConnectorEntity connector = connectorAndProcessor.connector();
+ assertEquals("INVALID",
connector.getComponent().getValidationStatus());
+ final Collection<String> validationErrors =
connector.getComponent().getValidationErrors();
+ assertNotNull(validationErrors);
+ assertEquals(1, validationErrors.size());
+ final String validationError = validationErrors.iterator().next();
+ assertTrue(validationError.contains("missing"));
+ }
+
+ @Test
+ public void testBundleResolutionResolveBundle() throws
NiFiClientException, IOException, InterruptedException {
+ final ProcessorDTO processor =
createBundleResolutionConnector(BundleCompatibility.RESOLVE_BUNDLE).processor();
+ assertFalse(processor.getExtensionMissing());
+
+ final BundleDTO bundle = processor.getBundle();
+ assertNotEquals("0.0.0-NONEXISTENT", bundle.getVersion());
+ }
+
+ @Test
+ public void testBundleResolutionResolveNewestBundle() throws
NiFiClientException, IOException, InterruptedException {
+ final ProcessorDTO processor =
createBundleResolutionConnector(BundleCompatibility.RESOLVE_NEWEST_BUNDLE).processor();
+ assertFalse(processor.getExtensionMissing());
+
+ final BundleDTO bundle = processor.getBundle();
+ assertNotEquals("0.0.0-NONEXISTENT", bundle.getVersion());
+ }
+
+ private ConnectorAndProcessor createBundleResolutionConnector(final
BundleCompatibility bundleCompatability) throws NiFiClientException,
IOException, InterruptedException {
+ final ConnectorEntity connector =
getClientUtil().createConnector("BundleResolutionConnector");
+ assertNotNull(connector);
+
+ getClientUtil().configureConnector(connector, "Bundle Resolution",
Map.of("Bundle Compatability", bundleCompatability.name()));
+ getClientUtil().applyConnectorUpdate(connector);
+
+ final ConnectorEntity updatedConnector =
getNifiClient().getConnectorClient().getConnector(connector.getId());
+ final ProcessGroupFlowEntity flowEntity =
getNifiClient().getConnectorClient().getFlow(connector.getId());
+ final FlowDTO flowDto = flowEntity.getProcessGroupFlow().getFlow();
+ final Set<ProcessorEntity> processors = flowDto.getProcessors();
+
+ assertEquals(1, processors.size());
+
+ final ProcessorDTO processor =
processors.iterator().next().getComponent();
+ return new ConnectorAndProcessor(updatedConnector, processor);
+ }
+
+ private record ConnectorAndProcessor(ConnectorEntity connector,
ProcessorDTO processor) {
+ }
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorDrainIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorDrainIT.java
index eef6c4bf97..9f0321e873 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorDrainIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorDrainIT.java
@@ -151,7 +151,7 @@ public class ConnectorDrainIT extends NiFiSystemIT {
logger.info("Sleeping for 2 seconds to verify connector remains in
DRAINING state");
Thread.sleep(2000L);
- ConnectorEntity drainingConnector =
getNifiClient().getConnectorClient().getConnector(connectorId);
+ final ConnectorEntity drainingConnector =
getNifiClient().getConnectorClient().getConnector(connectorId);
logger.info("Connector state after 2 seconds: {}",
drainingConnector.getComponent().getState());
assertEquals(ConnectorState.DRAINING.name(),
drainingConnector.getComponent().getState());
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorVersionResolutionIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorVersionResolutionIT.java
new file mode 100644
index 0000000000..2c10e8093d
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorVersionResolutionIT.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.connectors;
+
+import org.apache.nifi.stream.io.GZIPOutputStream;
+import org.apache.nifi.tests.system.InstanceConfiguration;
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.tests.system.SpawnedStandaloneNiFiInstanceFactory;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * System test that verifies that when a flow.json.gz contains a Connector
whose
+ * bundle version does not exist, but only one version of that Connector type
is
+ * available, the Connector is created with the correct/available version.
+ */
+class ConnectorVersionResolutionIT extends NiFiSystemIT {
+ private static final String CONNECTOR_ID =
"11111111-1111-1111-1111-111111111111";
+ private static final String NONEXISTENT_VERSION = "1.0.0-nonexistent";
+
+ @Override
+ public NiFiInstanceFactory getInstanceFactory() {
+ final Path flowJsonInputPath =
Paths.get("src/test/resources/flows/connector-version-mismatch/flow.json");
+ final Path flowJsonOutputPath =
Paths.get("target/connector-version-mismatch-flow.json.gz").toAbsolutePath();
+
+ try (final InputStream inputStream =
Files.newInputStream(flowJsonInputPath);
+ final OutputStream outputStream = new
GZIPOutputStream(Files.newOutputStream(flowJsonOutputPath))) {
+ inputStream.transferTo(outputStream);
+ } catch (final IOException e) {
+ throw new UncheckedIOException("Failed to compress Flow
Configuration [%s]".formatted(flowJsonInputPath), e);
+ }
+
+ return new SpawnedStandaloneNiFiInstanceFactory(
+ new InstanceConfiguration.Builder()
+
.bootstrapConfig("src/test/resources/conf/default/bootstrap.conf")
+ .instanceDirectory("target/standalone-instance")
+ .flowJson(flowJsonOutputPath.toFile())
+ .overrideNifiProperties(getNifiPropertiesOverrides())
+ .build()
+ );
+ }
+
+ @Override
+ protected boolean isAllowFactoryReuse() {
+ return false;
+ }
+
+ @Override
+ protected boolean isDestroyEnvironmentAfterEachTest() {
+ return true;
+ }
+
+ @Test
+ void testConnectorCreatedWithCorrectVersionWhenOnlyOneVersionExists()
throws NiFiClientException, IOException {
+ final ConnectorEntity connector =
getNifiClient().getConnectorClient().getConnector(CONNECTOR_ID);
+ assertNotNull(connector);
+ assertNotNull(connector.getComponent());
+
+ final BundleDTO bundle = connector.getComponent().getBundle();
+ assertNotNull(bundle, "Connector bundle should not be null");
+ assertEquals("org.apache.nifi", bundle.getGroup());
+ assertEquals("nifi-system-test-extensions-nar", bundle.getArtifact());
+
+ assertNotEquals(NONEXISTENT_VERSION, bundle.getVersion(), "Connector
should not have the nonexistent version from flow.json");
+ assertEquals(getNiFiVersion(), bundle.getVersion(), "Connector should
be created with the current NiFi version since only one version exists");
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/connector-version-mismatch/flow.json
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/connector-version-mismatch/flow.json
new file mode 100644
index 0000000000..d01e613000
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/flows/connector-version-mismatch/flow.json
@@ -0,0 +1,52 @@
+{
+ "encodingVersion": {
+ "majorVersion": 2,
+ "minorVersion": 0
+ },
+ "maxTimerDrivenThreadCount": 10,
+ "registries": [],
+ "parameterContexts": [],
+ "parameterProviders": [],
+ "controllerServices": [],
+ "reportingTasks": [],
+ "flowAnalysisRules": [],
+ "connectors": [
+ {
+ "instanceIdentifier": "11111111-1111-1111-1111-111111111111",
+ "name": "Test NopConnector",
+ "type": "org.apache.nifi.connectors.tests.system.NopConnector",
+ "bundle": {
+ "group": "org.apache.nifi",
+ "artifact": "nifi-system-test-extensions-nar",
+ "version": "1.0.0-nonexistent"
+ },
+ "scheduledState": "ENABLED",
+ "activeFlowConfiguration": [],
+ "workingFlowConfiguration": []
+ }
+ ],
+ "rootGroup": {
+ "identifier": "root-group-identifier",
+ "instanceIdentifier": "root-group-instance-id",
+ "name": "NiFi Flow",
+ "comments": "",
+ "position": {
+ "x": 0.0,
+ "y": 0.0
+ },
+ "processGroups": [],
+ "remoteProcessGroups": [],
+ "processors": [],
+ "inputPorts": [],
+ "outputPorts": [],
+ "connections": [],
+ "labels": [],
+ "funnels": [],
+ "controllerServices": [],
+ "defaultFlowFileExpiration": "0 sec",
+ "defaultBackPressureObjectThreshold": 10000,
+ "defaultBackPressureDataSizeThreshold": "1 GB",
+ "flowFileConcurrency": "UNBOUNDED",
+ "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE"
+ }
+}