This is an automated email from the ASF dual-hosted git repository.
ferdei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new eabbd508a1 NIFI-14547 MiNiFi - Resolve asset references in flows
coming from C2 server
eabbd508a1 is described below
commit eabbd508a1960536e76354ea06bac8785e081876
Author: Peter Kedvessy <[email protected]>
AuthorDate: Thu May 8 08:57:42 2025 +0200
NIFI-14547 MiNiFi - Resolve asset references in flows coming from C2 server
Signed-off-by: Ferenc Erdei <[email protected]>
This closes #9922.
---
.../service/MiNiFiConfigurationChangeListener.java | 6 +-
.../minifi/commons/service/FlowEnrichService.java | 3 +-
...ava => FlowPropertyAssetReferenceResolver.java} | 14 +--
.../commons/service/FlowPropertyEncryptor.java | 3 +-
.../commons/service/StandardFlowEnrichService.java | 4 +-
...dFlowPropertyAssetReferenceResolverService.java | 87 +++++++++++++
.../service/StandardFlowPropertyEncryptor.java | 4 +-
.../service/StandardFlowEnrichServiceTest.java | 22 ++--
...wPropertyAssetReferenceResolverServiceTest.java | 136 +++++++++++++++++++++
.../service/StandardFlowPropertyEncryptorTest.java | 12 +-
.../apache/nifi/minifi/c2/C2NifiClientService.java | 17 ++-
.../DefaultUpdateConfigurationStrategy.java | 26 ++--
.../syncresource/FileResourceRepository.java | 10 ++
.../command/syncresource/ResourceRepository.java | 2 +
.../SyncResourcePropertyProvider.java} | 25 ++--
.../DefaultUpdateConfigurationStrategyTest.java | 38 +++---
.../syncresource/FileResourceRepositoryTest.java | 22 ++++
17 files changed, 349 insertions(+), 82 deletions(-)
diff --git
a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java
b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java
index 5925ac3b5e..58098c5e44 100644
---
a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java
+++
b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java
@@ -88,9 +88,9 @@ public class MiNiFiConfigurationChangeListener implements
ConfigurationChangeLis
backup(currentRawFlowConfigFile, backupRawFlowConfigFile);
byte[] rawFlow = toByteArray(flowConfigInputStream);
- VersionedDataflow rawDataFlow =
flowSerDeService.deserialize(rawFlow);
- VersionedDataflow enrichedFlow =
flowEnrichService.enrichFlow(rawDataFlow);
- byte[] serializedEnrichedFlow =
flowSerDeService.serialize(enrichedFlow);
+ VersionedDataflow dataFlow = flowSerDeService.deserialize(rawFlow);
+ flowEnrichService.enrichFlow(dataFlow);
+ byte[] serializedEnrichedFlow =
flowSerDeService.serialize(dataFlow);
persist(serializedEnrichedFlow, currentFlowConfigFile, true);
restartInstance();
persist(rawFlow, currentRawFlowConfigFile, false);
diff --git
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java
b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java
index 5b3620803f..ce15b07830 100644
---
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java
+++
b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java
@@ -29,7 +29,6 @@ public interface FlowEnrichService {
* Responsible for enriching a VersionedDataflow instance
*
* @param versionedDataflow a VersionedDataflow instance
- * @return VersionedDataflow the enriched flow instance
*/
- VersionedDataflow enrichFlow(VersionedDataflow versionedDataflow);
+ void enrichFlow(VersionedDataflow versionedDataflow);
}
diff --git
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java
b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowPropertyAssetReferenceResolver.java
similarity index 64%
copy from
minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java
copy to
minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowPropertyAssetReferenceResolver.java
index 5b3620803f..34e1dcef94 100644
---
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java
+++
b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowPropertyAssetReferenceResolver.java
@@ -19,17 +19,11 @@ package org.apache.nifi.minifi.commons.service;
import org.apache.nifi.controller.flow.VersionedDataflow;
-/**
- * Defines interface methods used to implement a FlowEnrichService.
- * The purpose of a flow enrich service is to enrich a VersionedDataFlow with
various additional components specific to the MiNiFi instance
- */
-public interface FlowEnrichService {
-
+public interface FlowPropertyAssetReferenceResolver {
/**
- * Responsible for enriching a VersionedDataflow instance
+ * Responsible for resolving asset reference properties in a
VersionedDataflow instance
*
- * @param versionedDataflow a VersionedDataflow instance
- * @return VersionedDataflow the enriched flow instance
+ * @param flow a VersionedDataflow instance to resolve its asset reference
properties
*/
- VersionedDataflow enrichFlow(VersionedDataflow versionedDataflow);
+ void resolveAssetReferenceProperties(VersionedDataflow flow);
}
diff --git
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowPropertyEncryptor.java
b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowPropertyEncryptor.java
index 661b4bd242..f8481f21a3 100644
---
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowPropertyEncryptor.java
+++
b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowPropertyEncryptor.java
@@ -29,7 +29,6 @@ public interface FlowPropertyEncryptor {
* Responsible for encrypting sensitive properties in a VersionedDataflow
instance
*
* @param flow a VersionedDataflow instance to encrypt its sensitive
properties
- * @return VersionedDataflow the flow instance with encrypted sensitive
properties
*/
- VersionedDataflow encryptSensitiveProperties(VersionedDataflow flow);
+ void encryptSensitiveProperties(VersionedDataflow flow);
}
diff --git
a/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java
b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java
index 9695e4305f..6e12e91311 100644
---
a/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java
+++
b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichService.java
@@ -97,7 +97,7 @@ public class StandardFlowEnrichService implements
FlowEnrichService {
}
@Override
- public VersionedDataflow enrichFlow(VersionedDataflow versionedDataflow) {
+ public void enrichFlow(VersionedDataflow versionedDataflow) {
versionedDataflow.setReportingTasks(ofNullable(versionedDataflow.getReportingTasks()).orElseGet(ArrayList::new));
versionedDataflow.setRegistries(ofNullable(versionedDataflow.getRegistries()).orElseGet(ArrayList::new));
versionedDataflow.setControllerServices(ofNullable(versionedDataflow.getControllerServices()).orElseGet(ArrayList::new));
@@ -147,8 +147,6 @@ public class StandardFlowEnrichService implements
FlowEnrichService {
Map<String, String> idToInstanceIdMap =
createIdToInstanceIdMap(rootGroup);
setConnectableComponentsInstanceId(rootGroup, idToInstanceIdMap);
}
-
- return versionedDataflow;
}
private void createDefaultParameterContext(VersionedDataflow
versionedDataflow) {
diff --git
a/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyAssetReferenceResolverService.java
b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyAssetReferenceResolverService.java
new file mode 100644
index 0000000000..e056984f4c
--- /dev/null
+++
b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyAssetReferenceResolverService.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.service;
+
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.flow.VersionedConfigurableExtension;
+import org.apache.nifi.flow.VersionedProcessGroup;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static java.util.Optional.ofNullable;
+import static java.util.stream.Stream.concat;
+
+/**
+ * Replaces asset reference property values in a flow with the path of the referenced asset.
+ * <p>
+ * A property value is treated as an asset reference when the whole value has the form
+ * {@code @{asset-id:ID}}. The id is handed to the resolver function supplied at construction time
+ * and the property value is overwritten in place with the string form of the returned path.
+ * <p>
+ * The components inspected are the flow level controller services plus the processors and
+ * controller services of the root process group and of every nested process group.
+ */
+public class StandardFlowPropertyAssetReferenceResolverService implements FlowPropertyAssetReferenceResolver {
+
+    private static final String ASSET_REFERENCE_PREFIX = "@{asset-id:";
+    private static final String ASSET_REFERENCE_SUFFIX = "}";
+
+    private final Function<String, Optional<Path>> assetPathResolver;
+
+    /**
+     * @param assetPathResolver maps an asset id to the path of the asset, or to an empty Optional when the asset is unknown
+     */
+    public StandardFlowPropertyAssetReferenceResolverService(Function<String, Optional<Path>> assetPathResolver) {
+        this.assetPathResolver = assetPathResolver;
+    }
+
+    /**
+     * Resolves every asset reference property of the flow's components in place.
+     *
+     * @param flow a VersionedDataflow instance whose asset reference properties are resolved
+     * @throws IllegalStateException when a referenced asset can not be resolved
+     */
+    @Override
+    public void resolveAssetReferenceProperties(VersionedDataflow flow) {
+        // A component may carry no property map at all; skip it instead of failing with a NullPointerException
+        fetchFlowComponents(flow).forEach(component ->
+            ofNullable(component.getProperties()).ifPresent(properties ->
+                properties.entrySet().stream()
+                    .filter(entry -> isAssetReference(entry.getValue()))
+                    .forEach(entry -> entry.setValue(getAssetAbsolutePathOrThrowIllegalStateException(entry.getValue())))));
+    }
+
+    /**
+     * Tells whether the whole property value is an asset reference.
+     */
+    private boolean isAssetReference(String value) {
+        return value != null
+            && value.startsWith(ASSET_REFERENCE_PREFIX)
+            && value.endsWith(ASSET_REFERENCE_SUFFIX);
+    }
+
+    /**
+     * Streams the flow level controller services followed by the components of the root group hierarchy.
+     */
+    private Stream<? extends VersionedConfigurableExtension> fetchFlowComponents(VersionedDataflow flow) {
+        return concat(
+            ofNullable(flow.getControllerServices()).orElse(List.of()).stream(),
+            fetchComponentsRecursively(flow.getRootGroup())
+        );
+    }
+
+    /**
+     * Streams the processors and controller services of the given group and, recursively, of its child groups.
+     * Missing (null) collections are treated as empty.
+     */
+    private Stream<? extends VersionedConfigurableExtension> fetchComponentsRecursively(VersionedProcessGroup processGroup) {
+        return concat(
+            Stream.of(
+                    ofNullable(processGroup.getProcessors()).orElse(Set.of()),
+                    ofNullable(processGroup.getControllerServices()).orElse(Set.of())
+                )
+                .flatMap(Set::stream),
+            ofNullable(processGroup.getProcessGroups()).orElse(Set.of()).stream()
+                .flatMap(this::fetchComponentsRecursively)
+        );
+    }
+
+    /**
+     * Extracts the asset id from a reference and resolves it to a path string.
+     *
+     * @param assetReference a value already accepted by {@link #isAssetReference(String)}
+     * @return the string form of the path returned by the resolver
+     * @throws IllegalStateException when the resolver does not know the asset id
+     */
+    private String getAssetAbsolutePathOrThrowIllegalStateException(String assetReference) {
+        // Strip exactly one leading prefix and one trailing suffix. String.replace would also remove
+        // any further occurrence of the prefix or of '}' inside the id and so look up a different asset.
+        String resourceId = assetReference.substring(
+            ASSET_REFERENCE_PREFIX.length(),
+            assetReference.length() - ASSET_REFERENCE_SUFFIX.length());
+        return assetPathResolver.apply(resourceId)
+            .map(Path::toString)
+            .orElseThrow(() -> new IllegalStateException("Resource '" + resourceId + "' not found"));
+    }
+}
diff --git
a/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyEncryptor.java
b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyEncryptor.java
index 9222a41c2b..7f2308959b 100644
---
a/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyEncryptor.java
+++
b/minifi/minifi-commons/minifi-commons-framework/src/main/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyEncryptor.java
@@ -55,7 +55,7 @@ public class StandardFlowPropertyEncryptor implements
FlowPropertyEncryptor {
}
@Override
- public VersionedDataflow encryptSensitiveProperties(VersionedDataflow
flow) {
+ public void encryptSensitiveProperties(VersionedDataflow flow) {
encryptParameterContextsProperties(flow);
Map<String, Set<String>> sensitivePropertiesByComponentType =
Optional.of(flowProvidedSensitiveProperties(flow))
@@ -63,8 +63,6 @@ public class StandardFlowPropertyEncryptor implements
FlowPropertyEncryptor {
.orElseGet(this::runtimeManifestSensitiveProperties);
encryptFlowComponentsProperties(flow,
sensitivePropertiesByComponentType);
-
- return flow;
}
private void encryptParameterContextsProperties(VersionedDataflow flow) {
diff --git
a/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichServiceTest.java
b/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichServiceTest.java
index 2501e30aab..e9f6a8dfd6 100644
---
a/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichServiceTest.java
+++
b/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowEnrichServiceTest.java
@@ -61,10 +61,10 @@ public class StandardFlowEnrichServiceTest {
VersionedDataflow testFlow = loadDefaultFlow();
FlowEnrichService testFlowEnrichService = new
StandardFlowEnrichService(new StandardReadableProperties(properties));
- VersionedDataflow enrichedFlow =
testFlowEnrichService.enrichFlow(testFlow);
+ testFlowEnrichService.enrichFlow(testFlow);
byte[] testFlowBytes = flowToString(testFlow).getBytes(UTF_8);
- byte[] enrichedFlowBytes = flowToString(enrichedFlow).getBytes(UTF_8);
+ byte[] enrichedFlowBytes = flowToString(testFlow).getBytes(UTF_8);
assertArrayEquals(testFlowBytes, enrichedFlowBytes);
}
@@ -80,10 +80,10 @@ public class StandardFlowEnrichServiceTest {
uuid.when(UUID::randomUUID).thenReturn(expectedIdentifier);
FlowEnrichService testFlowEnrichService = new
StandardFlowEnrichService(new StandardReadableProperties(properties));
- VersionedDataflow enrichedFlow =
testFlowEnrichService.enrichFlow(testFlow);
+ testFlowEnrichService.enrichFlow(testFlow);
- assertEquals(expectedIdentifier.toString(),
enrichedFlow.getRootGroup().getIdentifier());
- assertEquals(expectedIdentifier.toString(),
enrichedFlow.getRootGroup().getInstanceIdentifier());
+ assertEquals(expectedIdentifier.toString(),
testFlow.getRootGroup().getIdentifier());
+ assertEquals(expectedIdentifier.toString(),
testFlow.getRootGroup().getInstanceIdentifier());
}
}
@@ -100,13 +100,13 @@ public class StandardFlowEnrichServiceTest {
));
FlowEnrichService testFlowEnrichService = new
StandardFlowEnrichService(new StandardReadableProperties(properties));
- VersionedDataflow enrichedFlow =
testFlowEnrichService.enrichFlow(testFlow);
+ testFlowEnrichService.enrichFlow(testFlow);
- assertEquals(1,
enrichedFlow.getRootGroup().getControllerServices().size());
- VersionedControllerService sslControllerService =
enrichedFlow.getRootGroup().getControllerServices().iterator().next();
+ assertEquals(1,
testFlow.getRootGroup().getControllerServices().size());
+ VersionedControllerService sslControllerService =
testFlow.getRootGroup().getControllerServices().iterator().next();
assertEquals(PARENT_SSL_CONTEXT_SERVICE_NAME,
sslControllerService.getName());
assertEquals(StringUtils.EMPTY,
sslControllerService.getBundle().getVersion());
- Set<VersionedProcessor> processors =
enrichedFlow.getRootGroup().getProcessors();
+ Set<VersionedProcessor> processors =
testFlow.getRootGroup().getProcessors();
assertEquals(2, processors.size());
assertTrue(
processors.stream()
@@ -132,9 +132,9 @@ public class StandardFlowEnrichServiceTest {
VersionedDataflow testFlow = loadDefaultFlow();
FlowEnrichService testFlowEnrichService = new
StandardFlowEnrichService(new StandardReadableProperties(properties));
- VersionedDataflow enrichedFlow =
testFlowEnrichService.enrichFlow(testFlow);
+ testFlowEnrichService.enrichFlow(testFlow);
- List<VersionedReportingTask> reportingTasks =
enrichedFlow.getReportingTasks();
+ List<VersionedReportingTask> reportingTasks =
testFlow.getReportingTasks();
assertEquals(1, reportingTasks.size());
VersionedReportingTask provenanceReportingTask = reportingTasks.get(0);
assertEquals(SITE_TO_SITE_PROVENANCE_REPORTING_TASK_NAME,
provenanceReportingTask.getName());
diff --git
a/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyAssetReferenceResolverServiceTest.java
b/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyAssetReferenceResolverServiceTest.java
new file mode 100644
index 0000000000..0d68309441
--- /dev/null
+++
b/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyAssetReferenceResolverServiceTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.service;
+
+import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link StandardFlowPropertyAssetReferenceResolverService}.
+ * <p>
+ * The property maps handed to the test components are kept as fields so that the in-place
+ * modifications made by the resolver can be verified after the call.
+ */
+@ExtendWith(MockitoExtension.class)
+public class StandardFlowPropertyAssetReferenceResolverServiceTest {
+
+    private Map<String, String> processorProperties;
+    private Map<String, String> controllerServiceProperties;
+
+    @Mock
+    private Function<String, Optional<Path>> assetPathResolver;
+
+    @InjectMocks
+    private StandardFlowPropertyAssetReferenceResolverService victim;
+
+    @Test
+    public void testResolveAssetReferenceProperties() {
+        initProperties();
+        VersionedDataflow dataFlow = aVersionedDataflow();
+
+        when(assetPathResolver.apply("asset1")).thenReturn(Optional.of(Path.of("asset1Path")));
+        when(assetPathResolver.apply("asset2")).thenReturn(Optional.of(Path.of("asset2Path")));
+
+        victim.resolveAssetReferenceProperties(dataFlow);
+
+        verifyProperties();
+    }
+
+    @Test
+    public void testResolveNestedAssetReferenceProperties() {
+        initProperties();
+        VersionedDataflow dataFlow = aVersionedDataflowWithNestedProcessGroup();
+
+        when(assetPathResolver.apply("asset1")).thenReturn(Optional.of(Path.of("asset1Path")));
+        when(assetPathResolver.apply("asset2")).thenReturn(Optional.of(Path.of("asset2Path")));
+
+        victim.resolveAssetReferenceProperties(dataFlow);
+
+        verifyProperties();
+    }
+
+    @Test
+    public void testResolveAssetReferencePropertiesThrowIllegalStateException() {
+        initProperties();
+        VersionedDataflow dataFlow = aVersionedDataflow();
+
+        when(assetPathResolver.apply("asset1")).thenReturn(Optional.empty());
+
+        assertThrows(IllegalStateException.class, () -> {
+            victim.resolveAssetReferenceProperties(dataFlow);
+        });
+    }
+
+    /**
+     * Creates fresh, mutable property maps holding one asset reference and one plain value each.
+     */
+    private void initProperties() {
+        processorProperties = new HashMap<>();
+        processorProperties.put("assetReferenceProperty", "@{asset-id:asset1}");
+        processorProperties.put("notAssetReferenceProperty", "some value1");
+
+        controllerServiceProperties = new HashMap<>();
+        controllerServiceProperties.put("assetReferenceProperty", "@{asset-id:asset2}");
+        controllerServiceProperties.put("notAssetReferenceProperty", "some value2");
+    }
+
+    /**
+     * Asserts that asset references were replaced by the resolved paths and plain values were left untouched.
+     */
+    private void verifyProperties() {
+        // JUnit expects (expected, actual); the other order produces misleading failure messages
+        assertEquals("asset1Path", processorProperties.get("assetReferenceProperty"));
+        assertEquals("some value1", processorProperties.get("notAssetReferenceProperty"));
+        assertEquals("asset2Path", controllerServiceProperties.get("assetReferenceProperty"));
+        assertEquals("some value2", controllerServiceProperties.get("notAssetReferenceProperty"));
+    }
+
+    private VersionedDataflow aVersionedDataflow() {
+        VersionedDataflow versionedDataflow = new VersionedDataflow();
+        versionedDataflow.setRootGroup(aVersionedProcessGroup());
+        return versionedDataflow;
+    }
+
+    /**
+     * Builds a flow whose root group is empty apart from a single child group holding the test components.
+     */
+    private VersionedDataflow aVersionedDataflowWithNestedProcessGroup() {
+        VersionedDataflow versionedDataflow = new VersionedDataflow();
+        VersionedProcessGroup versionedProcessGroup = new VersionedProcessGroup();
+        versionedProcessGroup.setProcessGroups(Set.of(aVersionedProcessGroup()));
+        versionedDataflow.setRootGroup(versionedProcessGroup);
+
+        return versionedDataflow;
+    }
+
+    /**
+     * Builds a process group with one processor and one controller service backed by the field property maps.
+     */
+    private VersionedProcessGroup aVersionedProcessGroup() {
+        VersionedProcessGroup versionedProcessGroup = new VersionedProcessGroup();
+        VersionedProcessor versionedProcessor = new VersionedProcessor();
+        VersionedControllerService versionedControllerService = new VersionedControllerService();
+
+        versionedControllerService.setProperties(controllerServiceProperties);
+        versionedProcessor.setProperties(processorProperties);
+
+        versionedProcessGroup.setProcessors(Set.of(versionedProcessor));
+        versionedProcessGroup.setControllerServices(Set.of(versionedControllerService));
+
+        return versionedProcessGroup;
+    }
+}
\ No newline at end of file
diff --git
a/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyEncryptorTest.java
b/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyEncryptorTest.java
index 0b9a4c389f..9fb3c5c75e 100644
---
a/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyEncryptorTest.java
+++
b/minifi/minifi-commons/minifi-commons-framework/src/test/java/org/apache/nifi/minifi/commons/service/StandardFlowPropertyEncryptorTest.java
@@ -120,9 +120,9 @@ public class StandardFlowPropertyEncryptorTest {
public void shouldEncryptParameterContextsSensitiveVariables() {
VersionedDataflow testFlow = flowWithParameterContexts();
- VersionedDataflow encryptedFlow =
testEncryptor.encryptSensitiveProperties(testFlow);
+ testEncryptor.encryptSensitiveProperties(testFlow);
- encryptedFlow.getParameterContexts().stream()
+ testFlow.getParameterContexts().stream()
.flatMap(context -> context.getParameters().stream())
.forEach(parameter -> {
if (parameter.isSensitive()) {
@@ -137,10 +137,10 @@ public class StandardFlowPropertyEncryptorTest {
public void shouldEncryptPropertiesUsingDescriptorsFromFlow() {
VersionedDataflow testFlow = flowWithPropertyDescriptors();
- VersionedDataflow encryptedFlow =
testEncryptor.encryptSensitiveProperties(testFlow);
+ testEncryptor.encryptSensitiveProperties(testFlow);
verify(mockRunTimeManifest, never()).getBundles();
- assertSensitiveFlowComponentPropertiesAreEncoded(encryptedFlow);
+ assertSensitiveFlowComponentPropertiesAreEncoded(testFlow);
}
@Test
@@ -148,9 +148,9 @@ public class StandardFlowPropertyEncryptorTest {
VersionedDataflow testFlow = flowWithoutPropertyDescriptors();
when(mockRunTimeManifest.getBundles()).thenReturn(runTimeManifestBundles());
- VersionedDataflow encryptedFlow =
testEncryptor.encryptSensitiveProperties(testFlow);
+ testEncryptor.encryptSensitiveProperties(testFlow);
- assertSensitiveFlowComponentPropertiesAreEncoded(encryptedFlow);
+ assertSensitiveFlowComponentPropertiesAreEncoded(testFlow);
}
private VersionedDataflow flowWithParameterContexts() {
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
index 97a5e31450..f2fa27b891 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
@@ -124,9 +124,12 @@ import
org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider;
import
org.apache.nifi.minifi.c2.command.syncresource.DefaultSyncResourceStrategy;
import org.apache.nifi.minifi.c2.command.syncresource.FileResourceRepository;
import org.apache.nifi.minifi.c2.command.syncresource.ResourceRepository;
+import
org.apache.nifi.minifi.c2.command.syncresource.SyncResourcePropertyProvider;
import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
+import
org.apache.nifi.minifi.commons.service.FlowPropertyAssetReferenceResolver;
import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.StandardFlowEnrichService;
+import
org.apache.nifi.minifi.commons.service.StandardFlowPropertyAssetReferenceResolverService;
import org.apache.nifi.minifi.commons.service.StandardFlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.StandardFlowSerDeService;
import org.apache.nifi.nar.ExtensionManagerHolder;
@@ -248,14 +251,20 @@ public class C2NifiClientService {
UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider =
new UpdatePropertiesPropertyProvider(bootstrapConfigFileLocation);
PropertiesPersister propertiesPersister = new
PropertiesPersister(updatePropertiesPropertyProvider,
bootstrapConfigFileLocation);
FlowStateStrategy defaultFlowStateStrategy = new
DefaultFlowStateStrategy(flowController);
+ FlowPropertyAssetReferenceResolver flowPropertyAssetReferenceResolver
= new
StandardFlowPropertyAssetReferenceResolverService(resourceRepository::getAbsolutePath);
FlowPropertyEncryptor flowPropertyEncryptor = new
StandardFlowPropertyEncryptor(
new
PropertyEncryptorBuilder(niFiProperties.getProperty(SENSITIVE_PROPS_KEY))
.setAlgorithm(niFiProperties.getProperty(SENSITIVE_PROPS_ALGORITHM)).build(),
runtimeManifestService.getManifest());
- UpdateConfigurationStrategy updateConfigurationStrategy = new
DefaultUpdateConfigurationStrategy(flowController, flowService,
- new StandardFlowEnrichService(niFiProperties),
flowPropertyEncryptor,
- StandardFlowSerDeService.defaultInstance(),
niFiProperties.getProperty(FLOW_CONFIGURATION_FILE));
+ UpdateConfigurationStrategy updateConfigurationStrategy = new
DefaultUpdateConfigurationStrategy(
+ flowController,
+ flowService,
+ flowPropertyAssetReferenceResolver,
+ new StandardFlowEnrichService(niFiProperties),
+ flowPropertyEncryptor,
+ StandardFlowSerDeService.defaultInstance(),
+ niFiProperties.getProperty(FLOW_CONFIGURATION_FILE));
Supplier<RuntimeInfoWrapper> runtimeInfoWrapperSupplier = () ->
generateRuntimeInfo(
parseInt(niFiProperties.getProperty(C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getKey(),
C2_FLOW_INFO_PROCESSOR_BULLETIN_LIMIT.getDefaultValue())),
parseBoolean(niFiProperties.getProperty(C2_FLOW_INFO_PROCESSOR_STATUS_ENABLED.getKey(),
C2_FLOW_INFO_PROCESSOR_STATUS_ENABLED.getDefaultValue())));
@@ -268,7 +277,7 @@ public class C2NifiClientService {
UpdateAssetOperationHandler.create(client,
emptyOperandPropertiesProvider,
updateAssetCommandHelper::assetUpdatePrecondition,
updateAssetCommandHelper::assetPersistFunction),
new
UpdatePropertiesOperationHandler(updatePropertiesPropertyProvider,
propertiesPersister::persistProperties),
- SyncResourceOperationHandler.create(client,
emptyOperandPropertiesProvider, new
DefaultSyncResourceStrategy(resourceRepository), c2Serializer),
+ SyncResourceOperationHandler.create(client, new
SyncResourcePropertyProvider(), new
DefaultSyncResourceStrategy(resourceRepository), c2Serializer),
new StartFlowOperationHandler(defaultFlowStateStrategy), new
StopFlowOperationHandler(defaultFlowStateStrategy)
));
}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
index 00ceb59bae..37fdc79fcc 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategy.java
@@ -52,6 +52,7 @@ import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.minifi.commons.service.FlowEnrichService;
+import
org.apache.nifi.minifi.commons.service.FlowPropertyAssetReferenceResolver;
import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.FlowSerDeService;
import org.apache.nifi.minifi.validator.ValidationException;
@@ -68,6 +69,7 @@ public class DefaultUpdateConfigurationStrategy implements
UpdateConfigurationSt
private final FlowController flowController;
private final FlowService flowService;
+ private final FlowPropertyAssetReferenceResolver
flowPropertyAssetReferenceResolver;
private final FlowEnrichService flowEnrichService;
private final FlowPropertyEncryptor flowPropertyEncryptor;
private final FlowSerDeService flowSerDeService;
@@ -76,10 +78,17 @@ public class DefaultUpdateConfigurationStrategy implements
UpdateConfigurationSt
private final Path rawFlowConfigurationFile;
private final Path backupRawFlowConfigurationFile;
- public DefaultUpdateConfigurationStrategy(FlowController flowController,
FlowService flowService, FlowEnrichService flowEnrichService,
- FlowPropertyEncryptor
flowPropertyEncryptor, FlowSerDeService flowSerDeService, String
flowConfigurationFile) {
+ public DefaultUpdateConfigurationStrategy(
+ FlowController flowController,
+ FlowService flowService,
+ FlowPropertyAssetReferenceResolver
flowPropertyAssetReferenceResolver,
+ FlowEnrichService flowEnrichService,
+ FlowPropertyEncryptor flowPropertyEncryptor,
+ FlowSerDeService flowSerDeService,
+ String flowConfigurationFile) {
this.flowController = flowController;
this.flowService = flowService;
+ this.flowPropertyAssetReferenceResolver =
flowPropertyAssetReferenceResolver;
this.flowEnrichService = flowEnrichService;
this.flowPropertyEncryptor = flowPropertyEncryptor;
this.flowSerDeService = flowSerDeService;
@@ -102,12 +111,13 @@ public class DefaultUpdateConfigurationStrategy
implements UpdateConfigurationSt
.stream()
.map(Connection::getIdentifier)
.collect(Collectors.toSet());
- VersionedDataflow rawDataFlow =
flowSerDeService.deserialize(rawFlow);
+ VersionedDataflow dataFlow = flowSerDeService.deserialize(rawFlow);
- VersionedDataflow propertyEncryptedRawDataFlow =
flowPropertyEncryptor.encryptSensitiveProperties(rawDataFlow);
- byte[] serializedPropertyEncryptedRawDataFlow =
flowSerDeService.serialize(propertyEncryptedRawDataFlow);
- VersionedDataflow enrichedFlowCandidate =
flowEnrichService.enrichFlow(propertyEncryptedRawDataFlow);
- byte[] serializedEnrichedFlowCandidate =
flowSerDeService.serialize(enrichedFlowCandidate);
+
flowPropertyAssetReferenceResolver.resolveAssetReferenceProperties(dataFlow);
+ flowPropertyEncryptor.encryptSensitiveProperties(dataFlow);
+ byte[] serializedPropertyEncryptedRawDataFlow =
flowSerDeService.serialize(dataFlow);
+ flowEnrichService.enrichFlow(dataFlow);
+ byte[] serializedEnrichedFlowCandidate =
flowSerDeService.serialize(dataFlow);
backup(flowConfigurationFile, backupFlowConfigurationFile);
backup(rawFlowConfigurationFile, backupRawFlowConfigurationFile);
@@ -115,7 +125,7 @@ public class DefaultUpdateConfigurationStrategy implements
UpdateConfigurationSt
persist(serializedPropertyEncryptedRawDataFlow,
rawFlowConfigurationFile, false);
persist(serializedEnrichedFlowCandidate, flowConfigurationFile,
true);
-
reloadFlow(findAllProposedConnectionIds(enrichedFlowCandidate.getRootGroup()));
+ reloadFlow(findAllProposedConnectionIds(dataFlow.getRootGroup()));
} catch (IllegalStateException e) {
LOGGER.error("Configuration update failed. Reverting and reloading
previous flow", e);
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepository.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepository.java
index 6ed8b2f3e4..a3a7791ec8 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepository.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepository.java
@@ -161,6 +161,16 @@ public class FileResourceRepository implements
ResourceRepository {
return Optional.of(resourceItem);
}
+ /**
+ * Looks up the absolute file system path of a resource item held in the repository descriptor.
+ * The first item whose resource id equals the given id is resolved through resourcePath and
+ * converted with Path#toAbsolutePath.
+ *
+ * @param resourceId id of the resource item to look up
+ * @return the absolute path of the first matching resource item, or an empty Optional when no item has that id
+ */
+ @Override
+ public Optional<Path> getAbsolutePath(String resourceId) {
+ return resourceRepositoryDescriptor.resourceItems.stream()
+ .filter(resourceItem ->
resourceItem.getResourceId().equals(resourceId))
+ .map(this::resourcePath)
+ .map(Path::toAbsolutePath)
+ .findFirst();
+ }
+
+
private void initialize() {
try {
createDirectories(assetRepositoryDirectory);
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/ResourceRepository.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/ResourceRepository.java
index db431d361d..4a73b36fb3 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/ResourceRepository.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/ResourceRepository.java
@@ -38,4 +38,6 @@ public interface ResourceRepository {
Optional<ResourceItem> addResourceItem(ResourceItem resourceItem, Path
source);
Optional<ResourceItem> deleteResourceItem(ResourceItem resourceItem);
+ /**
+ * Returns the absolute path of the resource identified by the given id.
+ *
+ * @param resourceId id of the resource to look up
+ * @return the absolute path of the resource, or an empty Optional when the repository has no resource
+ * with that id (this is what the FileResourceRepository implementation does)
+ */
+ Optional<Path> getAbsolutePath(String resourceId);
+
}
diff --git
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/SyncResourcePropertyProvider.java
similarity index 55%
copy from
minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java
copy to
minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/SyncResourcePropertyProvider.java
index 5b3620803f..87a5fe29be 100644
---
a/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/service/FlowEnrichService.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/syncresource/SyncResourcePropertyProvider.java
@@ -15,21 +15,18 @@
* limitations under the License.
*/
-package org.apache.nifi.minifi.commons.service;
+package org.apache.nifi.minifi.c2.command.syncresource;
-import org.apache.nifi.controller.flow.VersionedDataflow;
+import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
+import java.util.Map;
-/**
- * Defines interface methods used to implement a FlowEnrichService.
- * The purpose of a flow enrich service is to enrich a VersionedDataFlow with
various additional components specific to the MiNiFi instance
- */
-public interface FlowEnrichService {
+/**
+ * OperandPropertiesProvider implementation that always reports one fixed property:
+ * "resolveAssetReferences" mapped to Boolean.TRUE.
+ * NOTE(review): presumably this advertises to the C2 server that the agent resolves asset
+ * references in flows - confirm against the code that registers this provider.
+ */
+public class SyncResourcePropertyProvider implements OperandPropertiesProvider
{
+
+ private static final String RESOLVE_ASSET_REFERENCES =
"resolveAssetReferences";
+ private static final Map<String, Object> PROPERTIES =
Map.of(RESOLVE_ASSET_REFERENCES, Boolean.TRUE);
- /**
- * Responsible for enriching a VersionedDataflow instance
- *
- * @param versionedDataflow a VersionedDataflow instance
- * @return VersionedDataflow the enriched flow instance
- */
- VersionedDataflow enrichFlow(VersionedDataflow versionedDataflow);
+ // Returns the shared constant map; Map.of yields an unmodifiable map, so no copy is made.
+ @Override
+ public Map<String, Object> getProperties() {
+ return PROPERTIES;
+ }
}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java
index 6c2226f803..4addb65e7e 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/DefaultUpdateConfigurationStrategyTest.java
@@ -50,6 +50,7 @@ import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.minifi.commons.service.FlowEnrichService;
+import
org.apache.nifi.minifi.commons.service.FlowPropertyAssetReferenceResolver;
import org.apache.nifi.minifi.commons.service.FlowPropertyEncryptor;
import org.apache.nifi.minifi.commons.service.FlowSerDeService;
import org.apache.nifi.services.FlowService;
@@ -67,13 +68,10 @@ public class DefaultUpdateConfigurationStrategyTest {
private static final byte[] ORIGINAL_RAW_FLOW_CONFIG_CONTENT =
"original_raw_content".getBytes(UTF_8);
private static final byte[] ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT =
"original_enriched_content".getBytes(UTF_8);
-
private static final byte[] NEW_RAW_FLOW_CONFIG_CONTENT =
"new_raw_content".getBytes(UTF_8);
- private static final VersionedDataflow NEW_RAW_FLOW_CONFIG = new
VersionedDataflow();
+ private static final VersionedDataflow DATA_FLOW = new VersionedDataflow();
private static final byte[] NEW_ENCRYPTED_FLOW_CONFIG_CONTENT =
"original_encrypted_content".getBytes(UTF_8);
- private static final VersionedDataflow NEW_ENCRYPTED_FLOW_CONFIG = new
VersionedDataflow();
private static final byte[] NEW_ENRICHED_FLOW_CONFIG_CONTENT =
"new_enriched_content".getBytes(UTF_8);
- private static final VersionedDataflow NEW_ENRICHED_FLOW_CONFIG = new
VersionedDataflow();
@TempDir
private File tempDir;
@@ -87,6 +85,8 @@ public class DefaultUpdateConfigurationStrategyTest {
@Mock
private FlowPropertyEncryptor mockFlowPropertyEncryptor;
@Mock
+ private FlowPropertyAssetReferenceResolver
mockFlowPropertyAssetReferenceResolver;
+ @Mock
private FlowSerDeService mockFlowSerDeService;
@Mock
private FlowManager mockFlowManager;
@@ -108,8 +108,14 @@ public class DefaultUpdateConfigurationStrategyTest {
rawFlowConfigurationFile =
flowConfigurationFile.getParent().resolve(flowConfigurationFileBaseName +
RAW_EXTENSION);
backupRawFlowConfigurationFile =
flowConfigurationFile.getParent().resolve(flowConfigurationFileBaseName +
BACKUP_EXTENSION + RAW_EXTENSION);
- testUpdateConfigurationStrategy = new
DefaultUpdateConfigurationStrategy(mockFlowController, mockFlowService,
mockFlowEnrichService,
- mockFlowPropertyEncryptor, mockFlowSerDeService,
flowConfigurationFile.toString());
+ testUpdateConfigurationStrategy = new
DefaultUpdateConfigurationStrategy(
+ mockFlowController,
+ mockFlowService,
+ mockFlowPropertyAssetReferenceResolver,
+ mockFlowEnrichService,
+ mockFlowPropertyEncryptor,
+ mockFlowSerDeService,
+ flowConfigurationFile.toString());
writeGzipFile(flowConfigurationFile,
ORIGINAL_ENRICHED_FLOW_CONFIG_CONTENT);
writePlainTextFile(rawFlowConfigurationFile,
ORIGINAL_RAW_FLOW_CONFIG_CONTENT);
@@ -118,11 +124,8 @@ public class DefaultUpdateConfigurationStrategyTest {
@Test
public void testFlowIsUpdatedAndBackupsAreClearedUp() throws IOException {
// given
-
when(mockFlowSerDeService.deserialize(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_RAW_FLOW_CONFIG);
-
when(mockFlowPropertyEncryptor.encryptSensitiveProperties(NEW_RAW_FLOW_CONFIG)).thenReturn(NEW_ENCRYPTED_FLOW_CONFIG);
-
when(mockFlowSerDeService.serialize(NEW_ENCRYPTED_FLOW_CONFIG)).thenReturn(NEW_ENCRYPTED_FLOW_CONFIG_CONTENT);
-
when(mockFlowEnrichService.enrichFlow(NEW_ENCRYPTED_FLOW_CONFIG)).thenReturn(NEW_ENRICHED_FLOW_CONFIG);
-
when(mockFlowSerDeService.serialize(NEW_ENRICHED_FLOW_CONFIG)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT);
+
when(mockFlowSerDeService.deserialize(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(DATA_FLOW);
+
when(mockFlowSerDeService.serialize(DATA_FLOW)).thenReturn(NEW_ENCRYPTED_FLOW_CONFIG_CONTENT,
NEW_ENRICHED_FLOW_CONFIG_CONTENT);
when(mockFlowController.getFlowManager()).thenReturn(mockFlowManager);
when(mockFlowManager.getRootGroup()).thenReturn(mockProcessGroup);
@@ -137,6 +140,9 @@ public class DefaultUpdateConfigurationStrategyTest {
assertFalse(exists(backupFlowConfigurationFile));
assertFalse(exists(backupRawFlowConfigurationFile));
verify(mockFlowService, times(1)).load(null);
+ verify(mockFlowPropertyAssetReferenceResolver,
times(1)).resolveAssetReferenceProperties(DATA_FLOW);
+ verify(mockFlowEnrichService, times(1)).enrichFlow(DATA_FLOW);
+ verify(mockFlowPropertyEncryptor,
times(1)).encryptSensitiveProperties(DATA_FLOW);
verify(mockFlowController, times(1)).onFlowInitialized(true);
verify(mockProcessGroup, times(1)).startProcessing();
}
@@ -144,11 +150,8 @@ public class DefaultUpdateConfigurationStrategyTest {
@Test
public void testFlowIsRevertedInCaseOfAnyErrorAndBackupsAreClearedUp()
throws IOException {
// given
-
when(mockFlowSerDeService.deserialize(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(NEW_RAW_FLOW_CONFIG);
-
when(mockFlowPropertyEncryptor.encryptSensitiveProperties(NEW_RAW_FLOW_CONFIG)).thenReturn(NEW_ENCRYPTED_FLOW_CONFIG);
-
when(mockFlowSerDeService.serialize(NEW_ENCRYPTED_FLOW_CONFIG)).thenReturn(NEW_ENCRYPTED_FLOW_CONFIG_CONTENT);
-
when(mockFlowEnrichService.enrichFlow(NEW_ENCRYPTED_FLOW_CONFIG)).thenReturn(NEW_ENRICHED_FLOW_CONFIG);
-
when(mockFlowSerDeService.serialize(NEW_ENRICHED_FLOW_CONFIG)).thenReturn(NEW_ENRICHED_FLOW_CONFIG_CONTENT);
+
when(mockFlowSerDeService.deserialize(NEW_RAW_FLOW_CONFIG_CONTENT)).thenReturn(DATA_FLOW);
+
when(mockFlowSerDeService.serialize(DATA_FLOW)).thenReturn(NEW_ENCRYPTED_FLOW_CONFIG_CONTENT,
NEW_ENRICHED_FLOW_CONFIG_CONTENT);
when(mockFlowController.getFlowManager()).thenReturn(mockFlowManager);
when(mockFlowManager.getRootGroup()).thenReturn(mockProcessGroup);
doThrow(new IOException()).when(mockFlowService).load(null);
@@ -165,6 +168,9 @@ public class DefaultUpdateConfigurationStrategyTest {
assertFalse(exists(backupFlowConfigurationFile));
assertFalse(exists(backupRawFlowConfigurationFile));
verify(mockFlowService, times(1)).load(null);
+ verify(mockFlowEnrichService, times(1)).enrichFlow(DATA_FLOW);
+ verify(mockFlowPropertyEncryptor,
times(1)).encryptSensitiveProperties(DATA_FLOW);
+ verify(mockFlowPropertyAssetReferenceResolver,
times(1)).resolveAssetReferenceProperties(DATA_FLOW);
verify(mockFlowController, times(0)).onFlowInitialized(true);
verify(mockProcessGroup, times(0)).startProcessing();
}
diff --git
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepositoryTest.java
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepositoryTest.java
index ed13fb1648..47600be662 100644
---
a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepositoryTest.java
+++
b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/syncresource/FileResourceRepositoryTest.java
@@ -378,6 +378,28 @@ public class FileResourceRepositoryTest {
assertTrue(result.isEmpty());
}
+ // getAbsolutePath must yield an empty Optional for an id the repository does not know.
+ @Test
+ public void testGetAbsolutePathReturnsEmptyInCaseOfResourceNotAvailable() throws IOException {
+ FileResourceRepository testRepository = createTestRepository();
+
+ Optional<Path> absolutePath = testRepository.getAbsolutePath("non_existing_resource_id");
+
+ assertTrue(absolutePath.isEmpty());
+ }
+
+ // getAbsolutePath must return the location of the binary stored for a registered resource id.
+ @Test
+ public void testGetAbsolutePath() throws IOException {
+ FileResourceRepository testRepository = createTestRepository();
+ ResourceItem resourceItem = resourceItem("resource1", "subfolder", ASSET);
+ Path resourceItemPath = createResourceBinary(resourceItem, RESOURCE_BINARY_CONTENT);
+ testRepository.addResourceItem(resourceItem);
+
+ Optional<Path> absolutePath = testRepository.getAbsolutePath("resource1");
+
+ assertTrue(absolutePath.isPresent());
+ assertEquals(resourceItemPath.toString(), absolutePath.get().toString());
+ }
+
private ResourceRepositoryDescriptor loadRepository() throws IOException {
return c2Serializer.deserialize(readString(repositoryFile),
ResourceRepositoryDescriptor.class).orElse(null);
}