This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b799879  NIFI-8380: Allow for an extensions.directory property to 
specify where to place downloaded files. Also fixed an issue that was 
encountered, when a Source Processor is scheduled for Primary Node Only but 
more than 1 task is set. In that case, even though only a single task will 
should be scheduled, an Exception was getting thrown because @OnScheduled 
methods of Processors were still called. To avoid this, moved the 
initialization of the dataflow outside of the creation of [...]
b799879 is described below

commit b79987918a3f8e3cb69e3d21176f09704c441707
Author: Mark Payne <[email protected]>
AuthorDate: Mon Mar 29 12:23:08 2021 -0400

    NIFI-8380: Allow for an extensions.directory property to specify where to 
place downloaded files. Also fixed an issue that was encountered, when a Source 
Processor is scheduled for Primary Node Only but more than 1 task is set. In 
that case, even though only a single task will should be scheduled, an 
Exception was getting thrown because @OnScheduled methods of Processors were 
still called. To avoid this, moved the initialization of the dataflow outside 
of the creation of the dataflow  [...]
    
    NIFI-8380: Removed requirement in validation for working directory and 
extensions directory to exist; removed auto-creation of directories in 
validation
    
    NIFI-8380: Fixed a few thrading bugs, so that if we have multiple threads 
trying to download/unpack extensions, we properly synchronize the unpacking and 
unpack into the correct sub-directory under the working directory
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #4950.
---
 nifi-external/nifi-kafka-connect/README.md         | 22 ++++--
 .../kafka/connect/StatelessKafkaConnectorUtil.java | 28 ++++++-
 .../nifi/kafka/connect/StatelessNiFiSinkTask.java  |  1 +
 .../kafka/connect/StatelessNiFiSourceTask.java     |  1 +
 .../PropertiesFileEngineConfigurationParser.java   | 12 +++
 .../nifi/stateless/engine/NarUnpackLock.java       | 58 +++++++++++++++
 .../engine/StatelessEngineConfiguration.java       |  2 +
 .../nifi/stateless/flow/StatelessDataflow.java     | 25 +++++++
 .../nifi/stateless/bootstrap/RunStatelessFlow.java |  2 +
 .../stateless/bootstrap/StatelessBootstrap.java    |  8 +-
 .../org/apache/nifi/extensions/DownloadQueue.java  | 86 +++++++++-------------
 .../extensions/FileSystemExtensionRepository.java  | 13 ++--
 .../stateless/engine/StandardStatelessEngine.java  |  3 +-
 .../flow/StandardStatelessDataflowFactory.java     |  2 +-
 .../nifi/stateless/flow/StandardStatelessFlow.java | 27 ++++---
 .../TestPropertiesFileFlowDefinitionParser.java    |  5 ++
 .../apache/nifi/stateless/StatelessSystemIT.java   |  5 ++
 17 files changed, 220 insertions(+), 80 deletions(-)

diff --git a/nifi-external/nifi-kafka-connect/README.md 
b/nifi-external/nifi-kafka-connect/README.md
index d25e524..fbdef23 100644
--- a/nifi-external/nifi-kafka-connect/README.md
+++ b/nifi-external/nifi-kafka-connect/README.md
@@ -117,7 +117,8 @@ as it includes annotations (1), (2), etc. for illustrative 
purposes):
 (12)   "header.attribute.regex": "syslog.*",
 (13)   "krb5.file": "/etc/krb5.conf",
 (14)   "dataflow.timeout": "30 sec",
-(15)   "parameter.Syslog Port": "19944"
+(15)   "parameter.Syslog Port": "19944",
+(16)   "extensions.directory": "/tmp/stateless-extensions"
   }
 }
 ``` 
@@ -191,6 +192,9 @@ Process Groups have their own Parameter Contexts, this 
value will be used for an
 should be applied only to a specific Parameter Context, the name of the 
Parameter Context may be supplied and separated from the Parameter Name with a 
colon. For example,
 `parameter.Syslog Context:Syslog Port`. In this case, the only Parameter 
Context whose `Syslog Port` parameter would be set would be the Parameter 
Context whose name is `Syslog Context`.
 
+`(16) extensions.directory` : Specifies the directory to add any downloaded 
extensions to. If not specified, the extensions will be written to the same 
directory that the
+connector lives in. Because this directory may not be writable, and to aid in 
upgrade scenarios, it is highly recommended that this property be configured.
+ 
 
 ### Transactional sources
 
@@ -252,7 +256,8 @@ as it includes annotations (1), (2), etc. for illustrative 
purposes):
 (12)   "headers.as.attributes.regex": "syslog.*",
 (13)   "krb5.file": "/etc/krb5.conf",
 (14)   "dataflow.timeout": "30 sec",
-(15)   "parameter.Directory": "/syslog"
+(15)   "parameter.Directory": "/syslog",
+(16)   "extensions.directory": "/tmp/stateless-extensions"
   }
 }
 ``` 
