This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f27ace1ccf NIFI-12016: This closes #7662. Allow use of compatible NAR
bundles when loading flow from cluster connection; when determining what
bundles are compatible, consider not just any bundle if it's the only one but
also any bundle whose version matches the framework version so that when NiFi
is upgraded, it is handled more gracefully.
f27ace1ccf is described below
commit f27ace1ccf36cb0f7a958462499c388652dde1c0
Author: Mark Payne <[email protected]>
AuthorDate: Wed Aug 30 17:39:31 2023 -0400
NIFI-12016: This closes #7662. Allow use of compatible NAR bundles when
loading flow from cluster connection; when determining what bundles are
compatible, consider not just any bundle if it's the only one but also any
bundle whose version matches the framework version so that when NiFi is
upgraded, it is handled more gracefully.
Signed-off-by: Joseph Witt <[email protected]>
---
.../java/org/apache/nifi/util/BundleUtils.java | 44 ++++++---
.../java/org/apache/nifi/util/TestBundleUtils.java | 100 +++++++++++++++++++
.../nifi/controller/StandardFlowService.java | 2 +-
.../system/clustering/FlowSynchronizationIT.java | 109 ---------------------
4 files changed, 133 insertions(+), 122 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java
index d0c428f263..d7ad003f62 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/BundleUtils.java
@@ -20,22 +20,40 @@ import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarClassLoadersHolder;
import org.apache.nifi.nar.PythonBundle;
import org.apache.nifi.web.api.dto.BundleDTO;
import java.util.List;
import java.util.Optional;
-import java.util.stream.Collectors;
/**
* Utility class for Bundles.
*/
public final class BundleUtils {
- private static Optional<BundleCoordinate> findOptionalBundleForType(final
ExtensionManager extensionManager, final String type, final BundleCoordinate
desiredCoordinate) {
+ static Optional<BundleCoordinate> findOptionalBundleForType(final
ExtensionManager extensionManager, final String type, final Bundle
frameworkBundle) {
final List<Bundle> bundles = extensionManager.getBundles(type);
if (bundles.size() == 1) {
return
Optional.of(bundles.get(0).getBundleDetails().getCoordinate());
}
+
+ // All NARs that are packaged with NiFi will have the same bundle
coordinate as the NiFi framework bundle.
+ // During an upgrade, it's fairly common to have two versions of a
NAR: the version shipped with NiFi and another version, perhaps to maintain
+ // backward compatibility to because the new version behaves some
different way and the user wants the old behavior in some instances, etc.
+ // In this case, the user may have two versions. For example, version
2.2.0 and 2.4.0 while NiFi is at version 2.4.0.
+ // Now, during upgrade to 2.4.1, there will no longer be a 2.4.0
available. We want to be smart enough to realize that those extension using
version
+ // 2.2.0 stay there but those using 2.4.0 upgrade to 2.4.1.
+ // To do this, we always first match on the exact version but this
method is called when there's no exact match. So those marked 2.2.0 won't
arrive here.
+ // But for those extensions that were using 2.4.0, we want to now look
for version 2.4.1 - I.e., the one with the same version as the framework. If we
+ // find that version, then we want to use it. This helps to smooth out
the upgrade process even when users have multiple versions of a given NAR.
+ final String frameworkVersion =
frameworkBundle.getBundleDetails().getCoordinate().getVersion();
+ for (final Bundle bundle : bundles) {
+ final String componentVersion =
bundle.getBundleDetails().getCoordinate().getVersion();
+ if (frameworkVersion.equals(componentVersion)) {
+ return Optional.of(bundle.getBundleDetails().getCoordinate());
+ }
+ }
+
return Optional.empty();
}
@@ -71,7 +89,10 @@ public final class BundleUtils {
throw new IllegalStateException(String.format("%s from %s is
not known to this NiFi instance.", type, coordinate));
}
} else {
- final List<BundleCoordinate> bundlesForType =
extensionManager.getBundles(type).stream().map(b ->
b.getBundleDetails().getCoordinate()).collect(Collectors.toList());
+ final List<BundleCoordinate> bundlesForType =
extensionManager.getBundles(type).stream()
+ .map(b -> b.getBundleDetails().getCoordinate())
+ .toList();
+
if (bundlesForType.contains(coordinate)) {
return coordinate;
} else {
@@ -82,18 +103,17 @@ public final class BundleUtils {
private static Optional<BundleCoordinate>
findOptionalCompatibleBundle(final ExtensionManager extensionManager, final
String type,
- final BundleDTO
bundleDTO, final boolean allowCompatibleBundle) {
+ final BundleDTO
bundleDTO) {
final BundleCoordinate coordinate = new
BundleCoordinate(bundleDTO.getGroup(), bundleDTO.getArtifact(),
bundleDTO.getVersion());
final Bundle bundle = extensionManager.getBundle(coordinate);
if (bundle == null) {
- if (allowCompatibleBundle) {
- return findOptionalBundleForType(extensionManager, type,
coordinate);
- } else {
- return Optional.empty();
- }
+ return findOptionalBundleForType(extensionManager, type,
NarClassLoadersHolder.getInstance().getFrameworkBundle());
} else {
- final List<BundleCoordinate> bundlesForType =
extensionManager.getBundles(type).stream().map(b ->
b.getBundleDetails().getCoordinate()).collect(Collectors.toList());
+ final List<BundleCoordinate> bundlesForType =
extensionManager.getBundles(type).stream()
+ .map(b -> b.getBundleDetails().getCoordinate())
+ .toList();
+
if (bundlesForType.contains(coordinate)) {
return Optional.of(coordinate);
} else {
@@ -181,9 +201,9 @@ public final class BundleUtils {
public static Optional<BundleCoordinate> getOptionalCompatibleBundle(final
ExtensionManager extensionManager, final String type, final BundleDTO
bundleDTO) {
if (bundleDTO == null) {
- return findOptionalBundleForType(extensionManager, type, null);
+ return findOptionalBundleForType(extensionManager, type,
NarClassLoadersHolder.getInstance().getFrameworkBundle());
} else {
- return findOptionalCompatibleBundle(extensionManager, type,
bundleDTO, true);
+ return findOptionalCompatibleBundle(extensionManager, type,
bundleDTO);
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestBundleUtils.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestBundleUtils.java
new file mode 100644
index 0000000000..0368ac513b
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/util/TestBundleUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.bundle.BundleDetails;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarClassLoaders;
+import org.apache.nifi.nar.NarClassLoadersHolder;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+public class TestBundleUtils {
+
+ private static final String PROCESSOR_TYPE = "MyProcessor";
+ private static final String FRAMEWORK_VERSION = "5.0.0";
+
+ private static final Bundle frameworkBundle =
createBundle("framework-bundle", FRAMEWORK_VERSION);
+ private static ExtensionManager extensionManager;
+
+
+ @BeforeAll
+ public static void setup() throws IOException, ClassNotFoundException {
+ extensionManager = Mockito.mock(ExtensionManager.class);
+
+ final NarClassLoaders narClassLoaders =
NarClassLoadersHolder.getInstance();
+ narClassLoaders.init(null, new File("target/extensions"));
+ }
+
+ @Test
+ public void findOptionalBundleMatchingFramework() throws IOException,
ClassNotFoundException {
+ final Bundle frameworkVersionBundle = createBundle("my-bundle",
FRAMEWORK_VERSION);
+ final Bundle otherBundle = createBundle("my-bundle", "1.2.3");
+ final List<Bundle> bundles = Arrays.asList(frameworkVersionBundle,
otherBundle);
+ when(extensionManager.getBundles(PROCESSOR_TYPE)).thenReturn(bundles);
+
+ final Optional<BundleCoordinate> compatibleCoordinate =
BundleUtils.findOptionalBundleForType(extensionManager, PROCESSOR_TYPE,
frameworkBundle);
+ assertTrue(compatibleCoordinate.isPresent());
+
assertEquals(frameworkVersionBundle.getBundleDetails().getCoordinate(),
compatibleCoordinate.get());
+ }
+
+ @Test
+ public void findOptionalBundleNotMatchingFramework() throws IOException,
ClassNotFoundException {
+ final Bundle version3 = createBundle("my-bundle", "3.0.0");
+ final Bundle otherBundle = createBundle("my-bundle", "1.2.3");
+ final List<Bundle> bundles = Arrays.asList(version3, otherBundle);
+ when(extensionManager.getBundles(PROCESSOR_TYPE)).thenReturn(bundles);
+
+ final Optional<BundleCoordinate> compatibleCoordinate =
BundleUtils.findOptionalBundleForType(extensionManager, PROCESSOR_TYPE,
frameworkBundle);
+ assertFalse(compatibleCoordinate.isPresent());
+ }
+
+ @Test
+ public void testFindOptionalBundleOnlyOneBundle() throws IOException,
ClassNotFoundException {
+ final Bundle otherBundle = createBundle("my-bundle", "1.2.3");
+ final List<Bundle> bundles = Collections.singletonList(otherBundle);
+ when(extensionManager.getBundles(PROCESSOR_TYPE)).thenReturn(bundles);
+
+ final Optional<BundleCoordinate> compatibleCoordinate =
BundleUtils.findOptionalBundleForType(extensionManager, PROCESSOR_TYPE,
frameworkBundle);
+ assertTrue(compatibleCoordinate.isPresent());
+ assertEquals(otherBundle.getBundleDetails().getCoordinate(),
compatibleCoordinate.get());
+ }
+
+ private static Bundle createBundle(final String artifactId, final String
version) {
+ final BundleDetails bundleDetails = new BundleDetails.Builder()
+ .coordinate(new BundleCoordinate("org.apache.nifi", artifactId,
version))
+ .workingDir(new File("target"))
+ .build();
+
+ return new Bundle(bundleDetails,
TestBundleUtils.class.getClassLoader());
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 2ce5477474..26f835a08c 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -956,7 +956,7 @@ public class StandardFlowService implements FlowService,
ProtocolHandler {
controller.setNodeId(nodeId);
// load new controller state
- loadFromBytes(dataFlow, true,
BundleUpdateStrategy.USE_SPECIFIED_OR_FAIL);
+ loadFromBytes(dataFlow, true,
BundleUpdateStrategy.USE_SPECIFIED_OR_COMPATIBLE_OR_GHOST);
// set node ID on controller before we start heartbeating because
heartbeat needs node ID
clusterCoordinator.setLocalNodeIdentifier(nodeId);
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index f4926bc04f..48c426bfc7 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -30,7 +30,6 @@ import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
-import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
@@ -353,84 +352,6 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
});
}
- @Test
- public void testCannotJoinClusterIfMissingNar() throws
NiFiClientException, IOException, InterruptedException {
- getClientUtil().createProcessor("GenerateFlowFile");
-
- // Shut down node 2
- disconnectNode(2);
- final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2);
- node2.stop();
-
- // Remove node from the cluster. This way we know when it's attempted
to connect
- final Integer node2ApiPort = getNodeApiPort(2);
- removeNode(2);
- removeExtensionsNar(node2);
-
- node2.start(false);
-
- // Wait until node is no longer removed from cluster, which will
happen when it starts up and requests to connect
- waitFor(() -> !isNodeRemoved(node2ApiPort));
-
- // Wait for node to show as disconnected because it doesn't have the
necessary nar
- waitForNodeState(2, NodeConnectionState.DISCONNECTED);
-
- // We need to restore the extensions nar and restart the node so that
subsequent tests can succeed
- restoreExtensionsNar(node2);
- node2.stop();
- node2.start();
-
- waitForAllNodesConnected();
- }
-
- private void removeNode(final int index) throws NiFiClientException,
IOException, InterruptedException {
- final NodeDTO nodeDto = getNodeEntity(index).getNode();
- final String nodeId = nodeDto.getNodeId();
- final Integer apiPort = nodeDto.getApiPort();
- getNifiClient().getControllerClient().deleteNode(nodeId);
- waitFor(() -> isNodeRemoved(apiPort));
- }
-
- private Integer getNodeApiPort(final int index) throws
NiFiClientException, IOException {
- final NodeDTO nodeDto = getNodeEntity(index).getNode();
- final Integer apiPort = nodeDto.getApiPort();
- return apiPort;
- }
-
- @Test
- public void testCanJoinClusterIfAllNodesMissingNar() throws
NiFiClientException, IOException, InterruptedException {
- final ProcessorEntity generate =
getClientUtil().createProcessor("GenerateFlowFile");
-
- // Shut down node 2
- disconnectNode(2);
- final NiFiInstance node2 = getNiFiInstance().getNodeInstance(2);
- node2.stop();
-
- final NiFiInstance node1 = getNiFiInstance().getNodeInstance(1);
- node1.stop();
-
- removeExtensionsNar(node1);
- removeExtensionsNar(node2);
-
- node1.start(false);
- node2.start(true);
-
- waitForAllNodesConnected();
-
-
assertTrue(getNifiClient().getProcessorClient().getProcessor(generate.getId()).getComponent().getExtensionMissing());
-
- // In order to ensure that subsequent tests are able to operate
properly, we need to restore the nar and restart
- node1.stop();
- node2.stop();
-
- restoreExtensionsNar(node1);
- restoreExtensionsNar(node2);
-
- node1.start(false);
- node2.start(true);
- waitForAllNodesConnected();
- }
-
@Test
public void testCannotRemoveComponentsWhileNodeDisconnected() throws
NiFiClientException, IOException, InterruptedException {
@@ -497,36 +418,6 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
}
- private void removeExtensionsNar(final NiFiInstance nifiInstance) {
- final File extensionsNar = getExtensionsNar(nifiInstance);
- final File backupFile = new File(extensionsNar.getParentFile(),
extensionsNar.getName() + ".backup");
- assertTrue(extensionsNar.renameTo(backupFile));
- }
-
- private void restoreExtensionsNar(final NiFiInstance nifiInstance) {
- final File backupFile = getExtensionsNar(nifiInstance);
- final File extensionsNar = new File(backupFile.getParentFile(),
backupFile.getName().replace(".backup", ""));
- assertTrue(backupFile.renameTo(extensionsNar));
- }
-
- private File getExtensionsNar(final NiFiInstance nifiInstance) {
- final File libDir = new File(nifiInstance.getInstanceDirectory(),
"lib");
- final File[] testExtensionsNar = libDir.listFiles(file ->
file.getName().startsWith("nifi-system-test-extensions-nar-"));
- assertEquals(1, testExtensionsNar.length);
-
- return testExtensionsNar[0];
- }
-
-
- private boolean isNodeRemoved(final int apiPort) {
- try {
- return
getNifiClient().getControllerClient().getNodes().getCluster().getNodes().stream()
- .noneMatch(dto -> dto.getApiPort() == apiPort);
- } catch (Exception e) {
- return false;
- }
- }
-
@Test
public void testRestartWithFlowXmlGzNoJson() throws NiFiClientException,
IOException {
restartWithOnlySingleFlowPersistenceFile("flow.json.gz");