This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new 7dc9cf250e NIFI-15514: Ensure that Parameter Contexts are assigned to
all Proces… (#10815)
7dc9cf250e is described below
commit 7dc9cf250e06e966157dde747eb3451402a5b5fd
Author: Mark Payne <[email protected]>
AuthorDate: Tue Feb 3 11:53:15 2026 -0500
NIFI-15514: Ensure that Parameter Contexts are assigned to all Proces…
(#10815)
* NIFI-15514: Ensure that Parameter Contexts are assigned to all Process
Groups in a Connector and not just the top-level group. Ensure that all
components are started when a Connector starts instead of just Processors and
Controller Services
- When Working Context is recreated, ensure that we appropriately apply
Parameter Context to newly created Process Group(s)
- Ensure that when we clean up unused assets for Connectors, we consider
any assets that are referenced in either the Working or Active context instead
of just the Active context
- Ensure that when we stop a Process Group, we call all tasks in background
threads instead of calling .thenRun, which could potentially run in the
foreground thread
* NIFI-15514: Update parameter context assignment to occur during sync.
* NIFI-15514: Fixes around ensuring that processors/controller services are
properly configured and notified of any configuration changes when parameters
change
- Removed updateParameterContexts from ProcessGroup.updateFlow, which
was added in a previous commit, because we went in a different direction for the fix
---------
Co-authored-by: Bob Paulin <[email protected]>
---
.../nifi/controller/StandardProcessorNode.java | 6 +-
.../nifi/controller/flow/AbstractFlowManager.java | 29 +-
.../StandardVersionedComponentSynchronizer.java | 57 +--
.../nifi/parameter/StandardParameterContext.java | 28 +-
.../nifi/controller/AbstractComponentNode.java | 4 +-
.../apache/nifi/controller/flow/FlowManager.java | 11 +-
.../connector/ConnectorParameterLookup.java | 17 +
.../connector/StandardConnectorRepository.java | 37 +-
.../components/connector/StandardFlowContext.java | 26 +-
.../StandaloneProcessGroupLifecycle.java | 25 +-
.../flow/FlowControllerFlowContextFactory.java | 42 +-
.../nifi/controller/flow/StandardFlowManager.java | 5 +-
.../connector/OnPropertyModifiedConnector.java | 88 ++++
.../connector/StandardConnectorNodeIT.java | 47 +++
.../connector/TestStandardConnectorRepository.java | 72 ++++
.../processors/OnPropertyModifiedTracker.java | 116 ++++++
.../repository/StandardProcessSessionIT.java | 4 +-
.../controller/service/mock/MockProcessGroup.java | 2 +-
.../flows/on-property-modified-tracker.json | 98 +++++
.../tests/system/ParameterContextConnector.java | 447 +++++++++++++++++++++
.../processors/tests/system/UpdateContent.java | 41 +-
.../org.apache.nifi.components.connector.Connector | 1 +
.../connectors/ConnectorParameterContextIT.java | 119 ++++++
23 files changed, 1213 insertions(+), 109 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 7570cfe9e1..6ed1aec4a0 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1661,10 +1661,10 @@ public class StandardProcessorNode extends
ProcessorNode implements Connectable
if (validationStatus != ValidationStatus.VALID) {
LOG.debug("Cannot start {} because Processor is currently not
valid; will try again after 5 seconds", StandardProcessorNode.this);
- startupAttemptCount.incrementAndGet();
- if (startupAttemptCount.get() == 240 ||
startupAttemptCount.get() % 7200 == 0) {
+ final long attempt = startupAttemptCount.getAndIncrement();
+ if (attempt % 7200 == 0) {
final ValidationState validationState =
getValidationState();
- procLog.error("Encountering difficulty starting.
(Validation State is {}: {}). Will continue trying to start.",
+ procLog.warn("Encountering difficulty starting.
(Validation State is {}: {}). Will continue trying to start.",
validationState,
validationState.getValidationErrors());
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flow/AbstractFlowManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flow/AbstractFlowManager.java
index 076f2efa99..821a33d5bd 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flow/AbstractFlowManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flow/AbstractFlowManager.java
@@ -648,21 +648,24 @@ public abstract class AbstractFlowManager implements
FlowManager {
final Map<String, Parameter> parameters, final List<String>
inheritedContextIds,
final ParameterProviderConfiguration
parameterProviderConfiguration) {
- return createParameterContext(id, name, description, parameters,
inheritedContextIds, parameterProviderConfiguration, true);
+ final ParameterReferenceManager referenceManager = new
StandardParameterReferenceManager(this::getRootGroup);
+ return createParameterContext(id, name, description, parameters,
inheritedContextIds, parameterProviderConfiguration, referenceManager, true);
}
protected ParameterContext createParameterContext(final String id, final
String name, final String description,
final Map<String, Parameter> parameters, final List<String>
inheritedContextIds,
- final ParameterProviderConfiguration
parameterProviderConfiguration, final boolean register) {
+ final ParameterProviderConfiguration
parameterProviderConfiguration, final ParameterReferenceManager
referenceManager,
+ final boolean register) {
- final boolean namingConflict =
parameterContextManager.getParameterContexts().stream()
+ if (register) {
+ final boolean namingConflict =
parameterContextManager.getParameterContexts().stream()
.anyMatch(paramContext -> paramContext.getName().equals(name));
- if (namingConflict) {
- throw new IllegalStateException("Cannot create Parameter Context
with name '" + name + "' because a Parameter Context already exists with that
name");
+ if (namingConflict) {
+ throw new IllegalStateException("Cannot create Parameter
Context with name '" + name + "' because a Parameter Context already exists
with that name");
+ }
}
- final ParameterReferenceManager referenceManager = new
StandardParameterReferenceManager(this::getRootGroup);
final ParameterContext parameterContext = new
StandardParameterContext.Builder()
.id(id)
.name(name)
@@ -693,19 +696,13 @@ public abstract class AbstractFlowManager implements
FlowManager {
}
@Override
- public ParameterContext duplicateParameterContext(final String id, final
ParameterContext source) {
+ public ParameterContext createEmptyParameterContext(final String id, final
String name, final String description, final ProcessGroup rootGroup) {
final Map<String, Parameter> parameterMap = new HashMap<>();
- for (final Parameter parameter : source.getParameters().values()) {
- parameterMap.put(parameter.getDescriptor().getName(), parameter);
- }
-
final List<String> inheritedContextIds = new ArrayList<>();
- for (final ParameterContext inherited :
source.getInheritedParameterContexts()) {
- inheritedContextIds.add(inherited.getIdentifier());
- }
- return createParameterContext(id, source.getName(),
source.getDescription(),
- parameterMap, inheritedContextIds,
source.getParameterProviderConfiguration(), false);
+ final ParameterReferenceManager parameterReferenceManager = new
StandardParameterReferenceManager(() -> rootGroup);
+ return createParameterContext(id, name, description,
+ parameterMap, inheritedContextIds, null,
parameterReferenceManager, false);
}
@Override
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
index 3050a7db70..16ef1fa909 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java
@@ -1385,6 +1385,10 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
destination.addProcessGroup(group);
+ // Connectors will have a single parameter context so if we are
creating a group set the context of the parent process group.
+ if (connectorId != null) {
+ group.setParameterContext(destination.getParameterContext());
+ }
synchronize(group, proposed, versionedParameterContexts,
parameterProviderReferences, topLevelGroup, true);
return group;
@@ -2143,35 +2147,40 @@ public class StandardVersionedComponentSynchronizer
implements VersionedComponen
private void updateParameterContext(final ProcessGroup group, final
VersionedProcessGroup proposed, final Map<String, VersionedParameterContext>
versionedParameterContexts,
final Map<String,
ParameterProviderReference> parameterProviderReferences, final
ComponentIdGenerator componentIdGenerator) {
- // Update the Parameter Context
+
+ // If proposed parameter context is null, set group's parameter
context to null and we're done.
final ParameterContext currentParamContext =
group.getParameterContext();
final String proposedParameterContextName =
proposed.getParameterContextName();
- if (proposedParameterContextName == null && currentParamContext !=
null) {
+ if (proposedParameterContextName == null) {
group.setParameterContext(null);
- } else if (proposedParameterContextName != null) {
- final VersionedParameterContext versionedParameterContext =
versionedParameterContexts.get(proposedParameterContextName);
- if (versionedParameterContext != null) {
- createMissingParameterProvider(versionedParameterContext,
versionedParameterContext.getParameterProvider(), parameterProviderReferences,
componentIdGenerator);
- if (currentParamContext == null) {
- // Create a new Parameter Context based on the parameters
provided
- final ParameterContext contextByName =
getParameterContextByName(versionedParameterContext.getName());
- final ParameterContext selectedParameterContext;
- if (contextByName == null) {
- final String parameterContextId =
componentIdGenerator.generateUuid(versionedParameterContext.getName(),
- versionedParameterContext.getName(),
versionedParameterContext.getName());
- selectedParameterContext =
createParameterContext(versionedParameterContext, parameterContextId,
versionedParameterContexts,
- parameterProviderReferences,
componentIdGenerator);
- } else {
- selectedParameterContext = contextByName;
- addMissingConfiguration(versionedParameterContext,
selectedParameterContext, versionedParameterContexts,
parameterProviderReferences, componentIdGenerator);
- }
+ return;
+ }
- group.setParameterContext(selectedParameterContext);
- } else {
- // Update the current Parameter Context so that it has any
Parameters included in the proposed context
- addMissingConfiguration(versionedParameterContext,
currentParamContext, versionedParameterContexts, parameterProviderReferences,
componentIdGenerator);
- }
+ // No versioned parameter context with a matching name. Nothing to do.
+ final VersionedParameterContext versionedParameterContext =
versionedParameterContexts.get(proposedParameterContextName);
+ if (versionedParameterContext == null) {
+ return;
+ }
+
+ createMissingParameterProvider(versionedParameterContext,
versionedParameterContext.getParameterProvider(), parameterProviderReferences,
componentIdGenerator);
+ if (currentParamContext == null) {
+ // Create a new Parameter Context based on the parameters provided
+ final ParameterContext contextByName =
getParameterContextByName(versionedParameterContext.getName());
+ final ParameterContext selectedParameterContext;
+ if (contextByName == null) {
+ final String parameterContextId =
componentIdGenerator.generateUuid(versionedParameterContext.getName(),
+ versionedParameterContext.getName(),
versionedParameterContext.getName());
+ selectedParameterContext =
createParameterContext(versionedParameterContext, parameterContextId,
versionedParameterContexts,
+ parameterProviderReferences, componentIdGenerator);
+ } else {
+ selectedParameterContext = contextByName;
+ addMissingConfiguration(versionedParameterContext,
selectedParameterContext, versionedParameterContexts,
parameterProviderReferences, componentIdGenerator);
}
+
+ group.setParameterContext(selectedParameterContext);
+ } else {
+ // Update the current Parameter Context so that it has any
Parameters included in the proposed context
+ addMissingConfiguration(versionedParameterContext,
currentParamContext, versionedParameterContexts, parameterProviderReferences,
componentIdGenerator);
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/parameter/StandardParameterContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/parameter/StandardParameterContext.java
index 0489e618ec..3c56f9e817 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/parameter/StandardParameterContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/parameter/StandardParameterContext.java
@@ -46,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -147,6 +148,7 @@ public class StandardParameterContext implements
ParameterContext {
} finally {
writeLock.unlock();
}
+
alertReferencingComponents(parameterUpdates);
}
@@ -172,21 +174,25 @@ public class StandardParameterContext implements
ParameterContext {
* @param parameterUpdates A map from parameter name to ParameterUpdate
(empty if none are applicable)
*/
private void alertReferencingComponents(final Map<String, ParameterUpdate>
parameterUpdates) {
- if (!parameterUpdates.isEmpty()) {
- logger.debug("Parameter Context {} was updated. {} parameters
changed ({}). Notifying all affected components.", this,
parameterUpdates.size(), parameterUpdates);
-
- for (final ProcessGroup processGroup :
parameterReferenceManager.getProcessGroupsBound(this)) {
- try {
- processGroup.onParameterContextUpdated(parameterUpdates);
- } catch (final Exception e) {
- logger.error("Failed to notify {} that Parameter Context
was updated", processGroup, e);
- }
+ if (parameterUpdates.isEmpty()) {
+ logger.debug("{} updated. No parameters changed so no existing
components are affected.", this);
+ return;
+ }
+
+ logger.debug("Parameter Context {} was updated. {} parameters changed
({}). Notifying all affected components.", this, parameterUpdates.size(),
parameterUpdates);
+ for (final ProcessGroup processGroup : getBoundProcessGroups()) {
+ try {
+ processGroup.onParameterContextUpdated(parameterUpdates);
+ } catch (final Exception e) {
+ logger.error("Failed to notify {} that Parameter Context was
updated", processGroup, e);
}
- } else {
- logger.debug("Parameter Context {} was updated. {} parameters
changed ({}). No existing components are affected.", this,
parameterUpdates.size(), parameterUpdates);
}
}
+ protected Set<ProcessGroup> getBoundProcessGroups() {
+ return parameterReferenceManager.getProcessGroupsBound(this);
+ }
+
/**
* Returns a map from parameter name to ParameterUpdate for any actual
updates to parameters.
* @param currentParameters The current parameters
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index ecaa4d3f31..39b261ba4a 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -608,7 +608,9 @@ public abstract class AbstractComponentNode implements
ComponentNode {
if (!propertyConfiguration.equals(propertyModComparisonValue)) {
try {
final String oldValue = propertyModComparisonValue == null ?
null : propertyModComparisonValue.getEffectiveValue(getParameterContext());
- onPropertyModified(descriptor, oldValue, resolvedValue);
+ if (!Objects.equals(oldValue, resolvedValue)) {
+ onPropertyModified(descriptor, oldValue, resolvedValue);
+ }
} catch (final Exception e) {
// nothing really to do here...
logger.error("Failed to notify {} that property {} changed",
this, descriptor, e);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
index c1a363a5ab..5e957e98c1 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java
@@ -388,13 +388,16 @@ public interface FlowManager extends
ParameterProviderLookup {
List<String> inheritedContextIds,
ParameterProviderConfiguration parameterProviderConfiguration);
/**
- * Creates a duplicate of the given ParameterContext with the provided id.
This does not register the Parameter Context
- * with the ParameterContextManager.
+ * Creates an empty Parameter Context with the given ID for the provided
root process group. This Parameter Context is not
+ * registered with the ParameterContextManager.
+ *
* @param id the id of the new ParameterContext
- * @param source the ParameterContext to duplicate
+ * @param name the name of the new ParameterContext
+ * @param description the description of the new ParameterContext
+ * @param rootGroup the root process group
* @return the duplicated ParameterContext
*/
- ParameterContext duplicateParameterContext(String id, ParameterContext
source);
+ ParameterContext createEmptyParameterContext(String id, String name,
String description, ProcessGroup rootGroup);
/**
* Performs the given ParameterContext-related action, and then resolves
all inherited ParameterContext references.
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/ConnectorParameterLookup.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/ConnectorParameterLookup.java
index c42001a0b9..ada91a1d41 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/ConnectorParameterLookup.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/ConnectorParameterLookup.java
@@ -61,6 +61,23 @@ public class ConnectorParameterLookup implements
ParameterLookup {
.build();
}
+ public VersionedParameterContext createVersionedParameterContext(final
String name) {
+ final VersionedParameterContext context = new
VersionedParameterContext();
+ context.setName(name);
+
+ final Set<VersionedParameter> versionedParameters = new HashSet<>();
+ for (final ParameterValue parameterValue : parameterValues) {
+ final VersionedParameter versionedParameter = new
VersionedParameter();
+ versionedParameter.setName(parameterValue.getName());
+ versionedParameter.setValue(parameterValue.getValue());
+ versionedParameter.setSensitive(parameterValue.isSensitive());
+ versionedParameters.add(versionedParameter);
+ }
+
+ context.setParameters(versionedParameters);
+ return context;
+ }
+
@Override
public Optional<Parameter> getParameter(final String parameterName) {
return Optional.ofNullable(parameters.get(parameterName));
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
index b78d504112..5ae380cdfa 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
@@ -224,22 +224,9 @@ public class StandardConnectorRepository implements
ConnectorRepository {
}
private void cleanUpAssets(final ConnectorNode connector) {
- final FrameworkFlowContext activeFlowContext =
connector.getActiveFlowContext();
- final ConnectorConfiguration activeConfiguration =
activeFlowContext.getConfigurationContext().toConnectorConfiguration();
-
final Set<String> referencedAssetIds = new HashSet<>();
- for (final NamedStepConfiguration namedStepConfiguration :
activeConfiguration.getNamedStepConfigurations()) {
- final StepConfiguration stepConfiguration =
namedStepConfiguration.configuration();
- final Map<String, ConnectorValueReference> stepPropertyValues =
stepConfiguration.getPropertyValues();
- if (stepPropertyValues == null) {
- continue;
- }
- for (final ConnectorValueReference valueReference :
stepPropertyValues.values()) {
- if (valueReference instanceof AssetReference assetReference) {
-
referencedAssetIds.addAll(assetReference.getAssetIdentifiers());
- }
- }
- }
+ collectReferencedAssetIds(connector.getActiveFlowContext(),
referencedAssetIds);
+ collectReferencedAssetIds(connector.getWorkingFlowContext(),
referencedAssetIds);
logger.debug("Found {} assets referenced for Connector [{}]",
referencedAssetIds.size(), connector.getIdentifier());
@@ -258,6 +245,26 @@ public class StandardConnectorRepository implements
ConnectorRepository {
}
}
+ private void collectReferencedAssetIds(final FrameworkFlowContext
flowContext, final Set<String> referencedAssetIds) {
+ if (flowContext == null) {
+ return;
+ }
+
+ final ConnectorConfiguration configuration =
flowContext.getConfigurationContext().toConnectorConfiguration();
+ for (final NamedStepConfiguration namedStepConfiguration :
configuration.getNamedStepConfigurations()) {
+ final StepConfiguration stepConfiguration =
namedStepConfiguration.configuration();
+ final Map<String, ConnectorValueReference> stepPropertyValues =
stepConfiguration.getPropertyValues();
+ if (stepPropertyValues == null) {
+ continue;
+ }
+ for (final ConnectorValueReference valueReference :
stepPropertyValues.values()) {
+ if (valueReference instanceof AssetReference assetReference) {
+
referencedAssetIds.addAll(assetReference.getAssetIdentifiers());
+ }
+ }
+ }
+ }
+
@Override
public void configureConnector(final ConnectorNode connector, final String
stepName, final StepConfiguration configuration) throws FlowUpdateException {
connector.setConfiguration(stepName, configuration);
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java
index 70ff4e079d..3d56a999af 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardFlowContext.java
@@ -26,9 +26,10 @@ import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.parameter.ParameterContext;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.Map;
public class StandardFlowContext implements FrameworkFlowContext {
private final ProcessGroup managedProcessGroup;
@@ -81,32 +82,33 @@ public class StandardFlowContext implements
FrameworkFlowContext {
versionedExternalFlow.setParameterContexts(new HashMap<>());
}
- final String parameterContextName =
managedProcessGroup.getParameterContext().getName();
- updateParameterContext(versionedExternalFlow.getFlowContents(),
parameterContextName);
-
try {
managedProcessGroup.verifyCanUpdate(versionedExternalFlow, true,
false);
} catch (final IllegalStateException e) {
throw new FlowUpdateException("Flow is not in a state that allows
the requested updated", e);
}
- final VersionedExternalFlow withoutParameterContext = new
VersionedExternalFlow();
-
withoutParameterContext.setFlowContents(versionedExternalFlow.getFlowContents());
- withoutParameterContext.setParameterContexts(Collections.emptyMap());
- managedProcessGroup.updateFlow(withoutParameterContext,
managedProcessGroup.getIdentifier(), false, true, true);
+ final ParameterContext managedGroupParameterContext =
managedProcessGroup.getParameterContext();
+ updateParameterContextNames(versionedExternalFlow.getFlowContents(),
managedGroupParameterContext.getName());
- final ConnectorParameterLookup parameterLookup = new
ConnectorParameterLookup(versionedExternalFlow.getParameterContexts().values(),
assetManager);
-
getParameterContext().updateParameters(parameterLookup.getParameterValues());
+ final VersionedExternalFlow externalFlowWithResolvedParameters = new
VersionedExternalFlow();
+
externalFlowWithResolvedParameters.setFlowContents(versionedExternalFlow.getFlowContents());
+ externalFlowWithResolvedParameters.setParameterContexts(Map.of());
+
+ managedProcessGroup.updateFlow(externalFlowWithResolvedParameters,
managedProcessGroup.getIdentifier(), false, true, true);
rootGroup = groupFacadeFactory.create(managedProcessGroup,
connectorLog);
+
+ final ConnectorParameterLookup parameterLookup = new
ConnectorParameterLookup(versionedExternalFlow.getParameterContexts().values(),
assetManager);
+
getParameterContext().updateParameters(parameterLookup.getParameterValues());
parameterContext =
parameterContextFacadeFactory.create(managedProcessGroup);
}
- private void updateParameterContext(final VersionedProcessGroup group,
final String parameterContextName) {
+ private void updateParameterContextNames(final VersionedProcessGroup
group, final String parameterContextName) {
group.setParameterContextName(parameterContextName);
if (group.getProcessGroups() != null) {
for (final VersionedProcessGroup childGroup :
group.getProcessGroups()) {
- updateParameterContext(childGroup, parameterContextName);
+ updateParameterContextNames(childGroup, parameterContextName);
}
}
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
index 1faee73481..ccc2a0a8f3 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/facades/standalone/StandaloneProcessGroupLifecycle.java
@@ -181,7 +181,6 @@ public class StandaloneProcessGroupLifecycle implements
ProcessGroupLifecycle {
return disableControllerServices(serviceNodes);
}
- // TODO: Need a `startComponents` and `stopComponents` that includes
Processors, Ports, Stateless Groups, etc.
@Override
public CompletableFuture<Void> startProcessors() {
final Collection<ProcessorNode> processors =
processGroup.getProcessors();
@@ -206,19 +205,25 @@ public class StandaloneProcessGroupLifecycle implements
ProcessGroupLifecycle {
}
final CompletableFuture<Void> enableServicesFuture =
enableControllerServices(serviceReferenceScope,
ControllerServiceReferenceHierarchy.DIRECT_SERVICES_ONLY);
- final CompletableFuture<Void> enableAllComponents =
enableServicesFuture.thenRun(this::startPorts)
- .thenRun(this::startRemoteProcessGroups)
+ final CompletableFuture<Void> startAllComponents =
enableServicesFuture.thenRunAsync(this::startPorts)
+ .thenRunAsync(this::startRemoteProcessGroups)
.thenCompose(v -> startProcessors());
final List<CompletableFuture<Void>> childGroupFutures = new
ArrayList<>();
for (final ProcessGroup childGroup : processGroup.getProcessGroups()) {
- final ProcessGroupLifecycle childLifecycle =
childGroupLifecycleFactory.apply(childGroup.getIdentifier());
+ final String childGroupId =
childGroup.getVersionedComponentId().orElse(null);
+ if (childGroupId == null) {
+ logger.warn("Encountered child Process Group {} without a
Versioned Component ID. Skipping start of child group.",
childGroup.getIdentifier());
+ continue;
+ }
+
+ final ProcessGroupLifecycle childLifecycle =
childGroupLifecycleFactory.apply(childGroupId);
final CompletableFuture<Void> childFuture =
childLifecycle.start(serviceReferenceScope);
childGroupFutures.add(childFuture);
}
final CompletableFuture<Void> compositeChildFutures =
CompletableFuture.allOf(childGroupFutures.toArray(new CompletableFuture[0]));
- return CompletableFuture.allOf(enableAllComponents,
compositeChildFutures);
+ return CompletableFuture.allOf(startAllComponents,
compositeChildFutures);
}
private void startPorts() {
@@ -264,7 +269,7 @@ public class StandaloneProcessGroupLifecycle implements
ProcessGroupLifecycle {
final CompletableFuture<Void> stopProcessorsFuture = stopProcessors();
return stopProcessorsFuture.thenCompose(voidValue -> stopChildren())
- .thenRun(this::stopPorts)
+ .thenRunAsync(this::stopPorts)
.thenCompose(voidValue -> stopRemoteProcessGroups())
.thenCompose(voidValue ->
disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS));
}
@@ -272,7 +277,13 @@ public class StandaloneProcessGroupLifecycle implements
ProcessGroupLifecycle {
private CompletableFuture<Void> stopChildren() {
final List<CompletableFuture<Void>> childGroupFutures = new
ArrayList<>();
for (final ProcessGroup childGroup : processGroup.getProcessGroups()) {
- final ProcessGroupLifecycle childLifecycle =
childGroupLifecycleFactory.apply(childGroup.getIdentifier());
+ final String childGroupId =
childGroup.getVersionedComponentId().orElse(null);
+ if (childGroupId == null) {
+ logger.warn("Encountered child Process Group {} without a
Versioned Component ID. Skipping stop of child group.",
childGroup.getIdentifier());
+ continue;
+ }
+
+ final ProcessGroupLifecycle childLifecycle =
childGroupLifecycleFactory.apply(childGroupId);
final CompletableFuture<Void> childFuture = childLifecycle.stop();
childGroupFutures.add(childFuture);
}
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/FlowControllerFlowContextFactory.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/FlowControllerFlowContextFactory.java
index b7ed11d323..dbca31b7e3 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/FlowControllerFlowContextFactory.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/FlowControllerFlowContextFactory.java
@@ -25,12 +25,15 @@ import org.apache.nifi.components.connector.ProcessGroupFacadeFactory;
import org.apache.nifi.components.connector.ProcessGroupFactory;
import org.apache.nifi.components.connector.StandardFlowContext;
import org.apache.nifi.components.connector.components.FlowContextType;
+import org.apache.nifi.components.connector.components.ParameterContextFacade;
+import org.apache.nifi.components.connector.components.ParameterValue;
import
org.apache.nifi.components.connector.facades.standalone.StandaloneParameterContextFacade;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.registry.flow.mapping.ComponentIdLookup;
import org.apache.nifi.registry.flow.mapping.FlowMappingOptions;
@@ -39,6 +42,8 @@ import org.apache.nifi.registry.flow.mapping.VersionedComponentFlowMapper;
import org.apache.nifi.registry.flow.mapping.VersionedComponentStateLookup;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -98,19 +103,38 @@ public class FlowControllerFlowContextFactory implements FlowContextFactory {
final VersionedComponentFlowMapper flowMapper = new
VersionedComponentFlowMapper(flowController.getExtensionManager(),
flowMappingOptions);
final InstantiatedVersionedProcessGroup versionedGroup =
flowMapper.mapProcessGroup(sourceGroup,
flowController.getControllerServiceProvider(),
flowController.getFlowManager(), true);
- final VersionedExternalFlow versionedExternalFlow = new
VersionedExternalFlow();
- versionedExternalFlow.setFlowContents(versionedGroup);
- versionedExternalFlow.setExternalControllerServices(Map.of());
- versionedExternalFlow.setParameterProviders(Map.of());
- versionedExternalFlow.setParameterContexts(Map.of());
- destinationGroup.updateFlow(versionedExternalFlow, componentIdSeed,
false, true, true);
+ final String contextName = sourceGroup.getParameterContext().getName();
+
+ final VersionedExternalFlow externalFlowWithoutParameterContext = new
VersionedExternalFlow();
+ externalFlowWithoutParameterContext.setFlowContents(versionedGroup);
+ externalFlowWithoutParameterContext.setParameterContexts(Map.of());
final String duplicateContextId =
UUID.nameUUIDFromBytes((destinationGroup.getIdentifier() +
"-param-context").getBytes(StandardCharsets.UTF_8)).toString();
final ParameterContext sourceContext =
sourceGroup.getParameterContext();
- if (sourceContext != null) {
- final ParameterContext duplicateParameterContext =
flowController.getFlowManager().duplicateParameterContext(duplicateContextId,
sourceContext);
- destinationGroup.setParameterContext(duplicateParameterContext);
+ final ParameterContext duplicateParameterContext =
flowController.getFlowManager().createEmptyParameterContext(
+ duplicateContextId, contextName, sourceContext.getDescription(),
destinationGroup);
+
+ destinationGroup.setParameterContext(duplicateParameterContext);
+ destinationGroup.updateFlow(externalFlowWithoutParameterContext,
componentIdSeed, false, true, true);
+
+ final ParameterContextFacade contextFacade = new
StandaloneParameterContextFacade(flowController, destinationGroup);
+ final List<ParameterValue> parameterValues =
createParameterValues(sourceContext);
+ contextFacade.updateParameters(parameterValues);
+ }
+
+ private List<ParameterValue> createParameterValues(final ParameterContext
context) {
+ final List<ParameterValue> parameterValues = new ArrayList<>();
+ for (final Parameter parameter : context.getParameters().values()) {
+ final ParameterValue.Builder parameterValueBuilder = new
ParameterValue.Builder()
+ .name(parameter.getDescriptor().getName())
+ .sensitive(parameter.getDescriptor().isSensitive())
+ .value(parameter.getValue());
+
+
parameter.getReferencedAssets().forEach(parameterValueBuilder::addReferencedAsset);
+ parameterValues.add(parameterValueBuilder.build());
}
+
+ return parameterValues;
}
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
index 3826c9a371..3f5618cf53 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java
@@ -88,6 +88,8 @@ import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.parameter.ParameterProvider;
+import org.apache.nifi.parameter.ParameterReferenceManager;
+import org.apache.nifi.parameter.StandardParameterReferenceManager;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
import org.apache.nifi.registry.flow.mapping.ComponentIdLookup;
@@ -754,8 +756,9 @@ public class StandardFlowManager extends AbstractFlowManager implements FlowMana
final String paramContextId = UUID.nameUUIDFromBytes((id +
"-parameter-context").getBytes(StandardCharsets.UTF_8)).toString();
final String paramContextName = "Connector " + id + " Parameter
Context";
final String parameterContextDescription = "Implicit Parameter Context
for Connector " + id;
+ final ParameterReferenceManager referenceManager = new
StandardParameterReferenceManager(() -> managedRootGroup);
final ParameterContext managedParameterContext =
createParameterContext(paramContextId, paramContextName,
- parameterContextDescription, Collections.emptyMap(),
Collections.emptyList(), null, false);
+ parameterContextDescription, Collections.emptyMap(),
Collections.emptyList(), null, referenceManager, false);
managedRootGroup.setParameterContext(managedParameterContext);
final ConnectorRepository connectorRepository =
flowController.getConnectorRepository();
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/OnPropertyModifiedConnector.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/OnPropertyModifiedConnector.java
new file mode 100644
index 0000000000..bf5f242a93
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/OnPropertyModifiedConnector.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.connector.components.FlowContext;
+import org.apache.nifi.components.connector.util.VersionedFlowUtils;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A test connector that creates a flow with an OnPropertyModifiedTracker processor.
+ * The processor's Configured Number property is bound to a parameter, allowing this connector
+ * to test that onPropertyModified is called when a Connector's applyUpdate changes a parameter value.
+ */
+public class OnPropertyModifiedConnector extends AbstractConnector {
+
+ private static final String PARAMETER_NAME = "CONFIGURED_NUMBER";
+
+ static final ConnectorPropertyDescriptor NUMBER_VALUE = new
ConnectorPropertyDescriptor.Builder()
+ .name("Number Value")
+ .description("The number value to set for the Configured Number
parameter")
+ .type(PropertyType.STRING)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .required(true)
+ .build();
+
+ private static final ConnectorPropertyGroup PROPERTY_GROUP = new
ConnectorPropertyGroup.Builder()
+ .name("Configuration")
+ .description("Configuration properties for the OnPropertyModified
test")
+ .addProperty(NUMBER_VALUE)
+ .build();
+
+ private static final ConfigurationStep CONFIG_STEP = new
ConfigurationStep.Builder()
+ .name("Configuration")
+ .description("Configure the number value for testing
onPropertyModified")
+ .propertyGroups(List.of(PROPERTY_GROUP))
+ .build();
+
+ @Override
+ public VersionedExternalFlow getInitialFlow() {
+ return
VersionedFlowUtils.loadFlowFromResource("flows/on-property-modified-tracker.json");
+ }
+
+ @Override
+ public List<ConfigurationStep> getConfigurationSteps() {
+ return List.of(CONFIG_STEP);
+ }
+
+ @Override
+ protected void onStepConfigured(final String stepName, final FlowContext
workingContext) throws FlowUpdateException {
+ final VersionedExternalFlow versionedExternalFlow = getInitialFlow();
+ final String number =
workingContext.getConfigurationContext().getProperty(CONFIG_STEP,
NUMBER_VALUE).getValue();
+ VersionedFlowUtils.setParameterValue(versionedExternalFlow,
PARAMETER_NAME, number);
+ getInitializationContext().updateFlow(workingContext,
versionedExternalFlow);
+ }
+
+ @Override
+ public void applyUpdate(final FlowContext workingContext, final
FlowContext activeContext) throws FlowUpdateException {
+ final VersionedExternalFlow versionedExternalFlow = getInitialFlow();
+ final String number =
workingContext.getConfigurationContext().getProperty(CONFIG_STEP,
NUMBER_VALUE).getValue();
+ VersionedFlowUtils.setParameterValue(versionedExternalFlow,
PARAMETER_NAME, number);
+ getInitializationContext().updateFlow(activeContext,
versionedExternalFlow);
+ }
+
+ @Override
+ public List<ConfigVerificationResult> verifyConfigurationStep(final String
stepName, final Map<String, String> overrides, final FlowContext
workingContext) {
+ return List.of();
+ }
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
index 336d5a20fb..53d7337248 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/StandardConnectorNodeIT.java
@@ -22,6 +22,7 @@ import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.connector.processors.CreateDummyFlowFile;
import org.apache.nifi.components.connector.processors.DuplicateFlowFile;
import org.apache.nifi.components.connector.processors.LogFlowFileContents;
+import
org.apache.nifi.components.connector.processors.OnPropertyModifiedTracker;
import org.apache.nifi.components.connector.processors.OverwriteFlowFile;
import org.apache.nifi.components.connector.processors.TerminateFlowFile;
import
org.apache.nifi.components.connector.secrets.ParameterProviderSecretsManager;
@@ -79,6 +80,7 @@ import org.apache.nifi.validation.RuleViolationsManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.io.File;
import java.io.IOException;
@@ -318,6 +320,51 @@ public class StandardConnectorNodeIT {
assertEquals("Hi.",
parameterContext.getParameter("Text").orElseThrow().getValue());
}
+ @Test
+ @Timeout(10)
+ public void testOnPropertyModifiedCalledOnApplyUpdate() throws
FlowUpdateException {
+ final ConnectorNode connectorNode =
flowManager.createConnector(OnPropertyModifiedConnector.class.getName(),
+ "on-property-modified-connector",
SystemBundle.SYSTEM_BUNDLE_COORDINATE, true, true);
+ assertNotNull(connectorNode);
+
+ final StepConfiguration initialConfig = new
StepConfiguration(Map.of("Number Value", new StringLiteralValue("0")));
+ final NamedStepConfiguration initialStepConfig = new
NamedStepConfiguration("Configuration", initialConfig);
+ configure(connectorNode, new
ConnectorConfiguration(Set.of(initialStepConfig)));
+
+ final ProcessGroup activeGroup =
connectorNode.getActiveFlowContext().getManagedProcessGroup();
+ final ProcessorNode processorNode =
activeGroup.getProcessors().iterator().next();
+ final OnPropertyModifiedTracker tracker = (OnPropertyModifiedTracker)
processorNode.getProcessor();
+
+ assertEquals(0, tracker.getPropertyChangeCount());
+
+ final StepConfiguration updatedConfig = new
StepConfiguration(Map.of("Number Value", new StringLiteralValue("1")));
+ final NamedStepConfiguration updatedStepConfig = new
NamedStepConfiguration("Configuration", updatedConfig);
+
+ connectorNode.setConfiguration("Configuration",
updatedStepConfig.configuration());
+
+ final ProcessGroup workingGroup =
connectorNode.getWorkingFlowContext().getManagedProcessGroup();
+ final ProcessorNode workingProcessorNode =
workingGroup.getProcessors().iterator().next();
+ final OnPropertyModifiedTracker workingTracker =
(OnPropertyModifiedTracker) workingProcessorNode.getProcessor();
+
+ assertEquals(1, workingTracker.getPropertyChangeCount());
+ assertEquals("0",
workingTracker.getPropertyChanges().getFirst().oldValue());
+ assertEquals("1",
workingTracker.getPropertyChanges().getFirst().newValue());
+
+ workingTracker.clearPropertyChanges();
+
+ connectorNode.transitionStateForUpdating();
+ connectorNode.prepareForUpdate();
+ connectorNode.applyUpdate();
+
+ assertEquals(1, tracker.getPropertyChangeCount());
+ assertEquals("0", tracker.getPropertyChanges().getFirst().oldValue());
+ assertEquals("1", tracker.getPropertyChanges().getFirst().newValue());
+
+ // Ensure that no parameter contexts are registered
+ final Set<ParameterContext> registeredContexts =
flowManager.getParameterContextManager().getParameterContexts();
+ assertEquals(Set.of(), registeredContexts);
+ }
+
@Test
public void testControllerServices() throws FlowUpdateException {
final ConnectorNode connectorNode = initializeDynamicFlowConnector();
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
index cdf73676c9..b552447eae 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
@@ -17,16 +17,22 @@
package org.apache.nifi.components.connector;
+import org.apache.nifi.asset.Asset;
+import org.apache.nifi.asset.AssetManager;
import org.apache.nifi.nar.ExtensionManager;
import org.junit.jupiter.api.Test;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestStandardConnectorRepository {
@@ -119,4 +125,70 @@ public class TestStandardConnectorRepository {
assertEquals(1, connectors.size());
assertEquals(connector2, repository.getConnector("same-id"));
}
+
+ @Test
+ public void testDiscardWorkingConfigurationPreservesWorkingAssets() {
+ final String connectorId = "test-connector";
+ final String activeAssetId = "active-asset-id";
+ final String workingAssetId = "working-asset-id";
+ final String unreferencedAssetId = "unreferenced-asset-id";
+
+ final StandardConnectorRepository repository = new
StandardConnectorRepository();
+
+ final AssetManager assetManager = mock(AssetManager.class);
+ final ConnectorRepositoryInitializationContext initContext =
mock(ConnectorRepositoryInitializationContext.class);
+
when(initContext.getExtensionManager()).thenReturn(mock(ExtensionManager.class));
+ when(initContext.getAssetManager()).thenReturn(assetManager);
+ repository.initialize(initContext);
+
+ final Asset activeAsset = mock(Asset.class);
+ when(activeAsset.getIdentifier()).thenReturn(activeAssetId);
+ when(activeAsset.getName()).thenReturn("active-asset.jar");
+
+ final Asset workingAsset = mock(Asset.class);
+ when(workingAsset.getIdentifier()).thenReturn(workingAssetId);
+ when(workingAsset.getName()).thenReturn("working-asset.jar");
+
+ final Asset unreferencedAsset = mock(Asset.class);
+
when(unreferencedAsset.getIdentifier()).thenReturn(unreferencedAssetId);
+ when(unreferencedAsset.getName()).thenReturn("unreferenced-asset.jar");
+
+ final MutableConnectorConfigurationContext activeConfigContext =
mock(MutableConnectorConfigurationContext.class);
+ final StepConfiguration activeStepConfig = new StepConfiguration(
+ Map.of("prop1", new AssetReference(Set.of(activeAssetId)))
+ );
+ final ConnectorConfiguration activeConfig = new ConnectorConfiguration(
+ Set.of(new NamedStepConfiguration("step1", activeStepConfig))
+ );
+
when(activeConfigContext.toConnectorConfiguration()).thenReturn(activeConfig);
+
+ final MutableConnectorConfigurationContext workingConfigContext =
mock(MutableConnectorConfigurationContext.class);
+ final StepConfiguration workingStepConfig = new StepConfiguration(
+ Map.of("prop1", new AssetReference(Set.of(workingAssetId)))
+ );
+ final ConnectorConfiguration workingConfig = new
ConnectorConfiguration(
+ Set.of(new NamedStepConfiguration("step1", workingStepConfig))
+ );
+
when(workingConfigContext.toConnectorConfiguration()).thenReturn(workingConfig);
+
+ final FrameworkFlowContext activeFlowContext =
mock(FrameworkFlowContext.class);
+
when(activeFlowContext.getConfigurationContext()).thenReturn(activeConfigContext);
+
+ final FrameworkFlowContext workingFlowContext =
mock(FrameworkFlowContext.class);
+
when(workingFlowContext.getConfigurationContext()).thenReturn(workingConfigContext);
+
+ final ConnectorNode connector = mock(ConnectorNode.class);
+ when(connector.getIdentifier()).thenReturn(connectorId);
+ when(connector.getActiveFlowContext()).thenReturn(activeFlowContext);
+ when(connector.getWorkingFlowContext()).thenReturn(workingFlowContext);
+
+
when(assetManager.getAssets(connectorId)).thenReturn(List.of(activeAsset,
workingAsset, unreferencedAsset));
+
+ repository.addConnector(connector);
+ repository.discardWorkingConfiguration(connector);
+
+ verify(assetManager, never()).deleteAsset(activeAssetId);
+ verify(assetManager, never()).deleteAsset(workingAssetId);
+ verify(assetManager).deleteAsset(unreferencedAssetId);
+ }
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/processors/OnPropertyModifiedTracker.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/processors/OnPropertyModifiedTracker.java
new file mode 100644
index 0000000000..ad8de1eda6
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/processors/OnPropertyModifiedTracker.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector.processors;
+
+import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A test processor that tracks calls to onPropertyModified when the Configured Number property changes.
+ * This is used to verify that onPropertyModified is called correctly when a Connector's applyUpdate
+ * changes a parameter value.
+ */
+public class OnPropertyModifiedTracker extends AbstractProcessor {
+
+ static final PropertyDescriptor CONFIGURED_NUMBER = new
PropertyDescriptor.Builder()
+ .name("Configured Number")
+ .displayName("Configured Number")
+ .description("A number property that is tracked for changes via
onPropertyModified")
+ .required(false)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All FlowFiles are routed here")
+ .build();
+
+ private volatile boolean configurationRestored = false;
+ private final List<PropertyChange> propertyChanges =
Collections.synchronizedList(new ArrayList<>());
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return List.of(CONFIGURED_NUMBER);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Set.of(REL_SUCCESS);
+ }
+
+ @OnConfigurationRestored
+ public void onConfigurationRestored() {
+ this.configurationRestored = true;
+ }
+
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
+ if (!CONFIGURED_NUMBER.getName().equals(descriptor.getName())) {
+ return;
+ }
+
+ if (!configurationRestored) {
+ return;
+ }
+
+ if (oldValue == null) {
+ getLogger().info("Property [{}] initialized to [{}]",
descriptor.getName(), newValue);
+ return;
+ }
+
+ getLogger().info("Property [{}] changed from [{}] to [{}]",
descriptor.getName(), oldValue, newValue);
+ propertyChanges.add(new PropertyChange(oldValue, newValue));
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ session.transfer(flowFile, REL_SUCCESS);
+ }
+
+ public List<PropertyChange> getPropertyChanges() {
+ return new ArrayList<>(propertyChanges);
+ }
+
+ public int getPropertyChangeCount() {
+ return propertyChanges.size();
+ }
+
+ public void clearPropertyChanges() {
+ propertyChanges.clear();
+ }
+
+ public record PropertyChange(String oldValue, String newValue) {
+ }
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
index 3d616e4332..faf406e262 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
@@ -20,6 +20,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableFlowFileActivity;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.BackoffMechanism;
@@ -209,7 +210,7 @@ public class StandardProcessSessionIT {
}).when(connectable).getConnections(Mockito.any(Relationship.class));
when(connectable.getConnections()).thenReturn(new HashSet<>(connList));
-
+ when(connectable.getFlowFileActivity()).thenReturn(new
ConnectableFlowFileActivity());
contentRepo = new MockContentRepository();
contentRepo.initialize(new StandardContentRepositoryContext(new
StandardResourceClaimManager(), EventReporter.NO_OP));
flowFileRepo = new MockFlowFileRepository(contentRepo);
@@ -2962,6 +2963,7 @@ public class StandardProcessSessionIT {
when(connectable.getIdentifier()).thenReturn("connectable-1");
when(connectable.getConnectableType()).thenReturn(ConnectableType.PROCESSOR);
when(connectable.getComponentType()).thenReturn("Unit Test Component");
+ when(connectable.getFlowFileActivity()).thenReturn(new
ConnectableFlowFileActivity());
Mockito.doAnswer((Answer<Set<Connection>>) invocation -> {
final Object[] arguments = invocation.getArguments();
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index 8dcba71418..6beb29d409 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -720,7 +720,7 @@ public class MockProcessGroup implements ProcessGroup {
}
@Override
- public void updateFlow(VersionedExternalFlow proposedFlow, String
componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean
updateDescendantVerisonedFlows) {
+ public void updateFlow(VersionedExternalFlow proposedFlow, String
componentIdSeed, boolean verifyNotDirty, boolean updateSettings, boolean
updateDescendantVersionedFlows) {
}
@Override
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flows/on-property-modified-tracker.json b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flows/on-property-modified-tracker.json
new file mode 100644
index 0000000000..1f4f11c0fb
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/flows/on-property-modified-tracker.json
@@ -0,0 +1,98 @@
+{
+ "flowContents": {
+ "identifier": "on-property-modified-root",
+ "instanceIdentifier": "on-property-modified-instance",
+ "name": "OnPropertyModified Test Flow",
+ "comments": "",
+ "position": {
+ "x": 0.0,
+ "y": 0.0
+ },
+ "processGroups": [],
+ "remoteProcessGroups": [],
+ "processors": [
+ {
+ "identifier": "on-property-modified-tracker",
+ "instanceIdentifier": "on-property-modified-tracker-instance",
+ "name": "OnPropertyModifiedTracker",
+ "comments": "",
+ "position": {
+ "x": 0.0,
+ "y": 0.0
+ },
+ "type":
"org.apache.nifi.components.connector.processors.OnPropertyModifiedTracker",
+ "bundle": {
+ "group": "default",
+ "artifact": "system",
+ "version": "unversioned"
+ },
+ "properties": {
+ "Configured Number": "#{CONFIGURED_NUMBER}"
+ },
+ "propertyDescriptors": {
+ "Configured Number": {
+ "name": "Configured Number",
+ "displayName": "Configured Number",
+ "identifiesControllerService": false,
+ "sensitive": false,
+ "dynamic": false
+ }
+ },
+ "style": {},
+ "schedulingPeriod": "1 min",
+ "schedulingStrategy": "TIMER_DRIVEN",
+ "executionNode": "ALL",
+ "penaltyDuration": "30 sec",
+ "yieldDuration": "1 sec",
+ "bulletinLevel": "WARN",
+ "runDurationMillis": 0,
+ "concurrentlySchedulableTaskCount": 1,
+ "autoTerminatedRelationships": ["success"],
+ "scheduledState": "ENABLED",
+ "retryCount": 10,
+ "retriedRelationships": [],
+ "backoffMechanism": "PENALIZE_FLOWFILE",
+ "maxBackoffPeriod": "10 mins",
+ "componentType": "PROCESSOR",
+ "groupIdentifier": "on-property-modified-root"
+ }
+ ],
+ "inputPorts": [],
+ "outputPorts": [],
+ "connections": [],
+ "labels": [],
+ "funnels": [],
+ "controllerServices": [],
+ "parameterContextName": "OnPropertyModifiedContext",
+ "defaultFlowFileExpiration": "0 sec",
+ "defaultBackPressureObjectThreshold": 10000,
+ "defaultBackPressureDataSizeThreshold": "1 GB",
+ "scheduledState": "ENABLED",
+ "executionEngine": "INHERITED",
+ "maxConcurrentTasks": 1,
+ "statelessFlowTimeout": "1 min",
+ "flowFileConcurrency": "UNBOUNDED",
+ "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
+ "componentType": "PROCESS_GROUP"
+ },
+ "externalControllerServices": {},
+ "parameterContexts": {
+ "OnPropertyModifiedContext": {
+ "name": "OnPropertyModifiedContext",
+ "parameters": [
+ {
+ "name": "CONFIGURED_NUMBER",
+ "description": "",
+ "sensitive": false,
+ "provided": false,
+ "value": null
+ }
+ ],
+ "inheritedParameterContexts": [],
+ "componentType": "PARAMETER_CONTEXT"
+ }
+ },
+ "flowEncodingVersion": "1.0",
+ "parameterProviders": {},
+ "latest": false
+}
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ParameterContextConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ParameterContextConnector.java
new file mode 100644
index 0000000000..8ba055dda9
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ParameterContextConnector.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.connectors.tests.system;
+
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.connector.AbstractConnector;
+import org.apache.nifi.components.connector.BundleCompatibility;
+import org.apache.nifi.components.connector.ConfigurationStep;
+import org.apache.nifi.components.connector.ConnectorPropertyDescriptor;
+import org.apache.nifi.components.connector.ConnectorPropertyGroup;
+import org.apache.nifi.components.connector.FlowUpdateException;
+import org.apache.nifi.components.connector.PropertyType;
+import org.apache.nifi.components.connector.components.FlowContext;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.ConnectableComponentType;
+import org.apache.nifi.flow.PortType;
+import org.apache.nifi.flow.Position;
+import org.apache.nifi.flow.ScheduledState;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedExternalFlow;
+import org.apache.nifi.flow.VersionedParameter;
+import org.apache.nifi.flow.VersionedParameterContext;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A test connector that creates a flow with nested Process Groups and
Parameter Context inheritance.
+ * Used to verify that sensitive parameters and asset-referencing parameters
work correctly
+ * when inherited from child Parameter Contexts.
+ *
+ * The flow structure:
+ * - Root Group (bound to Parent Parameter Context)
+ * - GenerateFlowFile (generates 1 FlowFile)
+ * - Process Group A (bound to Parent Parameter Context)
+ * - Input Port
+ * - UpdateContent (Sensitive Content = #{sensitive_param})
+ * - WriteToFile (target/sensitive.txt)
+ * - Process Group B (bound to Parent Parameter Context)
+ * - Input Port
+ * - ReplaceWithFile (Filename = #{asset_param})
+ * - WriteToFile (target/asset.txt)
+ *
+ * Parameter Context hierarchy:
+ * - Parent Parameter Context (inherits from Child Context A and Child Context
B)
+ * - Child Context A: sensitive_param (sensitive)
+ * - Child Context B: asset_param (asset reference)
+ */
+public class ParameterContextConnector extends AbstractConnector {
+
+ // Name of the only configuration step this connector exposes (see CONFIG_STEP below).
+ private static final String CONFIGURATION_STEP_NAME = "Parameter Context
Configuration";
+
+ // Identifiers given to the versioned components of the generated flow.
+ private static final String ROOT_GROUP_ID = "root-group";
+ private static final String GENERATE_PROCESSOR_ID = "generate-flowfile";
+ private static final String GROUP_A_ID = "process-group-a";
+ private static final String GROUP_B_ID = "process-group-b";
+ private static final String INPUT_PORT_A_ID = "input-port-a";
+ private static final String INPUT_PORT_B_ID = "input-port-b";
+ private static final String UPDATE_CONTENT_ID = "update-content";
+ private static final String REPLACE_WITH_FILE_ID = "replace-with-file";
+ private static final String WRITE_SENSITIVE_ID = "write-sensitive";
+ private static final String WRITE_ASSET_ID = "write-asset";
+
+ // Parameter Context names; the parent context inherits from both child contexts.
+ private static final String PARENT_CONTEXT_NAME = "Parent Parameter
Context";
+ private static final String CHILD_CONTEXT_A_NAME = "Child Context A";
+ private static final String CHILD_CONTEXT_B_NAME = "Child Context B";
+
+ // Parameter names referenced from processor properties as #{name}.
+ private static final String SENSITIVE_PARAM_NAME = "sensitive_param";
+ private static final String ASSET_PARAM_NAME = "asset_param";
+
+ // Required secret whose value becomes the sensitive parameter's value.
+ static final ConnectorPropertyDescriptor SENSITIVE_VALUE = new
ConnectorPropertyDescriptor.Builder()
+ .name("Sensitive Value")
+ .description("The sensitive value to be stored in the sensitive
parameter")
+ .required(true)
+ .type(PropertyType.SECRET)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ // Required asset whose value becomes the asset parameter's value.
+ static final ConnectorPropertyDescriptor ASSET_FILE = new
ConnectorPropertyDescriptor.Builder()
+ .name("Asset File")
+ .description("The asset file whose contents will be used via the
asset parameter")
+ .required(true)
+ .type(PropertyType.ASSET)
+ .build();
+
+ // Target file for the WriteToFile processor in Process Group A.
+ static final ConnectorPropertyDescriptor SENSITIVE_OUTPUT_FILE = new
ConnectorPropertyDescriptor.Builder()
+ .name("Sensitive Output File")
+ .description("The file path where the sensitive value output will
be written")
+ .required(true)
+ .type(PropertyType.STRING)
+ .defaultValue("target/sensitive.txt")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ // Target file for the WriteToFile processor in Process Group B.
+ static final ConnectorPropertyDescriptor ASSET_OUTPUT_FILE = new
ConnectorPropertyDescriptor.Builder()
+ .name("Asset Output File")
+ .description("The file path where the asset contents output will
be written")
+ .required(true)
+ .type(PropertyType.STRING)
+ .defaultValue("target/asset.txt")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ // Single property group holding all four descriptors above.
+ private static final ConnectorPropertyGroup PROPERTY_GROUP = new
ConnectorPropertyGroup.Builder()
+ .name("Parameter Context Configuration")
+ .description("Configuration properties for parameter context
testing")
+ .properties(List.of(SENSITIVE_VALUE, ASSET_FILE,
SENSITIVE_OUTPUT_FILE, ASSET_OUTPUT_FILE))
+ .build();
+
+ // The one configuration step returned by getConfigurationSteps() and used for property lookups in applyUpdate().
+ private static final ConfigurationStep CONFIG_STEP = new
ConfigurationStep.Builder()
+ .name(CONFIGURATION_STEP_NAME)
+ .description("Configure the sensitive value and asset file for
parameter context testing")
+ .propertyGroups(List.of(PROPERTY_GROUP))
+ .build();
+
+ // Intentionally empty: this connector performs no work when a step is configured;
+ // the flow is only rebuilt in applyUpdate().
+ @Override
+ protected void onStepConfigured(final String stepName, final FlowContext
workingContext) throws FlowUpdateException {
+ }
+
+ /**
+  * Supplies the connector's initial flow.
+  *
+  * @return the flow built by {@link #createEmptyFlow()}
+  */
+ @Override
+ public VersionedExternalFlow getInitialFlow() {
+     final VersionedExternalFlow initialFlow = createEmptyFlow();
+     return initialFlow;
+ }
+
+ /**
+  * Builds a flow whose root Process Group carries the fixed root identifier and name,
+  * has empty component sets, and declares no Parameter Contexts.
+  *
+  * @return the empty flow
+  */
+ private VersionedExternalFlow createEmptyFlow() {
+     final VersionedProcessGroup emptyRoot = new VersionedProcessGroup();
+     emptyRoot.setName("Parameter Context Test Flow");
+     emptyRoot.setIdentifier(ROOT_GROUP_ID);
+
+     // Each collection gets its own empty, mutable set.
+     emptyRoot.setControllerServices(new HashSet<>());
+     emptyRoot.setOutputPorts(new HashSet<>());
+     emptyRoot.setInputPorts(new HashSet<>());
+     emptyRoot.setConnections(new HashSet<>());
+     emptyRoot.setProcessGroups(new HashSet<>());
+     emptyRoot.setProcessors(new HashSet<>());
+
+     final VersionedExternalFlow emptyFlow = new VersionedExternalFlow();
+     emptyFlow.setParameterContexts(Collections.emptyMap());
+     emptyFlow.setFlowContents(emptyRoot);
+     return emptyFlow;
+ }
+
+ @Override
+ public List<ConfigVerificationResult> verifyConfigurationStep(final String
stepName, final Map<String, String> propertyValueOverrides, final FlowContext
flowContext) {
+ return List.of();
+ }
+
+ /**
+  * Lists the configuration steps of this connector.
+  *
+  * @return a single-element list containing {@code CONFIG_STEP}
+  */
+ @Override
+ public List<ConfigurationStep> getConfigurationSteps() {
+     final List<ConfigurationStep> steps = List.of(CONFIG_STEP);
+     return steps;
+ }
+
+ /**
+  * Reads the four configured values from the working context, builds the full flow from them
+  * and applies it to the active context. Nothing is applied while either the sensitive value
+  * or the asset file is unset.
+  *
+  * @param workingContext context whose configuration supplies the property values
+  * @param activeContext context that receives the rebuilt flow
+  * @throws FlowUpdateException if the flow update fails
+  */
+ @Override
+ public void applyUpdate(final FlowContext workingContext, final FlowContext activeContext) throws FlowUpdateException {
+     final String configuredSecret = readStepValue(workingContext, SENSITIVE_VALUE);
+     final String configuredAssetPath = readStepValue(workingContext, ASSET_FILE);
+     final String sensitiveTarget = readStepValue(workingContext, SENSITIVE_OUTPUT_FILE);
+     final String assetTarget = readStepValue(workingContext, ASSET_OUTPUT_FILE);
+
+     final boolean fullyConfigured = configuredSecret != null && configuredAssetPath != null;
+     if (fullyConfigured) {
+         final VersionedExternalFlow rebuiltFlow = createFlow(configuredSecret, configuredAssetPath, sensitiveTarget, assetTarget);
+         getInitializationContext().updateFlow(activeContext, rebuiltFlow, BundleCompatibility.RESOLVE_BUNDLE);
+     }
+ }
+
+ // Looks up one property of CONFIG_STEP in the given context and returns its value.
+ private String readStepValue(final FlowContext context, final ConnectorPropertyDescriptor descriptor) {
+     return context.getConfigurationContext().getProperty(CONFIG_STEP, descriptor).getValue();
+ }
+
+ /**
+  * Assembles the complete flow: a root group bound to the parent Parameter Context that holds
+  * a GenerateFlowFile processor, Process Groups A and B, and one connection from the generator
+  * to each group's input port.
+  *
+  * @param sensitiveValue value for the sensitive parameter
+  * @param assetFilePath value for the asset parameter
+  * @param sensitiveOutputFile file written by Process Group A
+  * @param assetOutputFile file written by Process Group B
+  * @return the flow together with its Parameter Contexts
+  */
+ private VersionedExternalFlow createFlow(final String sensitiveValue, final String assetFilePath,
+                                          final String sensitiveOutputFile, final String assetOutputFile) {
+     final Bundle extensionBundle = createBundle();
+
+     // Parameter Contexts, including the inheritance chain
+     final Map<String, VersionedParameterContext> contextsByName = createParameterContexts(sensitiveValue, assetFilePath);
+
+     // Root group, bound to the parent context
+     final VersionedProcessGroup root = new VersionedProcessGroup();
+     root.setIdentifier(ROOT_GROUP_ID);
+     root.setName("Parameter Context Test Flow");
+     root.setParameterContextName(PARENT_CONTEXT_NAME);
+
+     // Root-level source processor
+     final Map<String, String> generatorProperties = Map.of("Max FlowFiles", "1", "File Size", "0 B");
+     final VersionedProcessor flowFileSource = createProcessor(GENERATE_PROCESSOR_ID, ROOT_GROUP_ID, "GenerateFlowFile",
+         "org.apache.nifi.processors.tests.system.GenerateFlowFile", extensionBundle, generatorProperties, ScheduledState.ENABLED);
+     flowFileSource.setSchedulingPeriod("60 sec");
+
+     // Child groups: A handles the sensitive value, B handles the asset
+     final VersionedProcessGroup sensitiveGroup = createProcessGroupA(extensionBundle, sensitiveOutputFile);
+     final VersionedProcessGroup assetGroup = createProcessGroupB(extensionBundle, assetOutputFile);
+
+     // The generator feeds the input port of each child group
+     final VersionedConnection sourceToGroupA = createConnection("conn-to-group-a", ROOT_GROUP_ID,
+         GENERATE_PROCESSOR_ID, ConnectableComponentType.PROCESSOR,
+         INPUT_PORT_A_ID, ConnectableComponentType.INPUT_PORT, GROUP_A_ID,
+         Set.of("success"));
+     final VersionedConnection sourceToGroupB = createConnection("conn-to-group-b", ROOT_GROUP_ID,
+         GENERATE_PROCESSOR_ID, ConnectableComponentType.PROCESSOR,
+         INPUT_PORT_B_ID, ConnectableComponentType.INPUT_PORT, GROUP_B_ID,
+         Set.of("success"));
+
+     root.setProcessors(Set.of(flowFileSource));
+     root.setProcessGroups(Set.of(sensitiveGroup, assetGroup));
+     root.setConnections(Set.of(sourceToGroupA, sourceToGroupB));
+     root.setInputPorts(new HashSet<>());
+     root.setOutputPorts(new HashSet<>());
+     root.setControllerServices(new HashSet<>());
+
+     final VersionedExternalFlow assembled = new VersionedExternalFlow();
+     assembled.setFlowContents(root);
+     assembled.setParameterContexts(contextsByName);
+     return assembled;
+ }
+
+ /**
+  * Creates the three Parameter Contexts used by the flow: Child Context A with the sensitive
+  * parameter, Child Context B with the asset parameter, and a parent context that declares no
+  * parameters of its own and inherits from both children.
+  *
+  * @param sensitiveValue value assigned to the sensitive parameter
+  * @param assetFilePath value assigned to the asset parameter
+  * @return a mutable map of the contexts keyed by context name
+  */
+ private Map<String, VersionedParameterContext> createParameterContexts(final String sensitiveValue, final String assetFilePath) {
+     // Parameter owned by Child Context A
+     final VersionedParameter secretParameter = new VersionedParameter();
+     secretParameter.setName(SENSITIVE_PARAM_NAME);
+     secretParameter.setValue(sensitiveValue);
+     secretParameter.setSensitive(true);
+     secretParameter.setProvided(false);
+     secretParameter.setReferencedAssets(List.of());
+
+     // Parameter owned by Child Context B
+     final VersionedParameter assetParameter = new VersionedParameter();
+     assetParameter.setName(ASSET_PARAM_NAME);
+     assetParameter.setValue(assetFilePath);
+     assetParameter.setSensitive(false);
+     assetParameter.setProvided(false);
+     assetParameter.setReferencedAssets(List.of());
+
+     final VersionedParameterContext contextA = new VersionedParameterContext();
+     contextA.setName(CHILD_CONTEXT_A_NAME);
+     contextA.setParameters(Set.of(secretParameter));
+
+     final VersionedParameterContext contextB = new VersionedParameterContext();
+     contextB.setName(CHILD_CONTEXT_B_NAME);
+     contextB.setParameters(Set.of(assetParameter));
+
+     // The parent declares nothing itself; everything comes from the two children
+     final VersionedParameterContext inheritingContext = new VersionedParameterContext();
+     inheritingContext.setName(PARENT_CONTEXT_NAME);
+     inheritingContext.setParameters(Set.of());
+     inheritingContext.setInheritedParameterContexts(List.of(CHILD_CONTEXT_A_NAME, CHILD_CONTEXT_B_NAME));
+
+     final Map<String, VersionedParameterContext> contextsByName = new HashMap<>();
+     contextsByName.put(CHILD_CONTEXT_A_NAME, contextA);
+     contextsByName.put(CHILD_CONTEXT_B_NAME, contextB);
+     contextsByName.put(PARENT_CONTEXT_NAME, inheritingContext);
+     return contextsByName;
+ }
+
+ /**
+  * Builds Process Group A: Input Port -> UpdateContent -> WriteToFile. UpdateContent takes its
+  * "Sensitive Content" from the sensitive parameter, and the group is bound to the parent
+  * Parameter Context.
+  *
+  * @param bundle bundle assigned to the group's processors
+  * @param outputFile value of the WriteToFile "Filename" property
+  * @return the populated group
+  */
+ private VersionedProcessGroup createProcessGroupA(final Bundle bundle, final String outputFile) {
+     final VersionedProcessGroup sensitiveGroup = new VersionedProcessGroup();
+     sensitiveGroup.setIdentifier(GROUP_A_ID);
+     sensitiveGroup.setGroupIdentifier(ROOT_GROUP_ID);
+     sensitiveGroup.setName("Process Group A - Sensitive Value");
+     sensitiveGroup.setParameterContextName(PARENT_CONTEXT_NAME);
+
+     // Entry point of the group
+     final VersionedPort entryPort = createInputPort(INPUT_PORT_A_ID, GROUP_A_ID, "Input Port A");
+
+     // UpdateContent references the sensitive parameter
+     final String sensitiveReference = "#{" + SENSITIVE_PARAM_NAME + "}";
+     final Map<String, String> updateProperties = Map.of("Sensitive Content", sensitiveReference, "Update Strategy", "Replace");
+     final VersionedProcessor contentUpdater = createProcessor(UPDATE_CONTENT_ID, GROUP_A_ID, "UpdateContent",
+         "org.apache.nifi.processors.tests.system.UpdateContent", bundle, updateProperties, ScheduledState.ENABLED);
+
+     // WriteToFile terminates both of its relationships
+     final VersionedProcessor fileWriter = createProcessor(WRITE_SENSITIVE_ID, GROUP_A_ID, "WriteToFile",
+         "org.apache.nifi.processors.tests.system.WriteToFile", bundle,
+         Map.of("Filename", outputFile), ScheduledState.ENABLED);
+     fileWriter.setAutoTerminatedRelationships(Set.of("success", "failure"));
+
+     // Wiring inside the group; null destination group means "same group as the connection"
+     final VersionedConnection portToUpdater = createConnection("input-to-update", GROUP_A_ID,
+         INPUT_PORT_A_ID, ConnectableComponentType.INPUT_PORT,
+         UPDATE_CONTENT_ID, ConnectableComponentType.PROCESSOR, null,
+         Set.of());
+     final VersionedConnection updaterToWriter = createConnection("update-to-write", GROUP_A_ID,
+         UPDATE_CONTENT_ID, ConnectableComponentType.PROCESSOR,
+         WRITE_SENSITIVE_ID, ConnectableComponentType.PROCESSOR, null,
+         Set.of("success"));
+
+     sensitiveGroup.setInputPorts(Set.of(entryPort));
+     sensitiveGroup.setOutputPorts(new HashSet<>());
+     sensitiveGroup.setProcessors(Set.of(contentUpdater, fileWriter));
+     sensitiveGroup.setConnections(Set.of(portToUpdater, updaterToWriter));
+     sensitiveGroup.setProcessGroups(new HashSet<>());
+     sensitiveGroup.setControllerServices(new HashSet<>());
+     return sensitiveGroup;
+ }
+
+ /**
+  * Builds Process Group B: Input Port -> ReplaceWithFile -> WriteToFile. ReplaceWithFile takes
+  * its "Filename" from the asset parameter, and the group is bound to the parent Parameter
+  * Context.
+  *
+  * @param bundle bundle assigned to the group's processors
+  * @param outputFile value of the WriteToFile "Filename" property
+  * @return the populated group
+  */
+ private VersionedProcessGroup createProcessGroupB(final Bundle bundle, final String outputFile) {
+     final VersionedProcessGroup assetGroup = new VersionedProcessGroup();
+     assetGroup.setIdentifier(GROUP_B_ID);
+     assetGroup.setGroupIdentifier(ROOT_GROUP_ID);
+     assetGroup.setName("Process Group B - Asset Value");
+     assetGroup.setParameterContextName(PARENT_CONTEXT_NAME);
+
+     // Entry point of the group
+     final VersionedPort entryPort = createInputPort(INPUT_PORT_B_ID, GROUP_B_ID, "Input Port B");
+
+     // ReplaceWithFile references the asset parameter
+     final String assetReference = "#{" + ASSET_PARAM_NAME + "}";
+     final VersionedProcessor fileReplacer = createProcessor(REPLACE_WITH_FILE_ID, GROUP_B_ID, "ReplaceWithFile",
+         "org.apache.nifi.processors.tests.system.ReplaceWithFile", bundle,
+         Map.of("Filename", assetReference), ScheduledState.ENABLED);
+
+     // WriteToFile terminates both of its relationships
+     final VersionedProcessor fileWriter = createProcessor(WRITE_ASSET_ID, GROUP_B_ID, "WriteToFile",
+         "org.apache.nifi.processors.tests.system.WriteToFile", bundle,
+         Map.of("Filename", outputFile), ScheduledState.ENABLED);
+     fileWriter.setAutoTerminatedRelationships(Set.of("success", "failure"));
+
+     // Wiring inside the group; null destination group means "same group as the connection"
+     final VersionedConnection portToReplacer = createConnection("input-to-replace", GROUP_B_ID,
+         INPUT_PORT_B_ID, ConnectableComponentType.INPUT_PORT,
+         REPLACE_WITH_FILE_ID, ConnectableComponentType.PROCESSOR, null,
+         Set.of());
+     final VersionedConnection replacerToWriter = createConnection("replace-to-write", GROUP_B_ID,
+         REPLACE_WITH_FILE_ID, ConnectableComponentType.PROCESSOR,
+         WRITE_ASSET_ID, ConnectableComponentType.PROCESSOR, null,
+         Set.of("success"));
+
+     assetGroup.setInputPorts(Set.of(entryPort));
+     assetGroup.setOutputPorts(new HashSet<>());
+     assetGroup.setProcessors(Set.of(fileReplacer, fileWriter));
+     assetGroup.setConnections(Set.of(portToReplacer, replacerToWriter));
+     assetGroup.setProcessGroups(new HashSet<>());
+     assetGroup.setControllerServices(new HashSet<>());
+     return assetGroup;
+ }
+
+ /**
+  * Creates the bundle coordinates assigned to every processor in the generated flow.
+  * NOTE(review): the version is hard-coded; the flow is applied with
+  * BundleCompatibility.RESOLVE_BUNDLE, which presumably tolerates a mismatch - confirm.
+  *
+  * @return a bundle with fixed group, artifact and version
+  */
+ private Bundle createBundle() {
+     final Bundle testExtensions = new Bundle();
+     testExtensions.setVersion("2.8.0-SNAPSHOT");
+     testExtensions.setArtifact("nifi-system-test-extensions-nar");
+     testExtensions.setGroup("org.apache.nifi");
+     return testExtensions;
+ }
+
+ /**
+  * Creates an enabled input port at position (0, 0) with one concurrent task and remote
+  * access disabled.
+  *
+  * @param identifier identifier of the port
+  * @param groupIdentifier identifier of the owning Process Group
+  * @param name display name of the port
+  * @return the configured port
+  */
+ private VersionedPort createInputPort(final String identifier, final String groupIdentifier, final String name) {
+     final VersionedPort inputPort = new VersionedPort();
+
+     // Identity
+     inputPort.setName(name);
+     inputPort.setIdentifier(identifier);
+     inputPort.setGroupIdentifier(groupIdentifier);
+
+     // Fixed settings shared by every port this connector creates
+     inputPort.setType(PortType.INPUT_PORT);
+     inputPort.setAllowRemoteAccess(false);
+     inputPort.setPosition(new Position(0, 0));
+     inputPort.setConcurrentlySchedulableTaskCount(1);
+     inputPort.setScheduledState(ScheduledState.ENABLED);
+     return inputPort;
+ }
+
+ /**
+  * Creates a processor with the given identity, type, bundle, properties and scheduled state,
+  * and fills every other setting with a fixed value (timer driven, "0 sec" period, one task,
+  * no auto-terminated or retried relationships).
+  *
+  * @param identifier identifier of the processor
+  * @param groupIdentifier identifier of the owning Process Group
+  * @param name display name
+  * @param type fully qualified processor class name
+  * @param bundle bundle that provides the processor
+  * @param properties property values keyed by property name
+  * @param scheduledState scheduled state to record
+  * @return the configured processor
+  */
+ private VersionedProcessor createProcessor(final String identifier, final String groupIdentifier, final String name,
+                                            final String type, final Bundle bundle, final Map<String, String> properties,
+                                            final ScheduledState scheduledState) {
+     final VersionedProcessor created = new VersionedProcessor();
+
+     // Caller-supplied identity and configuration
+     created.setName(name);
+     created.setIdentifier(identifier);
+     created.setGroupIdentifier(groupIdentifier);
+     created.setType(type);
+     created.setBundle(bundle);
+     created.setProperties(properties);
+     created.setPropertyDescriptors(Collections.emptyMap());
+     created.setScheduledState(scheduledState);
+
+     // Scheduling settings
+     created.setSchedulingStrategy("TIMER_DRIVEN");
+     created.setSchedulingPeriod("0 sec");
+     created.setExecutionNode("ALL");
+     created.setConcurrentlySchedulableTaskCount(1);
+     created.setRunDurationMillis(0L);
+     created.setYieldDuration("1 sec");
+     created.setPenaltyDuration("30 sec");
+     created.setBulletinLevel("WARN");
+     created.setPosition(new Position(0, 0));
+
+     // Relationship and retry handling
+     created.setAutoTerminatedRelationships(Collections.emptySet());
+     created.setRetriedRelationships(Collections.emptySet());
+     created.setRetryCount(10);
+     created.setBackoffMechanism("PENALIZE_FLOWFILE");
+     created.setMaxBackoffPeriod("10 mins");
+
+     return created;
+ }
+
+ /**
+  * Creates a connection between two components. The source is always placed in the
+  * connection's own group; the destination is placed in {@code destinationGroupId}, or in the
+  * connection's group when that argument is {@code null}.
+  *
+  * @param identifier identifier of the connection
+  * @param groupIdentifier identifier of the group that owns the connection and its source
+  * @param sourceId identifier of the source component
+  * @param sourceType component type of the source
+  * @param destinationId identifier of the destination component
+  * @param destinationType component type of the destination
+  * @param destinationGroupId group of the destination, or {@code null} for the connection's group
+  * @param selectedRelationships relationships routed through the connection
+  * @return the configured connection
+  */
+ private VersionedConnection createConnection(final String identifier, final String groupIdentifier,
+                                              final String sourceId, final ConnectableComponentType sourceType,
+                                              final String destinationId, final ConnectableComponentType destinationType,
+                                              final String destinationGroupId,
+                                              final Set<String> selectedRelationships) {
+     final ConnectableComponent from = new ConnectableComponent();
+     from.setId(sourceId);
+     from.setType(sourceType);
+     from.setGroupId(groupIdentifier);
+
+     // Fall back to the connection's own group when no destination group is given
+     final String resolvedDestinationGroup;
+     if (destinationGroupId == null) {
+         resolvedDestinationGroup = groupIdentifier;
+     } else {
+         resolvedDestinationGroup = destinationGroupId;
+     }
+
+     final ConnectableComponent to = new ConnectableComponent();
+     to.setId(destinationId);
+     to.setType(destinationType);
+     to.setGroupId(resolvedDestinationGroup);
+
+     final VersionedConnection created = new VersionedConnection();
+     created.setIdentifier(identifier);
+     created.setGroupIdentifier(groupIdentifier);
+     created.setSource(from);
+     created.setDestination(to);
+     created.setSelectedRelationships(selectedRelationships);
+
+     // Fixed queue and layout settings
+     created.setBackPressureObjectThreshold(10_000L);
+     created.setBackPressureDataSizeThreshold("1 GB");
+     created.setFlowFileExpiration("0 sec");
+     created.setPrioritizers(Collections.emptyList());
+     created.setBends(Collections.emptyList());
+     created.setLabelIndex(1);
+     created.setzIndex(0L);
+
+     return created;
+ }
+}
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/UpdateContent.java
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/UpdateContent.java
index 2b5da9485d..018b76e5c6 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/UpdateContent.java
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/UpdateContent.java
@@ -19,6 +19,8 @@ package org.apache.nifi.processors.tests.system;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@@ -29,7 +31,9 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -40,9 +44,17 @@ public class UpdateContent extends AbstractProcessor {
.name("Content")
.displayName("Content")
.description("Content to set")
- .required(true)
+ .required(false)
+ .addValidator(Validator.VALID)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ static final PropertyDescriptor SENSITIVE_CONTENT = new Builder()
+ .name("Sensitive Content")
+ .displayName("Sensitive Content")
+ .description("Sensitive content to set (use for sensitive parameter
references)")
+ .required(false)
+ .sensitive(true)
.addValidator(Validator.VALID)
- .defaultValue("Default Content")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor UPDATE_STRATEGY = new Builder()
@@ -60,7 +72,23 @@ public class UpdateContent extends AbstractProcessor {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return Arrays.asList(CONTENT, UPDATE_STRATEGY);
+ return Arrays.asList(CONTENT, SENSITIVE_CONTENT, UPDATE_STRATEGY);
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final
ValidationContext context) {
+ final List<ValidationResult> results = new ArrayList<>();
+ final boolean hasContent = context.getProperty(CONTENT).isSet();
+ final boolean hasSensitiveContent =
context.getProperty(SENSITIVE_CONTENT).isSet();
+
+ if (hasContent == hasSensitiveContent) {
+ results.add(new ValidationResult.Builder()
+ .subject("Content")
+ .valid(false)
+ .explanation("Exactly one of 'Content' or 'Sensitive Content'
must be set")
+ .build());
+ }
+ return results;
}
@Override
@@ -75,7 +103,12 @@ public class UpdateContent extends AbstractProcessor {
return;
}
- final String content =
context.getProperty(CONTENT).evaluateAttributeExpressions(flowFile).getValue();
+ final String content;
+ if (context.getProperty(CONTENT).isSet()) {
+ content =
context.getProperty(CONTENT).evaluateAttributeExpressions(flowFile).getValue();
+ } else {
+ content =
context.getProperty(SENSITIVE_CONTENT).evaluateAttributeExpressions(flowFile).getValue();
+ }
final byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
final String strategy =
context.getProperty(UPDATE_STRATEGY).getValue();
diff --git
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
index ab00ff7275..d58dae1924 100644
---
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
+++
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.components.connector.Connector
@@ -20,3 +20,4 @@ org.apache.nifi.connectors.tests.system.DataQueuingConnector
org.apache.nifi.connectors.tests.system.GatedDataQueuingConnector
org.apache.nifi.connectors.tests.system.NestedProcessGroupConnector
org.apache.nifi.connectors.tests.system.NopConnector
+org.apache.nifi.connectors.tests.system.ParameterContextConnector
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorParameterContextIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorParameterContextIT.java
new file mode 100644
index 0000000000..40b46eeacb
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorParameterContextIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.connectors;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.ConnectorClient;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.dto.AssetReferenceDTO;
+import org.apache.nifi.web.api.dto.ConnectorValueReferenceDTO;
+import org.apache.nifi.web.api.entity.AssetEntity;
+import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.apache.nifi.web.api.entity.ParameterProviderEntity;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * System test that validates Connectors can use Parameter Contexts with
inheritance,
+ * sensitive parameters (via SECRET_REFERENCE), and asset-referencing
parameters.
+ */
+ public class ConnectorParameterContextIT extends NiFiSystemIT {
+     private static final Logger LOGGER = LoggerFactory.getLogger(ConnectorParameterContextIT.class);
+     private static final String SENSITIVE_SECRET_VALUE = "my-super-secret-value";
+     private static final String ASSET_FILE_CONTENT = "Hello, World!";
+
+     /**
+      * Configures a ParameterContextConnector with a secret reference and an uploaded asset,
+      * starts it, and verifies that each output file contains the expected content.
+      *
+      * @throws NiFiClientException if a NiFi client call fails
+      * @throws IOException if a stale output file cannot be deleted or an output file cannot be read
+      * @throws InterruptedException if interrupted while waiting
+      */
+     @Test
+     public void testParameterContextInheritanceWithSensitiveAndAssetParameters() throws NiFiClientException, IOException, InterruptedException {
+         final File sensitiveOutputFile = new File("target/sensitive.txt");
+         final File assetOutputFile = new File("target/asset.txt");
+
+         // Remove output left behind by an earlier run. deleteIfExists throws when the file exists but
+         // cannot be removed, so a stale file can never satisfy the waitFor/assertions below.
+         Files.deleteIfExists(sensitiveOutputFile.toPath());
+         Files.deleteIfExists(assetOutputFile.toPath());
+
+         // Parameter Provider that supplies the secret referenced by the connector
+         final ParameterProviderEntity paramProvider = getClientUtil().createParameterProvider("PropertiesParameterProvider");
+         getClientUtil().updateParameterProviderProperties(paramProvider, Map.of("parameters", "secret=" + SENSITIVE_SECRET_VALUE));
+
+         final ConnectorEntity connector = getClientUtil().createConnector("ParameterContextConnector");
+         assertNotNull(connector);
+         assertNotNull(connector.getId());
+
+         final String connectorId = connector.getId();
+         final ConnectorClient connectorClient = getNifiClient().getConnectorClient();
+
+         // Upload the asset whose contents the flow is expected to write out
+         final File assetFile = new File("src/test/resources/sample-assets/helloworld.txt");
+         final AssetEntity assetEntity = connectorClient.createAsset(connectorId, assetFile.getName(), assetFile);
+         assertNotNull(assetEntity);
+         assertNotNull(assetEntity.getAsset());
+         assertNotNull(assetEntity.getAsset().getId());
+
+         final String uploadedAssetId = assetEntity.getAsset().getId();
+
+         final ConnectorValueReferenceDTO secretRef = getClientUtil().createSecretValueReference(
+             paramProvider.getId(), "secret", "PropertiesParameterProvider.Parameters.secret");
+
+         final ConnectorValueReferenceDTO assetRef = new ConnectorValueReferenceDTO();
+         assetRef.setValueType("ASSET_REFERENCE");
+         assetRef.setAssetReferences(List.of(new AssetReferenceDTO(uploadedAssetId)));
+
+         // Absolute paths are passed so the files are written where this test looks for them
+         final Map<String, ConnectorValueReferenceDTO> propertyValues = new HashMap<>();
+         propertyValues.put("Sensitive Value", secretRef);
+         propertyValues.put("Asset File", assetRef);
+         propertyValues.put("Sensitive Output File", createStringLiteralRef(sensitiveOutputFile.getAbsolutePath()));
+         propertyValues.put("Asset Output File", createStringLiteralRef(assetOutputFile.getAbsolutePath()));
+
+         getClientUtil().configureConnectorWithReferences(connectorId, "Parameter Context Configuration", propertyValues);
+         LOGGER.info("Applying configuration to Connector...");
+         getClientUtil().applyConnectorUpdate(connector);
+         LOGGER.info("Waiting for Connector to become valid...");
+         getClientUtil().waitForValidConnector(connectorId);
+
+         LOGGER.info("Connector is valid; starting Connector...");
+         getClientUtil().startConnector(connectorId);
+
+         LOGGER.info("Waiting for output files to be created...");
+         waitFor(() -> sensitiveOutputFile.exists() && assetOutputFile.exists());
+
+         assertTrue(sensitiveOutputFile.exists(), "Sensitive output file should exist");
+         assertTrue(assetOutputFile.exists(), "Asset output file should exist");
+
+         final String sensitiveContent = Files.readString(sensitiveOutputFile.toPath()).trim();
+         final String assetContent = Files.readString(assetOutputFile.toPath()).trim();
+
+         assertEquals(SENSITIVE_SECRET_VALUE, sensitiveContent, "Sensitive output file should contain the secret value");
+         assertEquals(ASSET_FILE_CONTENT, assetContent, "Asset output file should contain the asset file contents");
+     }
+
+     /**
+      * Wraps a plain string in a value reference of type STRING_LITERAL.
+      *
+      * @param value the literal value
+      * @return the populated reference
+      */
+     private ConnectorValueReferenceDTO createStringLiteralRef(final String value) {
+         final ConnectorValueReferenceDTO ref = new ConnectorValueReferenceDTO();
+         ref.setValueType("STRING_LITERAL");
+         ref.setValue(value);
+         return ref;
+     }
+ }