@@ -323,6 +328,9 @@ Process Groups have their own Parameter Contexts, this 
value will be used for an
 should be applied only to a specific Parameter Context, the name of the 
Parameter Context may be supplied and separated from the Parameter Name with a 
colon. For example,
 `parameter.HDFS:Directory`. In this case, the only Parameter Context whose 
`Directory` parameter would be set would be the Parameter Context whose name is 
`HDFS`.
 
+`(16) extensions.directory` : Specifies the directory to add any downloaded 
extensions to. If not specified, the extensions will be written to the same 
directory that the
+connector lives in. Because this directory may not be writable, and to aid in 
upgrade scenarios, it is highly recommended that this property be configured.
+
 
 <a name="merging"></a>
 ### Merging
@@ -441,8 +449,10 @@ The Connector will then examine its own set of downloaded 
extensions and determi
 and begin downloading them.
 
 In order to do this, the connect configuration must specify where to download 
the extensions. This is the reason for the "nexus.url" property that is 
described
-in both the Source Connector and the Sink Connector. Once downloaded, the 
extensions are placed in the same directory as existing NiFi Archive (NAR) 
files.
-Unless explicitly specified in the connector configuration (via the 
`nar.directory` configuration element), this is auto-detected to be the same 
directory
+in both the Source Connector and the Sink Connector. Once downloaded, the 
extensions are placed in the configured extensions directory (configured via the
+`extensions.directory` configuration element).
+If the `extensions.directory` is not explicitly specified in the connector 
configuration, extensions will be added to the NAR Directory 
+(configured via the `nar.directory` configuration element). If this is not 
specified, it is is auto-detected to be the same directory
 that the NiFi Kafka Connector was installed in.
 
 
@@ -467,7 +477,9 @@ Kafka Connect does not allow for state to be stored for 
Sink Tasks.
 NiFi provides several processors that are expected to run only on a single 
node in the cluster. This is accomplished by setting the Execution Node to
 "Primary Node Only" in the scheduling tab when configuring a NiFi Processor. 
When using the Source Connector, if any source processor in the configured
 dataflow is set to run on Primary Node Only, only a single task will ever run, 
even if the "tasks" configuration element is set to a large value. In this
-case, a warning will be logged if attempting to use multiple tasks for a 
dataflow that has a source processor configured for Primary Node Only.
+case, a warning will be logged if attempting to use multiple tasks for a 
dataflow that has a source processor configured for Primary Node Only. Because 
Processors
+should only be scheduled on Primary Node Only if they are sources of data, 
this is ignored for all Sink Tasks and for any Processor in a Source Task that 
has
+incoming connections.
 
 #### Processor Yielding
 
diff --git 
a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
 
