This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 8245bc3f804f701da52a4e64a000b04aff718948 Author: Mark Payne <[email protected]> AuthorDate: Tue May 21 11:03:21 2019 -0400 NIFI-5922: Ensure that we import any default variable values on flow import --- .../StatelessControllerServiceInitializationContext.java | 3 ++- .../org/apache/nifi/stateless/core/StatelessFlow.java | 16 ++++++++++++---- .../nifi/stateless/core/StatelessValidationContext.java | 3 ++- .../java/org/apache/nifi/stateless/runtimes/Program.java | 6 ++++-- .../runtimes/openwhisk/StatelessNiFiOpenWhiskAction.java | 9 ++++++++- .../java/org/apache/nifi/stateless/core/BatchTest.java | 3 ++- 6 files changed, 30 insertions(+), 10 deletions(-) diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceInitializationContext.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceInitializationContext.java index b0801f0..70f4ca7 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceInitializationContext.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceInitializationContext.java @@ -77,4 +77,5 @@ public class StatelessControllerServiceInitializationContext implements Controll public File getKerberosConfigurationFile() { return null; //this needs to be wired in. - }} + } +} diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java index 2b17376..0418429 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java @@ -403,18 +403,26 @@ public class StatelessFlow implements RunnableFlow { args.getAsJsonArray(FAILUREPORTS).forEach(port ->failurePorts.add(port.getAsString())); } + final SSLContext sslContext = getSSLContext(args); + final VersionedFlowSnapshot snapshot = new RegistryUtil(registryurl, sslContext).getFlowByID(bucketID, flowID, flowVersion); + final Map<VariableDescriptor, String> inputVariables = new HashMap<>(); + final VersionedProcessGroup versionedGroup = snapshot.getFlowContents(); + if (versionedGroup != null) { + for (final Map.Entry<String, String> entry : versionedGroup.getVariables().entrySet()) { + final String variableName = entry.getKey(); + final String variableValue = entry.getValue(); + inputVariables.put(new VariableDescriptor(variableName), variableValue); + } + } if (args.has(VARIABLES)) { final JsonElement variablesElement = args.get(VARIABLES); final JsonObject variablesObject = variablesElement.getAsJsonObject(); variablesObject.entrySet() - .forEach(entry ->inputVariables.put(new VariableDescriptor(entry.getKey()), entry.getValue().getAsString())); + .forEach(entry -> inputVariables.put(new VariableDescriptor(entry.getKey()), entry.getValue().getAsString())); } - final SSLContext sslContext = getSSLContext(args); - - final VersionedFlowSnapshot snapshot = new RegistryUtil(registryurl, sslContext).getFlowByID(bucketID, flowID, flowVersion); final ExtensionManager extensionManager = ExtensionDiscovery.discover(narWorkingDir, systemClassLoader); final StatelessFlow flow = new StatelessFlow(snapshot.getFlowContents(), extensionManager, () -> inputVariables, failurePorts, materializeContent, sslContext); diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessValidationContext.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessValidationContext.java index 6202109..6dcb610 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessValidationContext.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessValidationContext.java @@ -40,7 +40,8 @@ public class StatelessValidationContext implements ValidationContext { private final VariableRegistry variableRegistry; private final StatelessProcessContext processContext; - public StatelessValidationContext(final StatelessProcessContext processContext, final StatelessControllerServiceLookup lookup, final StateManager stateManager, final VariableRegistry variableRegistry) { + public StatelessValidationContext(final StatelessProcessContext processContext, final StatelessControllerServiceLookup lookup, final StateManager stateManager, + final VariableRegistry variableRegistry) { this.processContext = processContext; this.lookup = lookup; this.stateManager = stateManager; diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/Program.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/Program.java index e26d0a4..100aad3 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/Program.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/Program.java @@ -30,7 +30,8 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.*; +import java.util.LinkedList; +import java.util.Queue; public class Program { @@ -168,7 +169,8 @@ public class Program { System.out.println(); System.out.println("Notes:"); System.out.println(" 1) The configuration file must be in JSON format. "); - System.out.println(" 2) When providing configurations via JSON, the following attributes must be provided: " + StatelessFlow.REGISTRY + ", " + StatelessFlow.BUCKETID + ", " + StatelessFlow.FLOWID + "."); + System.out.println(" 2) When providing configurations via JSON, the following attributes must be provided: " + StatelessFlow.REGISTRY + ", " + StatelessFlow.BUCKETID + + ", " + StatelessFlow.FLOWID + "."); System.out.println(" All other attributes will be passed to the flow using the variable registry interface"); System.out.println(); } diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/openwhisk/StatelessNiFiOpenWhiskAction.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/openwhisk/StatelessNiFiOpenWhiskAction.java index 849ce13..7927d38 100644 --- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/openwhisk/StatelessNiFiOpenWhiskAction.java +++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/runtimes/openwhisk/StatelessNiFiOpenWhiskAction.java @@ -25,7 +25,14 @@ import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile; import org.apache.nifi.stateless.bootstrap.RunnableFlow; import org.apache.nifi.stateless.core.StatelessFlow; -import java.io.*; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.io.StringWriter; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.LinkedList; diff --git a/nifi-stateless/nifi-stateless-core/src/test/java/org/apache/nifi/stateless/core/BatchTest.java b/nifi-stateless/nifi-stateless-core/src/test/java/org/apache/nifi/stateless/core/BatchTest.java index 286bb2c..15a0f00 100644 --- a/nifi-stateless/nifi-stateless-core/src/test/java/org/apache/nifi/stateless/core/BatchTest.java +++ b/nifi-stateless/nifi-stateless-core/src/test/java/org/apache/nifi/stateless/core/BatchTest.java @@ -60,7 +60,8 @@ public class BatchTest { /////////////////////////////////////////// // Build Flow /////////////////////////////////////////// - StatelessProcessorWrapper getFile = new StatelessProcessorWrapper(UUID.randomUUID().toString(), new GetFile(), null, serviceLookup, registry, materializeData, ClassLoader.getSystemClassLoader()); + StatelessProcessorWrapper getFile = new StatelessProcessorWrapper(UUID.randomUUID().toString(), new GetFile(), null, serviceLookup, registry, + materializeData, ClassLoader.getSystemClassLoader()); getFile.setProperty(GetFile.DIRECTORY,"/tmp/nifistateless/input/"); getFile.setProperty(GetFile.FILE_FILTER,"test.txt"); getFile.setProperty(GetFile.KEEP_SOURCE_FILE,"true");
