This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new b799879 NIFI-8380: Allow for an extensions.directory property to
specify where to place downloaded files. Also fixed an issue that was
encountered, when a Source Processor is scheduled for Primary Node Only but
more than 1 task is set. In that case, even though only a single task will
should be scheduled, an Exception was getting thrown because @OnScheduled
methods of Processors were still called. To avoid this, moved the
initialization of the dataflow outside of the creation of [...]
b799879 is described below
commit b79987918a3f8e3cb69e3d21176f09704c441707
Author: Mark Payne <[email protected]>
AuthorDate: Mon Mar 29 12:23:08 2021 -0400
NIFI-8380: Allow for an extensions.directory property to specify where to
place downloaded files. Also fixed an issue that was encountered, when a Source
Processor is scheduled for Primary Node Only but more than 1 task is set. In
that case, even though only a single task will should be scheduled, an
Exception was getting thrown because @OnScheduled methods of Processors were
still called. To avoid this, moved the initialization of the dataflow outside
of the creation of the dataflow [...]
NIFI-8380: Removed requirement in validation for working directory and
extensions directory to exist; removed auto-creation of directories in
validation
NIFI-8380: Fixed a few thrading bugs, so that if we have multiple threads
trying to download/unpack extensions, we properly synchronize the unpacking and
unpack into the correct sub-directory under the working directory
Signed-off-by: Pierre Villard <[email protected]>
This closes #4950.
---
nifi-external/nifi-kafka-connect/README.md | 22 ++++--
.../kafka/connect/StatelessKafkaConnectorUtil.java | 28 ++++++-
.../nifi/kafka/connect/StatelessNiFiSinkTask.java | 1 +
.../kafka/connect/StatelessNiFiSourceTask.java | 1 +
.../PropertiesFileEngineConfigurationParser.java | 12 +++
.../nifi/stateless/engine/NarUnpackLock.java | 58 +++++++++++++++
.../engine/StatelessEngineConfiguration.java | 2 +
.../nifi/stateless/flow/StatelessDataflow.java | 25 +++++++
.../nifi/stateless/bootstrap/RunStatelessFlow.java | 2 +
.../stateless/bootstrap/StatelessBootstrap.java | 8 +-
.../org/apache/nifi/extensions/DownloadQueue.java | 86 +++++++++-------------
.../extensions/FileSystemExtensionRepository.java | 13 ++--
.../stateless/engine/StandardStatelessEngine.java | 3 +-
.../flow/StandardStatelessDataflowFactory.java | 2 +-
.../nifi/stateless/flow/StandardStatelessFlow.java | 27 ++++---
.../TestPropertiesFileFlowDefinitionParser.java | 5 ++
.../apache/nifi/stateless/StatelessSystemIT.java | 5 ++
17 files changed, 220 insertions(+), 80 deletions(-)
diff --git a/nifi-external/nifi-kafka-connect/README.md
b/nifi-external/nifi-kafka-connect/README.md
index d25e524..fbdef23 100644
--- a/nifi-external/nifi-kafka-connect/README.md
+++ b/nifi-external/nifi-kafka-connect/README.md
@@ -117,7 +117,8 @@ as it includes annotations (1), (2), etc. for illustrative
purposes):
(12) "header.attribute.regex": "syslog.*",
(13) "krb5.file": "/etc/krb5.conf",
(14) "dataflow.timeout": "30 sec",
-(15) "parameter.Syslog Port": "19944"
+(15) "parameter.Syslog Port": "19944",
+(16) "extensions.directory": "/tmp/stateless-extensions"
}
}
```
@@ -191,6 +192,9 @@ Process Groups have their own Parameter Contexts, this
value will be used for an
should be applied only to a specific Parameter Context, the name of the
Parameter Context may be supplied and separated from the Parameter Name with a
colon. For example,
`parameter.Syslog Context:Syslog Port`. In this case, the only Parameter
Context whose `Syslog Port` parameter would be set would be the Parameter
Context whose name is `Syslog Context`.
+`(16) extensions.directory` : Specifies the directory to add any downloaded
extensions to. If not specified, the extensions will be written to the same
directory that the
+connector lives in. Because this directory may not be writable, and to aid in
upgrade scenarios, it is highly recommended that this property be configured.
+
### Transactional sources
@@ -252,7 +256,8 @@ as it includes annotations (1), (2), etc. for illustrative
purposes):
(12) "headers.as.attributes.regex": "syslog.*",
(13) "krb5.file": "/etc/krb5.conf",
(14) "dataflow.timeout": "30 sec",
-(15) "parameter.Directory": "/syslog"
+(15) "parameter.Directory": "/syslog",
+(16) "extensions.directory": "/tmp/stateless-extensions"
}
}
```
@@ -323,6 +328,9 @@ Process Groups have their own Parameter Contexts, this
value will be used for an
should be applied only to a specific Parameter Context, the name of the
Parameter Context may be supplied and separated from the Parameter Name with a
colon. For example,
`parameter.HDFS:Directory`. In this case, the only Parameter Context whose
`Directory` parameter would be set would be the Parameter Context whose name is
`HDFS`.
+`(16) extensions.directory` : Specifies the directory to add any downloaded
extensions to. If not specified, the extensions will be written to the same
directory that the
+connector lives in. Because this directory may not be writable, and to aid in
upgrade scenarios, it is highly recommended that this property be configured.
+
<a name="merging"></a>
### Merging
@@ -441,8 +449,10 @@ The Connector will then examine its own set of downloaded
extensions and determi
and begin downloading them.
In order to do this, the connect configuration must specify where to download
the extensions. This is the reason for the "nexus.url" property that is
described
-in both the Source Connector and the Sink Connector. Once downloaded, the
extensions are placed in the same directory as existing NiFi Archive (NAR)
files.
-Unless explicitly specified in the connector configuration (via the
`nar.directory` configuration element), this is auto-detected to be the same
directory
+in both the Source Connector and the Sink Connector. Once downloaded, the
extensions are placed in the configured extensions directory (configured via the
+`extensions.directory` configuration element).
+If the `extensions.directory` is not explicitly specified in the connector
configuration, extensions will be added to the NAR Directory
+(configured via the `nar.directory` configuration element). If this is not
specified, it is is auto-detected to be the same directory
that the NiFi Kafka Connector was installed in.
@@ -467,7 +477,9 @@ Kafka Connect does not allow for state to be stored for
Sink Tasks.
NiFi provides several processors that are expected to run only on a single
node in the cluster. This is accomplished by setting the Execution Node to
"Primary Node Only" in the scheduling tab when configuring a NiFi Processor.
When using the Source Connector, if any source processor in the configured
dataflow is set to run on Primary Node Only, only a single task will ever run,
even if the "tasks" configuration element is set to a large value. In this
-case, a warning will be logged if attempting to use multiple tasks for a
dataflow that has a source processor configured for Primary Node Only.
+case, a warning will be logged if attempting to use multiple tasks for a
dataflow that has a source processor configured for Primary Node Only. Because
Processors
+should only be scheduled on Primary Node Only if they are sources of data,
this is ignored for all Sink Tasks and for any Processor in a Source Task that
has
+incoming connections.
#### Processor Yielding
diff --git
a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
index 36e2001..ebc4c5f 100644
---
a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
+++
b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
@@ -55,6 +55,7 @@ public class StatelessKafkaConnectorUtil {
private static final Lock unpackNarLock = new ReentrantLock();
static final String NAR_DIRECTORY = "nar.directory";
+ static final String EXTENSIONS_DIRECTORY = "extensions.directory";
static final String WORKING_DIRECTORY = "working.directory";
static final String FLOW_SNAPSHOT = "flow.snapshot";
static final String KRB5_FILE = "krb5.file";
@@ -79,6 +80,7 @@ public class StatelessKafkaConnectorUtil {
static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
static final File DEFAULT_WORKING_DIRECTORY = new
File("/tmp/nifi-stateless-working");
+ static final File DEFAULT_EXTENSIONS_DIRECTORY = new
File("/tmp/nifi-stateless-extensions");
static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless";
private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN =
Pattern.compile("nifi-stateless-bootstrap-(.*).jar");
@@ -88,7 +90,9 @@ public class StatelessKafkaConnectorUtil {
public static void addCommonConfigElements(final ConfigDef configDef) {
configDef.define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new
ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
"Specifies the directory that stores the NiFi Archives (NARs)");
- configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, new
ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+ configDef.define(EXTENSIONS_DIRECTORY, ConfigDef.Type.STRING, null,
ConfigDef.Importance.HIGH,
+ "Specifies the directory that stores the extensions that will be
downloaded (if any) from the configured Extension Client");
+ configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null,
ConfigDef.Importance.HIGH,
"Specifies the temporary working directory for expanding NiFi
Archives (NARs)");
configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new
FlowSnapshotValidator(), ConfigDef.Importance.HIGH,
"Specifies the dataflow to run. This may be a file containing the
dataflow, a URL that points to a dataflow, or a String containing the entire
dataflow as an escaped JSON.");
@@ -229,12 +233,23 @@ public class StatelessKafkaConnectorUtil {
narDirectory = new File(narDirectoryFilename);
}
- final File workingDirectory;
+ final String dataflowName = properties.get(DATAFLOW_NAME);
+
+ final File baseWorkingDirectory;
final String workingDirectoryFilename =
properties.get(WORKING_DIRECTORY);
if (workingDirectoryFilename == null) {
- workingDirectory = DEFAULT_WORKING_DIRECTORY;
+ baseWorkingDirectory = DEFAULT_WORKING_DIRECTORY;
+ } else {
+ baseWorkingDirectory = new File(workingDirectoryFilename);
+ }
+ final File workingDirectory = new File(baseWorkingDirectory,
dataflowName);
+
+ final File extensionsDirectory;
+ final String extensionsDirectoryFilename =
properties.get(EXTENSIONS_DIRECTORY);
+ if (extensionsDirectoryFilename == null) {
+ extensionsDirectory = DEFAULT_EXTENSIONS_DIRECTORY;
} else {
- workingDirectory = new File(workingDirectoryFilename);
+ extensionsDirectory = new File(extensionsDirectoryFilename);
}
final SslContextDefinition sslContextDefinition =
createSslContextDefinition(properties);
@@ -251,6 +266,11 @@ public class StatelessKafkaConnectorUtil {
}
@Override
+ public File getExtensionsDirectory() {
+ return extensionsDirectory;
+ }
+
+ @Override
public File getKrb5File() {
return new File(properties.getOrDefault(KRB5_FILE,
DEFAULT_KRB5_FILE));
}
diff --git
a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
index e99d526..ab153b4 100644
---
a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
+++
b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
@@ -78,6 +78,7 @@ public class StatelessNiFiSinkTask extends SinkTask {
headerNamePrefix =
properties.getOrDefault(StatelessNiFiSinkConnector.HEADER_ATTRIBUTE_NAME_PREFIX,
"");
dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+ dataflow.initialize();
// Determine input port name. If input port is explicitly set, use the
value given. Otherwise, if only one port exists, use that. Otherwise, throw
ConfigException.
final String dataflowName =
properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
diff --git
a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
index 8a4986f..1b7cc12 100644
---
a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
+++
b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
@@ -88,6 +88,7 @@ public class StatelessNiFiSourceTask extends SourceTask {
headerAttributeNamePattern = headerRegex == null ? null :
Pattern.compile(headerRegex);
dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+ dataflow.initialize();
// Determine the name of the Output Port to retrieve data from
dataflowName =
properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
index 8459c4f..6a7d86c 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java
@@ -39,6 +39,7 @@ public class PropertiesFileEngineConfigurationParser {
private static final String PREFIX = "nifi.stateless.";
private static final String NAR_DIRECTORY = PREFIX + "nar.directory";
+ private static final String EXTENSIONS_DIRECTORY = PREFIX +
"extensions.directory";
private static final String WORKING_DIRECTORY = PREFIX +
"working.directory";
private static final String TRUSTSTORE_FILE = PREFIX +
"security.truststore";
@@ -78,6 +79,12 @@ public class PropertiesFileEngineConfigurationParser {
throw new StatelessConfigurationException("Working Directory " +
workingDirectory.getAbsolutePath() + " specified in properties file does not
exist and could not be created");
}
+ final String extensionsDirectoryFilename =
properties.getProperty(EXTENSIONS_DIRECTORY);
+ final File extensionsDirectory = extensionsDirectoryFilename == null ?
narDirectory : new File(extensionsDirectoryFilename);
+ if (!extensionsDirectory.exists() && !extensionsDirectory.mkdirs()) {
+ throw new StatelessConfigurationException("Extensions Directory "
+ narDirectory.getAbsolutePath() + " specified in properties file does not
exist and could not be created");
+ }
+
final String krb5Filename = properties.getProperty(KRB5_FILE,
DEFAULT_KRB5_FILENAME);
final File krb5File = new File(krb5Filename);
@@ -98,6 +105,11 @@ public class PropertiesFileEngineConfigurationParser {
}
@Override
+ public File getExtensionsDirectory() {
+ return extensionsDirectory;
+ }
+
+ @Override
public File getKrb5File() {
return krb5File;
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/NarUnpackLock.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/NarUnpackLock.java
new file mode 100644
index 0000000..20c706c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/NarUnpackLock.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.engine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * If multiple Stateless dataflows are loaded concurrently within the same
JVM, we need to ensure that the dataflows
+ * do not stomp on one another when unpacking NAR's. To do that, we need a
mechanism by which a single lock can be shared
+ * across multiple classes, as the Extension Repository as well as the
bootstrap logic may attempt to unpack NARs.
+ * Because these classes exist across multiple modules, and because statically
defined locks at that level may not be enough
+ * (due to multiple classloders being used for the 'stateless nar'), we define
a singleton Lock within the nifi-stateless-api module.
+ * This lock should always be obtained before attempting to unpack nars.
+ */
+public class NarUnpackLock {
+ private static final Logger logger =
LoggerFactory.getLogger(NarUnpackLock.class);
+
+ private static final Lock lock = new ReentrantLock();
+
+ public static void lock() {
+ lock.lock();
+ logger.debug("Lock obtained by thread {}: {}",
Thread.currentThread().getId(), Thread.currentThread().getName());
+ }
+
+ public static void unlock() {
+ lock.unlock();
+ logger.debug("Lock obtained by thread {}: {}",
Thread.currentThread().getId(), Thread.currentThread().getName());
+ }
+
+ public static boolean tryLock() {
+ final boolean obtained = lock.tryLock();
+
+ if (obtained) {
+ logger.debug("Lock obtained by thread {}: {}",
Thread.currentThread().getId(), Thread.currentThread().getName());
+ }
+
+ return obtained;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
index a21e703..79e3d6a 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java
@@ -28,6 +28,8 @@ public interface StatelessEngineConfiguration {
File getNarDirectory();
+ File getExtensionsDirectory();
+
File getKrb5File();
SslContextDefinition getSslContext();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
index 89402ac..be6e62c 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java
@@ -24,8 +24,33 @@ import java.util.Map;
import java.util.Set;
public interface StatelessDataflow {
+ /**
+ * Triggers the dataflow to run, returning a DataflowTrigger that can be
used to wait for the result
+ * @return a DataflowTrigger that can be used to wait for the result
+ *
+ * @throws IllegalStateException if called before {@link #initialize()} is
called.
+ */
DataflowTrigger trigger();
+ /**
+ * <p>
+ * Performs initialization necessary for triggering dataflows. These
activities include, but are not limited to:
+ * </p>
+ *
+ * <ul>
+ * <li>Component validation</li>
+ * <li>Enabling Controller Services</li>
+ * <li>Initializing processors (i.e., invoking @OnScheduled methods,
etc.), but not triggering any Processors</li>
+ * <li>Initializing Remote Process Groups so that they can be
triggered</li>
+ * <li>Scheduling Reporting Tasks to run</li>
+ * </ul>
+ *
+ * <p>
+ * This method MUST be called prior to calling {@link #trigger()}.
+ * </p>
+ */
+ void initialize();
+
void shutdown();
StatelessDataflowValidation performValidation();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java
index 95af052..3146cbf 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java
@@ -102,6 +102,8 @@ public class RunStatelessFlow {
final DataflowDefinition<?> dataflowDefinition =
bootstrap.parseDataflowDefinition(flowDefinitionFile);
final StatelessDataflow dataflow =
bootstrap.createDataflow(dataflowDefinition, parameterOverrides);
+ dataflow.initialize();
+
final StatelessDataflowValidation validation =
dataflow.performValidation();
if (!validation.isValid()) {
logger.error(validation.toString());
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
index 415e1db..6a29fee 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/StatelessBootstrap.java
@@ -23,6 +23,7 @@ import org.apache.nifi.nar.NarUnpacker;
import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.stateless.config.ParameterOverride;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.engine.NarUnpackLock;
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.DataflowDefinitionParser;
@@ -98,7 +99,12 @@ public class StatelessBootstrap {
// Unpack NARs
final long unpackStart = System.currentTimeMillis();
final Predicate<BundleCoordinate> narFilter = coordinate -> true;
- NarUnpacker.unpackNars(systemBundle, frameworkWorkingDir,
extensionsWorkingDir, null, narDirectories, false, false, false, narFilter);
+ NarUnpackLock.lock();
+ try {
+ NarUnpacker.unpackNars(systemBundle, frameworkWorkingDir,
extensionsWorkingDir, null, narDirectories, false, false, false, narFilter);
+ } finally {
+ NarUnpackLock.unlock();
+ }
final long unpackMillis = System.currentTimeMillis() - unpackStart;
logger.info("Unpacked NAR files in {} millis", unpackMillis);
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/DownloadQueue.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/DownloadQueue.java
index c8153f0..18fccf1 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/DownloadQueue.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/DownloadQueue.java
@@ -64,7 +64,7 @@ public class DownloadQueue {
private final List<ExtensionClient> clients;
private final BlockingQueue<BundleCoordinate> toDownload = new
LinkedBlockingQueue<>();
- private final Set<BundleCoordinate> allDownloads = new HashSet<>();
+ private final Set<BundleCoordinate> allDownloads =
Collections.synchronizedSet(new HashSet<>());
public DownloadQueue(final ExtensionManager extensionManager, final
ExecutorService executorService, final int concurrentDownloads, final
Collection<BundleCoordinate> bundles,
final File narLibDirectory, final
List<ExtensionClient> clients) {
@@ -74,18 +74,23 @@ public class DownloadQueue {
this.narLibDirectory = narLibDirectory;
this.clients = clients;
+ if (!narLibDirectory.exists()) {
+ final boolean created = narLibDirectory.mkdirs() ||
narLibDirectory.exists();
+ if (!created) {
+ logger.error("Extensions directory {} did not exist and could
not be created.", narLibDirectory.getAbsolutePath());
+ }
+ }
+
toDownload.addAll(bundles);
allDownloads.addAll(bundles);
}
@SuppressWarnings("rawtypes")
public CompletableFuture<Void> download() {
- final Set<File> downloaded = Collections.synchronizedSet(new
HashSet<>());
-
final CompletableFuture[] futures = new
CompletableFuture[concurrentDownloads];
for (int i=0; i < concurrentDownloads; i++) {
final CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- executorService.submit(new DownloadTask(toDownload,
completableFuture, downloaded));
+ executorService.submit(new DownloadTask(toDownload,
completableFuture, allDownloads));
futures[i] = completableFuture;
}
@@ -110,29 +115,6 @@ public class DownloadQueue {
&& !NarClassLoaders.FRAMEWORK_NAR_ID.equals(coordinate.getId());
}
- private synchronized void queueParents(final BundleCoordinate
parentCoordinates) {
- if (parentCoordinates == null) {
- return;
- }
-
- final Bundle existingBundle =
extensionManager.getBundle(parentCoordinates);
- if (existingBundle == null) {
- if (allDownloads.contains(parentCoordinates)) {
- // Already queued for download.
- return;
- }
-
- // We don't have have the parent yet. Queue it for download.
- logger.debug("Enqueuing parent bundle {} to be downloaded",
parentCoordinates);
- allDownloads.add(parentCoordinates);
- toDownload.add(parentCoordinates);
- return;
- }
-
- // Check/queue anything needed for download, recursively.
-
queueParents(existingBundle.getBundleDetails().getDependencyCoordinate());
- }
-
private File getBundleFile(final BundleCoordinate coordinate) {
final String filename = coordinate.getId() + "-" +
coordinate.getVersion() + ".nar";
return new File(narLibDirectory, filename);
@@ -142,12 +124,12 @@ public class DownloadQueue {
private class DownloadTask implements Runnable {
private final BlockingQueue<BundleCoordinate> downloadQueue;
private final CompletableFuture<Void> completableFuture;
- private final Set<File> filesDownloaded;
+ private final Set<BundleCoordinate> downloads;
- public DownloadTask(final BlockingQueue<BundleCoordinate>
downloadQueue, final CompletableFuture<Void> completableFuture, final Set<File>
filesDownloaded) {
+ public DownloadTask(final BlockingQueue<BundleCoordinate>
downloadQueue, final CompletableFuture<Void> completableFuture, final
Set<BundleCoordinate> filesDownloaded) {
this.downloadQueue = downloadQueue;
this.completableFuture = completableFuture;
- this.filesDownloaded = filesDownloaded;
+ this.downloads = filesDownloaded;
}
@Override
@@ -155,19 +137,7 @@ public class DownloadQueue {
BundleCoordinate coordinate;
while ((coordinate = downloadQueue.poll()) != null) {
try {
- final File downloaded = download(coordinate);
- if (downloaded != null) {
- filesDownloaded.add(downloaded);
-
- final BundleCoordinate parentCoordinate =
getParentCoordinate(downloaded);
- queueParents(parentCoordinate);
- }
-
- final Bundle existingBundle =
extensionManager.getBundle(coordinate);
- if (existingBundle != null) {
- final BundleCoordinate parentCoordinate =
existingBundle.getBundleDetails().getDependencyCoordinate();
- queueParents(parentCoordinate);
- }
+ downloadBundleAndParents(coordinate);
} catch (final Exception e) {
logger.error("Failed to download {}", coordinate, e);
completableFuture.completeExceptionally(e);
@@ -177,6 +147,26 @@ public class DownloadQueue {
completableFuture.complete(null);
}
+ private void downloadBundleAndParents(final BundleCoordinate
coordinate) throws IOException {
+ if (coordinate == null) {
+ return;
+ }
+
+ downloads.add(coordinate);
+
+ final File downloaded = download(coordinate);
+ if (downloaded != null) {
+ final BundleCoordinate parentCoordinate =
getParentCoordinate(downloaded);
+ downloadBundleAndParents(parentCoordinate);
+ }
+
+ final Bundle existingBundle =
extensionManager.getBundle(coordinate);
+ if (existingBundle != null) {
+ final BundleCoordinate parentCoordinate =
existingBundle.getBundleDetails().getDependencyCoordinate();
+ downloadBundleAndParents(parentCoordinate);
+ }
+ }
+
private BundleCoordinate getParentCoordinate(final File narFile)
throws IOException {
try (final JarFile nar = new JarFile(narFile)) {
final Manifest manifest = nar.getManifest();
@@ -198,18 +188,14 @@ public class DownloadQueue {
final List<Exception> suppressed = new ArrayList<>();
final File destinationFile = getBundleFile(coordinate);
- if (NarClassLoaders.JETTY_NAR_ID.equals(coordinate.getId())) {
- logger.debug("Requested to download {} but only a single Jetty
NAR is allowed to exist so will not download.", coordinate);
- return null;
- }
- if (NarClassLoaders.FRAMEWORK_NAR_ID.equals(coordinate.getId())) {
- logger.debug("Requested to download {} but only a single NiFi
Framework NAR is allowed to exist so will not download.", coordinate);
+ if (!isDownloadable(coordinate)) {
+ logger.debug("Requested to download {} but only a single NAR
of this type is allowed to exist so will not download.", coordinate);
return null;
}
if (destinationFile.exists()) {
logger.debug("Requested to download {} but destination file {}
already exists. Will not download.", coordinate, destinationFile);
- return null;
+ return destinationFile;
}
for (final ExtensionClient extensionClient : clients) {
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java
index 1f9ddbf..173d7d9 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/extensions/FileSystemExtensionRepository.java
@@ -24,6 +24,7 @@ import org.apache.nifi.nar.ExtensionDiscoveringManager;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.nar.NarLoadResult;
import org.apache.nifi.nar.NarUnpacker;
+import org.apache.nifi.stateless.engine.NarUnpackLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,13 +37,10 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
public class FileSystemExtensionRepository implements ExtensionRepository {
private static final Logger logger =
LoggerFactory.getLogger(FileSystemExtensionRepository.class);
- private static final Lock unpackLock = new ReentrantLock();
private final ExtensionDiscoveringManager extensionManager;
private final NarClassLoaders narClassLoaders;
@@ -91,6 +89,7 @@ public class FileSystemExtensionRepository implements
ExtensionRepository {
final DownloadQueue downloadQueue = new
DownloadQueue(extensionManager, executorService, concurrentDownloads,
bundleCoordinates, narLibDirectory, clients);
final CompletableFuture<Void> downloadFuture =
downloadQueue.download();
+ logger.info("Beginning download of extensions {}", bundleCoordinates);
final CompletableFuture<Set<Bundle>> loadFuture =
downloadFuture.thenApply(new Function<Void, Set<Bundle>>() {
@Override
@@ -116,12 +115,14 @@ public class FileSystemExtensionRepository implements
ExtensionRepository {
for (final File downloadedFile : downloadedFiles) {
// Use a statically defined Lock to prevent multiple threads from
unpacking their downloaded nars at the same time,
// even if they use a different ExtensionRepository.
- unpackLock.lock();
+ NarUnpackLock.lock();
try {
- final File unpackedDir = NarUnpacker.unpackNar(downloadedFile,
workingDirectory, false);
+ logger.info("Unpacking {}", downloadedFile);
+ final File extensionsWorkingDirectory = new
File(workingDirectory, "extensions");
+ final File unpackedDir = NarUnpacker.unpackNar(downloadedFile,
extensionsWorkingDirectory, false);
unpackedDirs.add(unpackedDir);
} finally {
- unpackLock.unlock();
+ NarUnpackLock.unlock();
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
index 19fc7b9..74673ea 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java
@@ -169,8 +169,7 @@ public class StandardStatelessEngine implements
StatelessEngine<VersionedFlowSna
final List<ReportingTaskNode> reportingTaskNodes =
createReportingTasks(dataflowDefinition);
final StandardStatelessFlow dataflow = new
StandardStatelessFlow(childGroup, reportingTaskNodes,
controllerServiceProvider, processContextFactory,
- repositoryContextFactory, dataflowDefinition,
stateManagerProvider);
- dataflow.initialize(processScheduler);
+ repositoryContextFactory, dataflowDefinition,
stateManagerProvider, processScheduler);
return dataflow;
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index 5832896..d462429 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -144,7 +144,7 @@ public class StandardStatelessDataflowFactory implements
StatelessDataflowFactor
extensionClients.add(extensionClient);
}
- final ExtensionRepository extensionRepository = new
FileSystemExtensionRepository(extensionManager,
engineConfiguration.getNarDirectory(),
engineConfiguration.getWorkingDirectory(),
+ final ExtensionRepository extensionRepository = new
FileSystemExtensionRepository(extensionManager,
engineConfiguration.getExtensionsDirectory(),
engineConfiguration.getWorkingDirectory(),
narClassLoaders, extensionClients);
final VariableRegistry variableRegistry =
VariableRegistry.EMPTY_REGISTRY;
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
index 672a79b..04ea9e6 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java
@@ -94,14 +94,14 @@ public class StandardStatelessFlow implements
StatelessDataflow {
private final DataflowDefinition<?> dataflowDefinition;
private final StatelessStateManagerProvider stateManagerProvider;
private final ObjectMapper objectMapper = new ObjectMapper();
+ private final ProcessScheduler processScheduler;
private volatile ExecutorService runDataflowExecutor;
- private volatile ProcessScheduler processScheduler;
private volatile boolean initialized = false;
public StandardStatelessFlow(final ProcessGroup rootGroup, final
List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider
controllerServiceProvider,
final ProcessContextFactory
processContextFactory, final RepositoryContextFactory repositoryContextFactory,
final DataflowDefinition<?> dataflowDefinition,
- final StatelessStateManagerProvider
stateManagerProvider) {
+ final StatelessStateManagerProvider
stateManagerProvider, final ProcessScheduler processScheduler) {
this.rootGroup = rootGroup;
this.allConnections = rootGroup.findAllConnections();
this.reportingTasks = reportingTasks;
@@ -110,6 +110,7 @@ public class StandardStatelessFlow implements
StatelessDataflow {
this.repositoryContextFactory = repositoryContextFactory;
this.dataflowDefinition = dataflowDefinition;
this.stateManagerProvider = stateManagerProvider;
+ this.processScheduler = processScheduler;
rootConnectables = new HashSet<>();
@@ -164,19 +165,24 @@ public class StandardStatelessFlow implements
StatelessDataflow {
}
}
- public void initialize(final ProcessScheduler processScheduler) {
+ @Override
+ public void initialize() {
if (initialized) {
- throw new IllegalStateException("Cannot initialize dataflow more
than once");
+ logger.debug("{} initialize() was called, but dataflow has already
been initialized. Returning without doing anything.", this);
+ return;
}
initialized = true;
- this.processScheduler = processScheduler;
// Trigger validation to occur so that components can be
enabled/started.
final long validationStart = System.currentTimeMillis();
- performValidation();
+ final StatelessDataflowValidation validationResult =
performValidation();
final long validationMillis = System.currentTimeMillis() -
validationStart;
+ if (!validationResult.isValid()) {
+ logger.warn("{} Attempting to initialize dataflow but found at
least one invalid component: {}", this, validationResult);
+ }
+
// Enable Controller Services and start processors in the flow.
// This is different than the calling ProcessGroup.startProcessing()
because
// that method triggers the behavior to happen in the background and
provides no way of knowing
@@ -256,7 +262,9 @@ public class StandardStatelessFlow implements
StatelessDataflow {
@Override
public void shutdown() {
- runDataflowExecutor.shutdown();
+ if (runDataflowExecutor != null) {
+ runDataflowExecutor.shutdown();
+ }
rootGroup.stopProcessing();
rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::shutdown);
@@ -274,10 +282,7 @@ public class StandardStatelessFlow implements
StatelessDataflow {
// invoke any methods annotated with @OnShutdown on Reporting Tasks
reportingTasks.forEach(processScheduler::shutdownReportingTask);
- if (processScheduler != null) {
- processScheduler.shutdown();
- }
-
+ processScheduler.shutdown();
repositoryContextFactory.shutdown();
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
index 19c629e..772c874 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/test/java/org/apache/nifi/stateless/config/TestPropertiesFileFlowDefinitionParser.java
@@ -75,6 +75,11 @@ public class TestPropertiesFileFlowDefinitionParser {
}
@Override
+ public File getExtensionsDirectory() {
+ return null;
+ }
+
+ @Override
public File getKrb5File() {
return null;
}
diff --git
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
index 4fe6c76..268061b 100644
---
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
+++
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java
@@ -82,6 +82,11 @@ public class StatelessSystemIT {
}
@Override
+ public File getExtensionsDirectory() {
+ return new File("target/nifi-stateless-assembly/extensions");
+ }
+
+ @Override
public File getKrb5File() {
return new File("/etc/krb5.conf");
}