b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
index 36e2001..ebc4c5f 100644
--- 
a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
+++ 
b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
@@ -55,6 +55,7 @@ public class StatelessKafkaConnectorUtil {
     private static final Lock unpackNarLock = new ReentrantLock();
 
     static final String NAR_DIRECTORY = "nar.directory";
+    static final String EXTENSIONS_DIRECTORY = "extensions.directory";
     static final String WORKING_DIRECTORY = "working.directory";
     static final String FLOW_SNAPSHOT = "flow.snapshot";
     static final String KRB5_FILE = "krb5.file";
@@ -79,6 +80,7 @@ public class StatelessKafkaConnectorUtil {
     static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
     static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
     static final File DEFAULT_WORKING_DIRECTORY = new 
File("/tmp/nifi-stateless-working");
+    static final File DEFAULT_EXTENSIONS_DIRECTORY = new 
File("/tmp/nifi-stateless-extensions");
     static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless";
 
     private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = 
Pattern.compile("nifi-stateless-bootstrap-(.*).jar");
@@ -88,7 +90,9 @@ public class StatelessKafkaConnectorUtil {
     public static void addCommonConfigElements(final ConfigDef configDef) {
         configDef.define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new 
ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
             "Specifies the directory that stores the NiFi Archives (NARs)");
-        configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, new 
ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+        configDef.define(EXTENSIONS_DIRECTORY, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.HIGH,
+            "Specifies the directory that stores the extensions that will be 
downloaded (if any) from the configured Extension Client");
+        configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.HIGH,
             "Specifies the temporary working directory for expanding NiFi 
Archives (NARs)");
         configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new 
FlowSnapshotValidator(), ConfigDef.Importance.HIGH,
             "Specifies the dataflow to run. This may be a file containing the 
dataflow, a URL that points to a dataflow, or a String containing the entire 
dataflow as an escaped JSON.");
@@ -229,12 +233,23 @@ public class StatelessKafkaConnectorUtil {
             narDirectory = new File(narDirectoryFilename);
         }
 
-        final File workingDirectory;
+        final String dataflowName = properties.get(DATAFLOW_NAME);
+
+        final File baseWorkingDirectory;
         final String workingDirectoryFilename = 
properties.get(WORKING_DIRECTORY);
         if (workingDirectoryFilename == null) {
-            workingDirectory = DEFAULT_WORKING_DIRECTORY;
+            baseWorkingDirectory = DEFAULT_WORKING_DIRECTORY;
+        } else {
+            baseWorkingDirectory = new File(workingDirectoryFilename);
+        }
+        final File workingDirectory = new File(baseWorkingDirectory, 
dataflowName);
+
+        final File extensionsDirectory;
+        final String extensionsDirectoryFilename = 
properties.get(EXTENSIONS_DIRECTORY);
+        if (extensionsDirectoryFilename == null) {
+            extensionsDirectory = DEFAULT_EXTENSIONS_DIRECTORY;
         } else {
-            workingDirectory = new File(workingDirectoryFilename);
+            extensionsDirectory = new File(extensionsDirectoryFilename);
         }
 
         final SslContextDefinition sslContextDefinition = 
createSslContextDefinition(properties);
@@ -251,6 +266,11 @@ public class StatelessKafkaConnectorUtil {
             }
 
             @Override
+            public File getExtensionsDirectory() {
+                return extensionsDirectory;
+            }
+
+            @Override
             public File getKrb5File() {
                 return new File(properties.getOrDefault(KRB5_FILE, 
DEFAULT_KRB5_FILE));
             }
diff --git 
a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
 
b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
index e99d526..ab153b4 100644
--- 
a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
+++ 
b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
@@ -78,6 +78,7 @@ public class StatelessNiFiSinkTask extends SinkTask {
         headerNamePrefix = 
properties.getOrDefault(StatelessNiFiSinkConnector.HEADER_ATTRIBUTE_NAME_PREFIX,
 "");
 
         dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+        dataflow.initialize();
 
         // Determine input port name. If input port is explicitly set, use the 
value given. Otherwise, if only one port exists, use that. Otherwise, throw 
ConfigException.
         final String dataflowName = 
properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
diff --git 
a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
 
b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
index 8a4986f..1b7cc12 100644
--- 
a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
+++ 
b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
@@ -88,6 +88,7 @@ public class StatelessNiFiSourceTask extends SourceTask {
         headerAttributeNamePattern = headerRegex == null ? null : 
Pattern.compile(headerRegex);
 
         dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+        dataflow.initialize();
 
         // Determine the name of the Output Port to retrieve data from
         dataflowName = 
properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
index 8459c4f..6a7d86c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
@@ -39,6 +39,7 @@ public class PropertiesFileEngineConfigurationParser {
     private static final String PREFIX = "nifi.stateless.";
 
     private static final String NAR_DIRECTORY = PREFIX + "nar.directory";
+    private static final String EXTENSIONS_DIRECTORY = PREFIX + 
"extensions.directory";
     private static final String WORKING_DIRECTORY = PREFIX + 
"working.directory";
 
     private static final String TRUSTSTORE_FILE = PREFIX + 
"security.truststore";
@@ -78,6 +79,12 @@ public class PropertiesFileEngineConfigurationParser {
             throw new StatelessConfigurationException("Working Directory " + 
workingDirectory.getAbsolutePath() + " specified in properties file does not 
exist and could not be created");
         }
 
+        final String extensionsDirectoryFilename = 
properties.getProperty(EXTENSIONS_DIRECTORY);
+        final File extensionsDirectory = extensionsDirectoryFilename == null ? 
narDirectory : new File(extensionsDirectoryFilename);
+        if (!extensionsDirectory.exists() && !extensionsDirectory.mkdirs()) {
+            throw new StatelessConfigurationException("Extensions Directory " 
+ narDirectory.getAbsolutePath() + " specified in properties file does not 
exist and could not be created");
+        }
+
         final String krb5Filename = properties.getProperty(KRB5_FILE, 
DEFAULT_KRB5_FILENAME);
         final File krb5File = new File(krb5Filename);
 
@@ -98,6 +105,11 @@ public class PropertiesFileEngineConfigurationParser {
             }
 
             @Override
+            public File getExtensionsDirectory() {
+                return extensionsDirectory;
+            }
+
+            @Override
             public File getKrb5File() {
                 return krb5File;
             }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/NarUnpackLock.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/NarUnpackLock.java
new file mode 100644
index 0000000..20c706c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/NarUnpackLock.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.engine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * If multiple Stateless dataflows are loaded concurrently within the same 
JVM, we need to ensure that the dataflows
+ * do not stomp on one another when unpacking NAR's. To do that, we need a 
mechanism by which a single lock can be shared
+ * across multiple classes, as the Extension Repository as well as the 
bootstrap logic may attempt to unpack NARs.
+ * Because these classes exist across multiple modules, and because statically 
defined locks at that level may not be enough
+ * (due to multiple classloders being used for the 'stateless nar'), we define 
a singleton Lock within the nifi-stateless-api module.
+ * This lock should always be obtained before attempting to unpack nars.
+ */
+public class NarUnpackLock {
+    private static final Logger logger = 
LoggerFactory.getLogger(NarUnpackLock.class);
+
+    private static final Lock lock = new ReentrantLock();
+
+    public static void lock() {
+        lock.lock();
+        logger.debug("Lock obtained by thread {}: {}", 
Thread.currentThread().getId(), Thread.currentThread().getName());
+    }
+
+    public static void unlock() {
+        lock.unlock();
+        logger.debug("Lock obtained by thread {}: {}", 
Thread.currentThread().getId(), Thread.currentThread().getName());
+    }
+
+    public static boolean tryLock() {
+        final boolean obtained = lock.tryLock();
+
+        if (obtained) {
+            logger.debug("Lock obtained by thread {}: {}", 
Thread.currentThread().getId(), Thread.currentThread().getName());
+        }
+
+        return obtained;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
index a21e703..79e3d6a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
@@ -28,6 +28,8 @@ public interface StatelessEngineConfiguration {
 
     File getNarDirectory();
 
+    File getExtensionsDirectory();
+
     File getKrb5File();
 
     SslContextDefinition getSslContext();
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
index 89402ac..be6e62c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
@@ -24,8 +24,33 @@ import java.util.Map;
 import java.util.Set;
 
 public interface StatelessDataflow {
+    /**
+     * Triggers the dataflow to run, returning a DataflowTrigger that can be 
used to wait for the result
+     * @return a DataflowTrigger that can be used to wait for the result
+     *
+     * @throws IllegalStateException if called before {@link #initialize()} is 
called.
+     */
     DataflowTrigger trigger();
 
+    /**
+     * <p>
+     * Performs initialization necessary for triggering dataflows. These 
activities include, but are not limited to:
+     * </p>
+     *
+     * <ul>
+     *     <li>Component validation</li>
+     *     <li>Enabling Controller Services</li>
+     *     <li>Initializing processors (i.e., invoking @OnScheduled methods, 
etc.), but not triggering any Processors</li>
+     *     <li>Initializing Remote Process Groups so that they can be 
triggered</li>
+     *     <li>Scheduling Reporting Tasks to run</li>
+     * </ul>
+     *
+     * <p>
+     *     This method MUST be called prior to calling {@link #trigger()}.
+     * </p>
+     */
+    void initialize();
+
     void shutdown();
 
     StatelessDataflowValidation performValidation();
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java
index 95af052..3146cbf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java
@@ -102,6 +102,8 @@ public class RunStatelessFlow {
         final DataflowDefinition<?> dataflowDefinition = 
bootstrap.parseDataflowDefinition(flowDefinitionFile);
 
         final StatelessDataflow dataflow = 
bootstrap.createDataflow(dataflowDefinition, parameterOverrides);
+        dataflow.initialize();
+
         final StatelessDataflowValidation validation = 
dataflow.performValidation();
         if (!validation.isValid()) {
             logger.error(validation.toString());
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
index 415e1db..6a29fee 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
@@ -23,6 +23,7 @@ import org.apache.nifi.nar.NarUnpacker;
 import org.apache.nifi.nar.SystemBundle;
 import org.apache.nifi.stateless.config.ParameterOverride;
 import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.engine.NarUnpackLock;
 import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
 import org.apache.nifi.stateless.flow.DataflowDefinition;
 import org.apache.nifi.stateless.flow.DataflowDefinitionParser;
@@ -98,7 +99,12 @@ public class StatelessBootstrap {
         // Unpack NARs
         final long unpackStart = System.currentTimeMillis();
         final Predicate<BundleCoordinate> narFilter = coordinate -> true;
-        NarUnpacker.unpackNars(systemBundle, frameworkWorkingDir, 
extensionsWorkingDir, null, narDirectories, false, false, false, narFilter);
+        NarUnpackLock.lock();
+        try {
+            NarUnpacker.unpackNars(systemBundle, frameworkWorkingDir, 
extensionsWorkingDir, null, narDirectories, false, false, false, narFilter);
+        } finally {
+            NarUnpackLock.unlock();
+        }
         final long unpackMillis = System.currentTimeMillis() - unpackStart;
         logger.info("Unpacked NAR files in {} millis", unpackMillis);
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/DownloadQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/DownloadQueue.java
index c8153f0..18fccf1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/DownloadQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/DownloadQueue.java
@@ -64,7 +64,7 @@ public class DownloadQueue {
     private final List<ExtensionClient> clients;
 
     private final BlockingQueue<BundleCoordinate> toDownload = new 
LinkedBlockingQueue<>();
-    private final Set<BundleCoordinate> allDownloads = new HashSet<>();
+    private final Set<BundleCoordinate> allDownloads = 
Collections.synchronizedSet(new HashSet<>());
 
     public DownloadQueue(final ExtensionManager extensionManager, final 
ExecutorService executorService, final int concurrentDownloads, final 
Collection<BundleCoordinate> bundles,
                          final File narLibDirectory, final 
List<ExtensionClient> clients) {
@@ -74,18 +74,23 @@ public class DownloadQueue {
         this.narLibDirectory = narLibDirectory;
         this.clients = clients;
 
+        if (!narLibDirectory.exists()) {
+            final boolean created = narLibDirectory.mkdirs() || 
narLibDirectory.exists();
+            if (!created) {
+                logger.error("Extensions directory {} did not exist and could 
not be created.", narLibDirectory.getAbsolutePath());
+            }
+        }
+
         toDownload.addAll(bundles);
         allDownloads.addAll(bundles);
     }
 
     @SuppressWarnings("rawtypes")
     public CompletableFuture<Void> download() {
-        final Set<File> downloaded = Collections.synchronizedSet(new 
HashSet<>());
-
         final CompletableFuture[] futures = new 
CompletableFuture[concurrentDownloads];
         for (int i=0; i < concurrentDownloads; i++) {
             final CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-            executorService.submit(new DownloadTask(toDownload, 
completableFuture, downloaded));
+            executorService.submit(new DownloadTask(toDownload, 
completableFuture, allDownloads));
             futures[i] = completableFuture;
         }
 
@@ -110,29 +115,6 @@ public class DownloadQueue {
             && !NarClassLoaders.FRAMEWORK_NAR_ID.equals(coordinate.getId());
     }
 
-    private synchronized void queueParents(final BundleCoordinate 
parentCoordinates) {
-        if (parentCoordinates == null) {
-            return;
-        }
-
-        final Bundle existingBundle = 
extensionManager.getBundle(parentCoordinates);
-        if (existingBundle == null) {
-            if (allDownloads.contains(parentCoordinates)) {
-                // Already queued for download.
-                return;
-            }
-
-            // We don't have have the parent yet. Queue it for download.
-            logger.debug("Enqueuing parent bundle {} to be downloaded", 
parentCoordinates);
-            allDownloads.add(parentCoordinates);
-            toDownload.add(parentCoordinates);
-            return;
-        }
-
-        // Check/queue anything needed for download, recursively.
-        
queueParents(existingBundle.getBundleDetails().getDependencyCoordinate());
-    }
-
     private File getBundleFile(final BundleCoordinate coordinate) {
         final String filename = coordinate.getId() + "-" + 
coordinate.getVersion() + ".nar";
         return new File(narLibDirectory, filename);
@@ -142,12 +124,12 @@ public class DownloadQueue {
     private class DownloadTask implements Runnable {
         private final BlockingQueue<BundleCoordinate> downloadQueue;
         private final CompletableFuture<Void> completableFuture;
-        private final Set<File> filesDownloaded;
+        private final Set<BundleCoordinate> downloads;
 
-        public DownloadTask(final BlockingQueue<BundleCoordinate> 
downloadQueue, final CompletableFuture<Void> completableFuture, final Set<File> 
filesDownloaded) {
+        public DownloadTask(final BlockingQueue<BundleCoordinate> 
downloadQueue, final CompletableFuture<Void> completableFuture, final 
Set<BundleCoordinate> filesDownloaded) {
             this.downloadQueue = downloadQueue;
             this.completableFuture = completableFuture;
-            this.filesDownloaded = filesDownloaded;
+            this.downloads = filesDownloaded;
         }
 
         @Override
@@ -155,19 +137,7 @@ public class DownloadQueue {
             BundleCoordinate coordinate;
             while ((coordinate = downloadQueue.poll()) != null) {
                 try {
-                    final File downloaded = download(coordinate);
-                    if (downloaded != null) {
-                        filesDownloaded.add(downloaded);
-
-                        final BundleCoordinate parentCoordinate = 
getParentCoordinate(downloaded);
-                        queueParents(parentCoordinate);
-                    }
-
-                    final Bundle existingBundle = 
extensionManager.getBundle(coordinate);
-                    if (existingBundle != null) {
-                        final BundleCoordinate parentCoordinate = 
existingBundle.getBundleDetails().getDependencyCoordinate();
-                        queueParents(parentCoordinate);
-                    }
+                    downloadBundleAndParents(coordinate);
                 } catch (final Exception e) {
                     logger.error("Failed to download {}", coordinate, e);
                     completableFuture.completeExceptionally(e);
@@ -177,6 +147,26 @@ public class DownloadQueue {
             completableFuture.complete(null);
         }
 
+        private void downloadBundleAndParents(final BundleCoordinate 
coordinate) throws IOException {
+            if (coordinate == null) {
+                return;
+            }
+
+            downloads.add(coordinate);
+
+            final File downloaded = download(coordinate);
+            if (downloaded != null) {
+                final BundleCoordinate parentCoordinate = 
getParentCoordinate(downloaded);
+                downloadBundleAndParents(parentCoordinate);
+            }
+
+            final Bundle existingBundle = 
extensionManager.getBundle(coordinate);
+            if (existingBundle != null) {
+                final BundleCoordinate parentCoordinate = 
existingBundle.getBundleDetails().getDependencyCoordinate();
+                downloadBundleAndParents(parentCoordinate);
+            }
+        }
+
         private BundleCoordinate getParentCoordinate(final File narFile) 
throws IOException {
             try (final JarFile nar = new JarFile(narFile)) {
                 final Manifest manifest = nar.getManifest();
@@ -198,18 +188,14 @@ public class DownloadQueue {
             final List<Exception> suppressed = new ArrayList<>();
             final File destinationFile = getBundleFile(coordinate);
 
-            if (NarClassLoaders.JETTY_NAR_ID.equals(coordinate.getId())) {
-                logger.debug("Requested to download {} but only a single Jetty 
NAR is allowed to exist so will not download.", coordinate);
-                return null;
-            }
-            if (NarClassLoaders.FRAMEWORK_NAR_ID.equals(coordinate.getId())) {
-                logger.debug("Requested to download {} but only a single NiFi 
Framework NAR is allowed to exist so will not download.", coordinate);
+            if (!isDownloadable(coordinate)) {
+                logger.debug("Requested to download {} but only a single NAR 
of this type is allowed to exist so will not download.", coordinate);
                 return null;
             }
 
             if (destinationFile.exists()) {
                 logger.debug("Requested to download {} but destination file {} 
already exists. Will not download.", coordinate, destinationFile);
-                return null;
+                return destinationFile;
             }
 
             for (final ExtensionClient extensionClient : clients) {
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java
index 1f9ddbf..173d7d9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java
@@ -24,6 +24,7 @@ import org.apache.nifi.nar.ExtensionDiscoveringManager;
 import org.apache.nifi.nar.NarClassLoaders;
 import org.apache.nifi.nar.NarLoadResult;
 import org.apache.nifi.nar.NarUnpacker;
+import org.apache.nifi.stateless.engine.NarUnpackLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,13 +37,10 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 
 public class FileSystemExtensionRepository implements ExtensionRepository {
     private static final Logger logger = 
LoggerFactory.getLogger(FileSystemExtensionRepository.class);
-    private static final Lock unpackLock = new ReentrantLock();
 
     private final ExtensionDiscoveringManager extensionManager;
     private final NarClassLoaders narClassLoaders;
@@ -91,6 +89,7 @@ public class FileSystemExtensionRepository implements 
ExtensionRepository {
 
         final DownloadQueue downloadQueue = new 
DownloadQueue(extensionManager, executorService, concurrentDownloads, 
bundleCoordinates, narLibDirectory, clients);
         final CompletableFuture<Void> downloadFuture = 
downloadQueue.download();
+        logger.info("Beginning download of extensions {}", bundleCoordinates);
 
         final CompletableFuture<Set<Bundle>> loadFuture = 
downloadFuture.thenApply(new Function<Void, Set<Bundle>>() {
             @Override
@@ -116,12 +115,14 @@ public class FileSystemExtensionRepository implements 
ExtensionRepository {
         for (final File downloadedFile : downloadedFiles) {
             // Use a statically defined Lock to prevent multiple threads from 
unpacking their downloaded nars at the same time,
             // even if they use a different ExtensionRepository.
-            unpackLock.lock();
+            NarUnpackLock.lock();
             try {
-                final File unpackedDir = NarUnpacker.unpackNar(downloadedFile, 
workingDirectory, false);
+                logger.info("Unpacking {}", downloadedFile);
+                final File extensionsWorkingDirectory = new 
File(workingDirectory, "extensions");
+                final File unpackedDir = NarUnpacker.unpackNar(downloadedFile, 
extensionsWorkingDirectory, false);
                 unpackedDirs.add(unpackedDir);
             } finally {
-                unpackLock.unlock();
+                NarUnpackLock.unlock();
             }
         }
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
index 19fc7b9..74673ea 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
@@ -169,8 +169,7 @@ public class StandardStatelessEngine implements 
StatelessEngine<VersionedFlowSna
 
         final List<ReportingTaskNode> reportingTaskNodes = 
createReportingTasks(dataflowDefinition);
         final StandardStatelessFlow dataflow = new 
StandardStatelessFlow(childGroup, reportingTaskNodes, 
controllerServiceProvider, processContextFactory,
-            repositoryContextFactory, dataflowDefinition, 
stateManagerProvider);
-        dataflow.initialize(processScheduler);
+            repositoryContextFactory, dataflowDefinition, 
stateManagerProvider, processScheduler);
         return dataflow;
     }
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index 5832896..d462429 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -144,7 +144,7 @@ public class StandardStatelessDataflowFactory implements 
StatelessDataflowFactor
                 extensionClients.add(extensionClient);
             }
 
-            final ExtensionRepository extensionRepository = new 
FileSystemExtensionRepository(extensionManager, 
engineConfiguration.getNarDirectory(), 
engineConfiguration.getWorkingDirectory(),
+            final ExtensionRepository extensionRepository = new 
FileSystemExtensionRepository(extensionManager, 
engineConfiguration.getExtensionsDirectory(), 
engineConfiguration.getWorkingDirectory(),
                 narClassLoaders, extensionClients);
 
             final VariableRegistry variableRegistry = 
VariableRegistry.EMPTY_REGISTRY;
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index 672a79b..04ea9e6 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -94,14 +94,14 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
     private final DataflowDefinition<?> dataflowDefinition;
     private final StatelessStateManagerProvider stateManagerProvider;
     private final ObjectMapper objectMapper = new ObjectMapper();
+    private final ProcessScheduler processScheduler;
 
     private volatile ExecutorService runDataflowExecutor;
-    private volatile ProcessScheduler processScheduler;
     private volatile boolean initialized = false;
 
     public StandardStatelessFlow(final ProcessGroup rootGroup, final 
List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider 
controllerServiceProvider,
                                  final ProcessContextFactory 
processContextFactory, final RepositoryContextFactory repositoryContextFactory, 
final DataflowDefinition<?> dataflowDefinition,
-                                 final StatelessStateManagerProvider 
stateManagerProvider) {
+                                 final StatelessStateManagerProvider 
stateManagerProvider, final ProcessScheduler processScheduler) {
         this.rootGroup = rootGroup;
         this.allConnections = rootGroup.findAllConnections();
         this.reportingTasks = reportingTasks;
@@ -110,6 +110,7 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
         this.repositoryContextFactory = repositoryContextFactory;
         this.dataflowDefinition = dataflowDefinition;
         this.stateManagerProvider = stateManagerProvider;
+        this.processScheduler = processScheduler;
 
         rootConnectables = new HashSet<>();
 
@@ -164,19 +165,24 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
         }
     }
 
-    public void initialize(final ProcessScheduler processScheduler) {
+    @Override
+    public void initialize() {
         if (initialized) {
-            throw new IllegalStateException("Cannot initialize dataflow more 
than once");
+            logger.debug("{} initialize() was called, but dataflow has already 
been initialized. Returning without doing anything.", this);
+            return;
         }
 
         initialized = true;
-        this.processScheduler = processScheduler;
 
         // Trigger validation to occur so that components can be 
enabled/started.
         final long validationStart = System.currentTimeMillis();
-        performValidation();
+        final StatelessDataflowValidation validationResult = 
performValidation();
         final long validationMillis = System.currentTimeMillis() - 
validationStart;
 
+        if (!validationResult.isValid()) {
+            logger.warn("{} Attempting to initialize dataflow but found at 
least one invalid component: {}", this, validationResult);
+        }
+
         // Enable Controller Services and start processors in the flow.
         // This is different than the calling ProcessGroup.startProcessing() 
because
         // that method triggers the behavior to happen in the background and 
provides no way of knowing
@@ -256,7 +262,9 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
 
     @Override
     public void shutdown() {
-        runDataflowExecutor.shutdown();
+        if (runDataflowExecutor != null) {
+            runDataflowExecutor.shutdown();
+        }
 
         rootGroup.stopProcessing();
         
rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::shutdown);
@@ -274,10 +282,7 @@ public class StandardStatelessFlow implements 
StatelessDataflow {
         // invoke any methods annotated with @OnShutdown on Reporting Tasks
         reportingTasks.forEach(processScheduler::shutdownReportingTask);
 
-        if (processScheduler != null) {
-            processScheduler.shutdown();
-        }
-
+        processScheduler.shutdown();
         repositoryContextFactory.shutdown();
     }
 
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
index 19c629e..772c874 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
@@ -75,6 +75,11 @@ public class TestPropertiesFileFlowDefinitionParser {
             }
 
             @Override
+            public File getExtensionsDirectory() {
+                return null;
+            }
+
+            @Override
             public File getKrb5File() {
                 return null;
             }
diff --git 
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
 
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
index 4fe6c76..268061b 100644
--- 
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
+++ 
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
@@ -82,6 +82,11 @@ public class StatelessSystemIT {
             }
 
             @Override
+            public File getExtensionsDirectory() {
+                return new File("target/nifi-stateless-assembly/extensions");
+            }
+
+            @Override
             public File getKrb5File() {
                 return new File("/etc/krb5.conf");
             }

Reply via email to