exceptionfactory commented on a change in pull request #4730:
URL: https://github.com/apache/nifi/pull/4730#discussion_r547261900



##########
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.ConfigDef;
+import 
org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
+import 
org.apache.nifi.kafka.connect.validators.ConnectFileExistsOrUrlValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
+import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class StatelessKafkaConnectorUtil {
+    private static final Logger logger = 
LoggerFactory.getLogger(StatelessKafkaConnectorUtil.class);
+    private static final Lock unpackNarLock = new ReentrantLock();
+
+    static final String NAR_DIRECTORY = "nar.directory";
+    static final String WORKING_DIRECTORY = "working.directory";
+    static final String FLOW_SNAPSHOT = "flow.snapshot";
+    static final String KRB5_FILE = "krb5.file";
+    static final String NEXUS_BASE_URL = "nexus.url";
+    static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
+
+    static final String TRUSTSTORE_FILE = "security.truststore";
+    static final String TRUSTSTORE_TYPE = "security.truststoreType";
+    static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
+    static final String KEYSTORE_FILE = "security.keystore";
+    static final String KEYSTORE_TYPE = "security.keystoreType";
+    static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
+    static final String KEY_PASSWORD = "security.keyPasswd";
+
+    static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
+    static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
+    static final File DEFAULT_WORKING_DIRECTORY = new 
File("/tmp/nifi-stateless-working");
+
+    private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = 
Pattern.compile("nifi-stateless-bootstrap-(.*).jar");
+    private static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = 
Pattern.compile("parameter\\.(.*?):(.*)");
+    private static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = 
Pattern.compile("parameter\\.(.*)");
+
+    public static void addCommonConfigElements(final ConfigDef configDef) {
+        configDef.define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new 
ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the directory that stores the NiFi Archives (NARs)");
+        configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, new 
ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the temporary working directory for expanding NiFi 
Archives (NARs)");
+        configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new 
ConnectFileExistsOrUrlValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the file containing the dataflow to run");
+
+        configDef.define(StatelessKafkaConnectorUtil.KRB5_FILE, 
ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_KRB5_FILE, 
ConfigDef.Importance.MEDIUM,
+            "Specifies the krb5.conf file to use if connecting to 
Kerberos-enabled services");
+        configDef.define(StatelessKafkaConnectorUtil.NEXUS_BASE_URL, 
ConfigDef.Type.STRING, null, new ConnectHttpUrlValidator(), 
ConfigDef.Importance.MEDIUM,
+            "Specifies the Base URL of the Nexus instance to source extensions 
from");
+
+        configDef.define(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT, 
ConfigDef.Importance.MEDIUM,
+            "Specifies the amount of time to wait for the dataflow to finish 
processing input before considering the dataflow a failure");
+
+        configDef.define(KEYSTORE_FILE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "Filename of the keystore that Stateless NiFi should use for 
connecting to NiFi Registry and for Site-to-Site communications.");
+        configDef.define(KEYSTORE_TYPE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "The type of the Keystore file. Either JKS or PKCS12.");
+        configDef.define(KEYSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, 
ConfigDef.Importance.MEDIUM,
+            "The password for the keystore.");
+        configDef.define(KEY_PASSWORD, ConfigDef.Type.PASSWORD, null, 
ConfigDef.Importance.MEDIUM,
+            "The password for the key in the keystore. If not provided, the 
password is assumed to be the same as the keystore password.");
+        configDef.define(TRUSTSTORE_FILE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "Filename of the truststore that Stateless NiFi should use for 
connecting to NiFi Registry and for Site-to-Site communications. If not 
specified, communications will occur only over " +
+                "http, not https.");
+        configDef.define(TRUSTSTORE_TYPE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "The type of the Truststore file. Either JKS or PKCS12.");
+        configDef.define(TRUSTSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, 
ConfigDef.Importance.MEDIUM,
+            "The password for the truststore.");
+    }
+
+    public static String getVersion() {
+        final File bootstrapJar = detectBootstrapJar();
+        if (bootstrapJar == null) {
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        try (final JarFile jarFile = new JarFile(bootstrapJar)) {
+            final Manifest manifest = jarFile.getManifest();
+            if (manifest != null) {
+                return 
manifest.getMainAttributes().getValue("Implementation-Version");
+            }
+        } catch (IOException e) {
+            logger.warn("Could not determine Version of NiFi Stateless Kafka 
Connector", e);
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        return "<Unable to Stateless NiFi Kafka Connector Version>";

Review comment:
       The method could be refactored to declare this string as the default 
return value, updated when the Manifest `Implementation-Version` is available, 
and then have a single return.

##########
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.ConfigDef;
+import 
org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
+import 
org.apache.nifi.kafka.connect.validators.ConnectFileExistsOrUrlValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
+import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class StatelessKafkaConnectorUtil {
+    private static final Logger logger = 
LoggerFactory.getLogger(StatelessKafkaConnectorUtil.class);
+    private static final Lock unpackNarLock = new ReentrantLock();
+
+    static final String NAR_DIRECTORY = "nar.directory";
+    static final String WORKING_DIRECTORY = "working.directory";
+    static final String FLOW_SNAPSHOT = "flow.snapshot";
+    static final String KRB5_FILE = "krb5.file";
+    static final String NEXUS_BASE_URL = "nexus.url";
+    static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
+
+    static final String TRUSTSTORE_FILE = "security.truststore";
+    static final String TRUSTSTORE_TYPE = "security.truststoreType";
+    static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
+    static final String KEYSTORE_FILE = "security.keystore";
+    static final String KEYSTORE_TYPE = "security.keystoreType";
+    static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
+    static final String KEY_PASSWORD = "security.keyPasswd";
+
+    static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
+    static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
+    static final File DEFAULT_WORKING_DIRECTORY = new 
File("/tmp/nifi-stateless-working");
+
+    private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = 
Pattern.compile("nifi-stateless-bootstrap-(.*).jar");
+    private static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = 
Pattern.compile("parameter\\.(.*?):(.*)");
+    private static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = 
Pattern.compile("parameter\\.(.*)");
+
+    public static void addCommonConfigElements(final ConfigDef configDef) {
+        configDef.define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new 
ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the directory that stores the NiFi Archives (NARs)");
+        configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, new 
ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the temporary working directory for expanding NiFi 
Archives (NARs)");
+        configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new 
ConnectFileExistsOrUrlValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the file containing the dataflow to run");
+
+        configDef.define(StatelessKafkaConnectorUtil.KRB5_FILE, 
ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_KRB5_FILE, 
ConfigDef.Importance.MEDIUM,
+            "Specifies the krb5.conf file to use if connecting to 
Kerberos-enabled services");
+        configDef.define(StatelessKafkaConnectorUtil.NEXUS_BASE_URL, 
ConfigDef.Type.STRING, null, new ConnectHttpUrlValidator(), 
ConfigDef.Importance.MEDIUM,
+            "Specifies the Base URL of the Nexus instance to source extensions 
from");
+
+        configDef.define(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT, 
ConfigDef.Importance.MEDIUM,
+            "Specifies the amount of time to wait for the dataflow to finish 
processing input before considering the dataflow a failure");
+
+        configDef.define(KEYSTORE_FILE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "Filename of the keystore that Stateless NiFi should use for 
connecting to NiFi Registry and for Site-to-Site communications.");
+        configDef.define(KEYSTORE_TYPE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "The type of the Keystore file. Either JKS or PKCS12.");
+        configDef.define(KEYSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, 
ConfigDef.Importance.MEDIUM,
+            "The password for the keystore.");
+        configDef.define(KEY_PASSWORD, ConfigDef.Type.PASSWORD, null, 
ConfigDef.Importance.MEDIUM,
+            "The password for the key in the keystore. If not provided, the 
password is assumed to be the same as the keystore password.");
+        configDef.define(TRUSTSTORE_FILE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "Filename of the truststore that Stateless NiFi should use for 
connecting to NiFi Registry and for Site-to-Site communications. If not 
specified, communications will occur only over " +
+                "http, not https.");
+        configDef.define(TRUSTSTORE_TYPE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "The type of the Truststore file. Either JKS or PKCS12.");
+        configDef.define(TRUSTSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, 
ConfigDef.Importance.MEDIUM,
+            "The password for the truststore.");
+    }
+
+    public static String getVersion() {
+        final File bootstrapJar = detectBootstrapJar();
+        if (bootstrapJar == null) {
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        try (final JarFile jarFile = new JarFile(bootstrapJar)) {
+            final Manifest manifest = jarFile.getManifest();
+            if (manifest != null) {
+                return 
manifest.getMainAttributes().getValue("Implementation-Version");
+            }
+        } catch (IOException e) {
+            logger.warn("Could not determine Version of NiFi Stateless Kafka 
Connector", e);
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        return "<Unable to Stateless NiFi Kafka Connector Version>";
+    }
+
+    public static StatelessDataflow createDataflow(final Map<String, String> 
properties) {
+        final StatelessEngineConfiguration engineConfiguration = 
createEngineConfiguration(properties);
+        final String configuredFlowSnapshot = properties.get(FLOW_SNAPSHOT);
+
+        final List<ParameterOverride> parameterOverrides = 
parseParameterOverrides(properties);
+        final String dataflowName = properties.get("name");
+
+        final DataflowDefinition<?> dataflowDefinition;
+        final StatelessBootstrap bootstrap;
+        try {
+            final Map<String, String> dataflowDefinitionProperties = new 
HashMap<>();
+
+            if (configuredFlowSnapshot.startsWith("http://";) || 
configuredFlowSnapshot.startsWith("https://";)) {
+                
dataflowDefinitionProperties.put("nifi.stateless.flow.snapshot.url", 
configuredFlowSnapshot);

Review comment:
       Should this property key be declared as a static variable?

##########
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSinkTask extends SinkTask {
+    private static final Logger logger = 
LoggerFactory.getLogger(StatelessNiFiSinkTask.class);
+
+    private StatelessDataflow dataflow;
+    private String inputPortName;
+    private Set<String> failurePortNames;
+    private long timeoutMillis;
+    private Pattern headerNameRegex;
+    private String headerNamePrefix;
+    private int batchSize;
+    private long batchBytes;
+    private QueueSize queueSize;
+    private String dataflowName;
+
+    private long backoffMillis = 0L;
+
+    @Override
+    public String version() {
+        return StatelessKafkaConnectorUtil.getVersion();
+    }
+
+    @Override
+    public void start(final Map<String, String> properties) {
+        logger.info("Starting Sink Task with properties {}", properties);
+
+        final String timeout = 
properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+        timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, 
TimeUnit.MILLISECONDS);
+
+        dataflowName = properties.get("name");

Review comment:
       The `name` property is used in several places and could be declared as a 
static variable.

##########
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.ConfigDef;
+import 
org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
+import 
org.apache.nifi.kafka.connect.validators.ConnectFileExistsOrUrlValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
+import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class StatelessKafkaConnectorUtil {
+    private static final Logger logger = 
LoggerFactory.getLogger(StatelessKafkaConnectorUtil.class);
+    private static final Lock unpackNarLock = new ReentrantLock();
+
+    static final String NAR_DIRECTORY = "nar.directory";
+    static final String WORKING_DIRECTORY = "working.directory";
+    static final String FLOW_SNAPSHOT = "flow.snapshot";
+    static final String KRB5_FILE = "krb5.file";
+    static final String NEXUS_BASE_URL = "nexus.url";
+    static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
+
+    static final String TRUSTSTORE_FILE = "security.truststore";
+    static final String TRUSTSTORE_TYPE = "security.truststoreType";
+    static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
+    static final String KEYSTORE_FILE = "security.keystore";
+    static final String KEYSTORE_TYPE = "security.keystoreType";
+    static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
+    static final String KEY_PASSWORD = "security.keyPasswd";
+
+    static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
+    static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
+    static final File DEFAULT_WORKING_DIRECTORY = new 
File("/tmp/nifi-stateless-working");
+
+    private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = 
Pattern.compile("nifi-stateless-bootstrap-(.*).jar");
+    private static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = 
Pattern.compile("parameter\\.(.*?):(.*)");
+    private static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = 
Pattern.compile("parameter\\.(.*)");
+
+    public static void addCommonConfigElements(final ConfigDef configDef) {
+        configDef.define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new 
ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the directory that stores the NiFi Archives (NARs)");
+        configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, new 
ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the temporary working directory for expanding NiFi 
Archives (NARs)");
+        configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new 
ConnectFileExistsOrUrlValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the file containing the dataflow to run");
+
+        configDef.define(StatelessKafkaConnectorUtil.KRB5_FILE, 
ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_KRB5_FILE, 
ConfigDef.Importance.MEDIUM,
+            "Specifies the krb5.conf file to use if connecting to 
Kerberos-enabled services");
+        configDef.define(StatelessKafkaConnectorUtil.NEXUS_BASE_URL, 
ConfigDef.Type.STRING, null, new ConnectHttpUrlValidator(), 
ConfigDef.Importance.MEDIUM,
+            "Specifies the Base URL of the Nexus instance to source extensions 
from");
+
+        configDef.define(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT, 
ConfigDef.Importance.MEDIUM,
+            "Specifies the amount of time to wait for the dataflow to finish 
processing input before considering the dataflow a failure");
+
+        configDef.define(KEYSTORE_FILE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "Filename of the keystore that Stateless NiFi should use for 
connecting to NiFi Registry and for Site-to-Site communications.");
+        configDef.define(KEYSTORE_TYPE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "The type of the Keystore file. Either JKS or PKCS12.");
+        configDef.define(KEYSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, 
ConfigDef.Importance.MEDIUM,
+            "The password for the keystore.");
+        configDef.define(KEY_PASSWORD, ConfigDef.Type.PASSWORD, null, 
ConfigDef.Importance.MEDIUM,
+            "The password for the key in the keystore. If not provided, the 
password is assumed to be the same as the keystore password.");
+        configDef.define(TRUSTSTORE_FILE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "Filename of the truststore that Stateless NiFi should use for 
connecting to NiFi Registry and for Site-to-Site communications. If not 
specified, communications will occur only over " +
+                "http, not https.");
+        configDef.define(TRUSTSTORE_TYPE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "The type of the Truststore file. Either JKS or PKCS12.");
+        configDef.define(TRUSTSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, 
ConfigDef.Importance.MEDIUM,
+            "The password for the truststore.");
+    }
+
+    public static String getVersion() {
+        final File bootstrapJar = detectBootstrapJar();
+        if (bootstrapJar == null) {
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        try (final JarFile jarFile = new JarFile(bootstrapJar)) {
+            final Manifest manifest = jarFile.getManifest();
+            if (manifest != null) {
+                return 
manifest.getMainAttributes().getValue("Implementation-Version");
+            }
+        } catch (IOException e) {
+            logger.warn("Could not determine Version of NiFi Stateless Kafka 
Connector", e);
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        return "<Unable to Stateless NiFi Kafka Connector Version>";
+    }
+
+    public static StatelessDataflow createDataflow(final Map<String, String> 
properties) {
+        final StatelessEngineConfiguration engineConfiguration = 
createEngineConfiguration(properties);
+        final String configuredFlowSnapshot = properties.get(FLOW_SNAPSHOT);
+
+        final List<ParameterOverride> parameterOverrides = 
parseParameterOverrides(properties);
+        final String dataflowName = properties.get("name");
+
+        final DataflowDefinition<?> dataflowDefinition;
+        final StatelessBootstrap bootstrap;
+        try {
+            final Map<String, String> dataflowDefinitionProperties = new 
HashMap<>();
+
+            if (configuredFlowSnapshot.startsWith("http://";) || 
configuredFlowSnapshot.startsWith("https://";)) {
+                
dataflowDefinitionProperties.put("nifi.stateless.flow.snapshot.url", 
configuredFlowSnapshot);
+            } else {
+                final File flowSnapshotFile = new File(configuredFlowSnapshot);
+                
dataflowDefinitionProperties.put("nifi.stateless.flow.snapshot.file", 
flowSnapshotFile.getAbsolutePath());
+            }
+
+            dataflowDefinitionProperties.put("nifi.stateless.flow.name", 
dataflowName);
+
+            MDC.setContextMap(Collections.singletonMap("dataflow", 
dataflowName));
+
+            // Use a Write Lock to ensure that only a single thread is calling 
StatelessBootstrap.bootstrap().
+            // We do this because the bootstrap() method will expand all NAR 
files into the working directory.
+            // If we have multiple Connector instances, or multiple tasks, we 
don't want several threads all
+            // unpacking NARs at the same time, as it could potentially result 
in the working directory becoming corrupted.
+            unpackNarLock.lock();
+            try {
+                bootstrap = StatelessBootstrap.bootstrap(engineConfiguration, 
StatelessNiFiSourceTask.class.getClassLoader());
+            } finally {
+                unpackNarLock.unlock();
+            }
+
+            dataflowDefinition = 
bootstrap.parseDataflowDefinition(dataflowDefinitionProperties);
+            return bootstrap.createDataflow(dataflowDefinition, 
parameterOverrides);
+        } catch (final Exception e) {
+            throw new RuntimeException("Failed to bootstrap Stateless NiFi 
Engine", e);
+        }
+    }
+
+    private static List<ParameterOverride> parseParameterOverrides(final 
Map<String, String> properties) {
+        final List<ParameterOverride> parameterOverrides = new ArrayList<>();
+
+        for (final Map.Entry<String, String> entry : properties.entrySet()) {
+            final String parameterValue = entry.getValue();
+
+            ParameterOverride parameterOverride = null;
+            final Matcher matcher = 
PARAMETER_WITH_CONTEXT_PATTERN.matcher(entry.getKey());
+            if (matcher.matches()) {
+                final String contextName = matcher.group(1);
+                final String parameterName = matcher.group(2);
+                parameterOverride = new ParameterOverride(contextName, 
parameterName, parameterValue);
+            } else {
+                final Matcher noContextMatcher = 
PARAMETER_WITHOUT_CONTEXT_PATTERN.matcher(entry.getKey());
+                if (noContextMatcher.matches()) {
+                    final String parameterName = noContextMatcher.group(1);
+                    parameterOverride = new ParameterOverride(parameterName, 
parameterValue);
+                }
+            }
+
+            if (parameterOverride != null) {
+                parameterOverrides.add(parameterOverride);
+            }
+        }
+
+        return parameterOverrides;
+    }
+
+    private static StatelessEngineConfiguration 
createEngineConfiguration(final Map<String, String> properties) {
+        final File narDirectory;
+        final String narDirectoryFilename = properties.get(NAR_DIRECTORY);
+        if (narDirectoryFilename == null) {
+            narDirectory = detectNarDirectory();
+        } else {
+            narDirectory = new File(narDirectoryFilename);
+        }
+
+        final File workingDirectory;
+        final String workingDirectoryFilename = 
properties.get(WORKING_DIRECTORY);
+        if (workingDirectoryFilename == null) {
+            workingDirectory = DEFAULT_WORKING_DIRECTORY;
+        } else {
+            workingDirectory = new File(workingDirectoryFilename);
+        }
+
+        final SslContextDefinition sslContextDefinition = 
createSslContextDefinition(properties);
+
+        final StatelessEngineConfiguration engineConfiguration = new 
StatelessEngineConfiguration() {
+            @Override
+            public File getWorkingDirectory() {
+                return workingDirectory;
+            }
+
+            @Override
+            public File getNarDirectory() {
+                return narDirectory;
+            }
+
+            @Override
+            public File getKrb5File() {
+                return new File(properties.getOrDefault(KRB5_FILE, 
DEFAULT_KRB5_FILE));
+            }
+
+            @Override
+            public SslContextDefinition getSslContext() {
+                return sslContextDefinition;
+            }
+
+            @Override
+            public String getSensitivePropsKey() {
+                return "nifi-stateless";
+            }
+
+            @Override
+            public List<ExtensionClientDefinition> getExtensionClients() {
+                final List<ExtensionClientDefinition> 
extensionClientDefinitions = new ArrayList<>();
+
+                final String nexusBaseUrl = properties.get(NEXUS_BASE_URL);
+                if (nexusBaseUrl != null) {
+                    final ExtensionClientDefinition definition = new 
ExtensionClientDefinition();
+                    definition.setUseSslContext(false);
+                    definition.setExtensionClientType("nexus");
+                    definition.setCommsTimeout("30 secs");
+                    definition.setBaseUrl(nexusBaseUrl);
+                    extensionClientDefinitions.add(definition);
+                }
+
+                return extensionClientDefinitions;
+            }
+        };
+
+        return engineConfiguration;
+    }
+
+    private static SslContextDefinition createSslContextDefinition(final 
Map<String, String> properties) {
+        final String truststoreFile = properties.get(TRUSTSTORE_FILE);
+        if (truststoreFile == null || truststoreFile.trim().isEmpty()) {
+            return null;
+        }
+
+        final SslContextDefinition sslContextDefinition;
+        sslContextDefinition = new SslContextDefinition();
+        sslContextDefinition.setTruststoreFile(truststoreFile);
+        
sslContextDefinition.setTruststorePass(properties.get(TRUSTSTORE_PASSWORD));
+        
sslContextDefinition.setTruststoreType(properties.get(TRUSTSTORE_TYPE));
+
+        final String keystoreFile = properties.get(KEYSTORE_FILE);
+        if (keystoreFile != null && !keystoreFile.trim().isEmpty()) {
+            sslContextDefinition.setKeystoreFile(keystoreFile);
+            
sslContextDefinition.setKeystoreType(properties.get(KEYSTORE_TYPE));
+
+            final String keystorePass = properties.get(KEYSTORE_PASSWORD);
+            sslContextDefinition.setKeystorePass(keystorePass);
+
+            final String explicitKeyPass = properties.get(KEY_PASSWORD);
+            final String keyPass = (explicitKeyPass == null || 
explicitKeyPass.trim().isEmpty()) ? keystorePass : explicitKeyPass;
+            sslContextDefinition.setKeyPass(keyPass);
+        }
+
+        return sslContextDefinition;
+    }
+
+    private static URLClassLoader getConnectClassLoader() {
+        final ClassLoader classLoader = 
StatelessKafkaConnectorUtil.class.getClassLoader();
+        if (!(classLoader instanceof URLClassLoader)) {
+            throw new IllegalStateException("No configuration value was set 
for the " + NAR_DIRECTORY + " configuration property, and was unable to 
determine the NAR directory automatically");
+        }
+
+        return (URLClassLoader) classLoader;
+    }
+
+    private static File detectBootstrapJar() {
+        final URLClassLoader urlClassLoader = getConnectClassLoader();
+        for (final URL url : urlClassLoader.getURLs()) {
+            final String artifactFilename = url.getFile();
+            if (artifactFilename == null) {
+                continue;
+            }
+
+            final File artifactFile = new File(artifactFilename);
+            if 
(STATELESS_BOOTSTRAP_FILE_PATTERN.matcher(artifactFile.getName()).matches()) {
+                return artifactFile;
+            }
+        }
+
+        return null;
+    }
+
+    private static File detectNarDirectory() {
+        final File bootstrapJar = detectBootstrapJar();
+        if (bootstrapJar == null) {
+            final URLClassLoader urlClassLoader = getConnectClassLoader();
+            logger.error("ClassLoader that loaded Stateless Kafka Connector 
did not contain nifi-stateless-bootstrap. URLs that were present: {}", 
Arrays.asList(urlClassLoader.getURLs()));
+            throw new IllegalStateException("No configuration value was set 
for the " + NAR_DIRECTORY + " configuration property, and was unable to 
determine the NAR directory automatically");

Review comment:
       This message appears to be the same as the one used in 
`getConnectClassLoader`.  The message could be declared once, or this message 
could be adjusted.

##########
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessKafkaConnectorUtil.java
##########
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.common.config.ConfigDef;
+import 
org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
+import 
org.apache.nifi.kafka.connect.validators.ConnectFileExistsOrUrlValidator;
+import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
+import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.flow.DataflowDefinition;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class StatelessKafkaConnectorUtil {
+    private static final Logger logger = 
LoggerFactory.getLogger(StatelessKafkaConnectorUtil.class);
+    private static final Lock unpackNarLock = new ReentrantLock();
+
+    static final String NAR_DIRECTORY = "nar.directory";
+    static final String WORKING_DIRECTORY = "working.directory";
+    static final String FLOW_SNAPSHOT = "flow.snapshot";
+    static final String KRB5_FILE = "krb5.file";
+    static final String NEXUS_BASE_URL = "nexus.url";
+    static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
+
+    static final String TRUSTSTORE_FILE = "security.truststore";
+    static final String TRUSTSTORE_TYPE = "security.truststoreType";
+    static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
+    static final String KEYSTORE_FILE = "security.keystore";
+    static final String KEYSTORE_TYPE = "security.keystoreType";
+    static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
+    static final String KEY_PASSWORD = "security.keyPasswd";
+
+    static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
+    static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
+    static final File DEFAULT_WORKING_DIRECTORY = new 
File("/tmp/nifi-stateless-working");
+
+    private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = 
Pattern.compile("nifi-stateless-bootstrap-(.*).jar");
+    private static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = 
Pattern.compile("parameter\\.(.*?):(.*)");
+    private static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = 
Pattern.compile("parameter\\.(.*)");
+
+    public static void addCommonConfigElements(final ConfigDef configDef) {
+        configDef.define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new 
ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the directory that stores the NiFi Archives (NARs)");
+        configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, new 
ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the temporary working directory for expanding NiFi 
Archives (NARs)");
+        configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new 
ConnectFileExistsOrUrlValidator(), ConfigDef.Importance.HIGH,
+            "Specifies the file containing the dataflow to run");
+
+        configDef.define(StatelessKafkaConnectorUtil.KRB5_FILE, 
ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_KRB5_FILE, 
ConfigDef.Importance.MEDIUM,
+            "Specifies the krb5.conf file to use if connecting to 
Kerberos-enabled services");
+        configDef.define(StatelessKafkaConnectorUtil.NEXUS_BASE_URL, 
ConfigDef.Type.STRING, null, new ConnectHttpUrlValidator(), 
ConfigDef.Importance.MEDIUM,
+            "Specifies the Base URL of the Nexus instance to source extensions 
from");
+
+        configDef.define(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT, 
ConfigDef.Importance.MEDIUM,
+            "Specifies the amount of time to wait for the dataflow to finish 
processing input before considering the dataflow a failure");
+
+        configDef.define(KEYSTORE_FILE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "Filename of the keystore that Stateless NiFi should use for 
connecting to NiFi Registry and for Site-to-Site communications.");
+        configDef.define(KEYSTORE_TYPE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "The type of the Keystore file. Either JKS or PKCS12.");
+        configDef.define(KEYSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, 
ConfigDef.Importance.MEDIUM,
+            "The password for the keystore.");
+        configDef.define(KEY_PASSWORD, ConfigDef.Type.PASSWORD, null, 
ConfigDef.Importance.MEDIUM,
+            "The password for the key in the keystore. If not provided, the 
password is assumed to be the same as the keystore password.");
+        configDef.define(TRUSTSTORE_FILE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "Filename of the truststore that Stateless NiFi should use for 
connecting to NiFi Registry and for Site-to-Site communications. If not 
specified, communications will occur only over " +
+                "http, not https.");
+        configDef.define(TRUSTSTORE_TYPE, ConfigDef.Type.STRING, null, 
ConfigDef.Importance.MEDIUM,
+            "The type of the Truststore file. Either JKS or PKCS12.");
+        configDef.define(TRUSTSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, 
ConfigDef.Importance.MEDIUM,
+            "The password for the truststore.");
+    }
+
+    public static String getVersion() {
+        final File bootstrapJar = detectBootstrapJar();
+        if (bootstrapJar == null) {
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        try (final JarFile jarFile = new JarFile(bootstrapJar)) {
+            final Manifest manifest = jarFile.getManifest();
+            if (manifest != null) {
+                return 
manifest.getMainAttributes().getValue("Implementation-Version");
+            }
+        } catch (IOException e) {
+            logger.warn("Could not determine Version of NiFi Stateless Kafka 
Connector", e);
+            return "<Unable to Stateless NiFi Kafka Connector Version>";
+        }
+
+        return "<Unable to Stateless NiFi Kafka Connector Version>";
+    }
+
+    public static StatelessDataflow createDataflow(final Map<String, String> 
properties) {
+        final StatelessEngineConfiguration engineConfiguration = 
createEngineConfiguration(properties);
+        final String configuredFlowSnapshot = properties.get(FLOW_SNAPSHOT);
+
+        final List<ParameterOverride> parameterOverrides = 
parseParameterOverrides(properties);
+        final String dataflowName = properties.get("name");
+
+        final DataflowDefinition<?> dataflowDefinition;
+        final StatelessBootstrap bootstrap;
+        try {
+            final Map<String, String> dataflowDefinitionProperties = new 
HashMap<>();
+
+            if (configuredFlowSnapshot.startsWith("http://";) || 
configuredFlowSnapshot.startsWith("https://";)) {
+                
dataflowDefinitionProperties.put("nifi.stateless.flow.snapshot.url", 
configuredFlowSnapshot);
+            } else {
+                final File flowSnapshotFile = new File(configuredFlowSnapshot);
+                
dataflowDefinitionProperties.put("nifi.stateless.flow.snapshot.file", 
flowSnapshotFile.getAbsolutePath());
+            }
+
+            dataflowDefinitionProperties.put("nifi.stateless.flow.name", 
dataflowName);
+
+            MDC.setContextMap(Collections.singletonMap("dataflow", 
dataflowName));
+
+            // Use a Write Lock to ensure that only a single thread is calling 
StatelessBootstrap.bootstrap().
+            // We do this because the bootstrap() method will expand all NAR 
files into the working directory.
+            // If we have multiple Connector instances, or multiple tasks, we 
don't want several threads all
+            // unpacking NARs at the same time, as it could potentially result 
in the working directory becoming corrupted.
+            unpackNarLock.lock();
+            try {
+                bootstrap = StatelessBootstrap.bootstrap(engineConfiguration, 
StatelessNiFiSourceTask.class.getClassLoader());
+            } finally {
+                unpackNarLock.unlock();
+            }
+
+            dataflowDefinition = 
bootstrap.parseDataflowDefinition(dataflowDefinitionProperties);
+            return bootstrap.createDataflow(dataflowDefinition, 
parameterOverrides);
+        } catch (final Exception e) {
+            throw new RuntimeException("Failed to bootstrap Stateless NiFi 
Engine", e);
+        }
+    }
+
+    private static List<ParameterOverride> parseParameterOverrides(final 
Map<String, String> properties) {
+        final List<ParameterOverride> parameterOverrides = new ArrayList<>();
+
+        for (final Map.Entry<String, String> entry : properties.entrySet()) {
+            final String parameterValue = entry.getValue();
+
+            ParameterOverride parameterOverride = null;
+            final Matcher matcher = 
PARAMETER_WITH_CONTEXT_PATTERN.matcher(entry.getKey());
+            if (matcher.matches()) {
+                final String contextName = matcher.group(1);
+                final String parameterName = matcher.group(2);
+                parameterOverride = new ParameterOverride(contextName, 
parameterName, parameterValue);
+            } else {
+                final Matcher noContextMatcher = 
PARAMETER_WITHOUT_CONTEXT_PATTERN.matcher(entry.getKey());
+                if (noContextMatcher.matches()) {
+                    final String parameterName = noContextMatcher.group(1);
+                    parameterOverride = new ParameterOverride(parameterName, 
parameterValue);
+                }
+            }
+
+            if (parameterOverride != null) {
+                parameterOverrides.add(parameterOverride);
+            }
+        }
+
+        return parameterOverrides;
+    }
+
+    private static StatelessEngineConfiguration 
createEngineConfiguration(final Map<String, String> properties) {
+        final File narDirectory;
+        final String narDirectoryFilename = properties.get(NAR_DIRECTORY);
+        if (narDirectoryFilename == null) {
+            narDirectory = detectNarDirectory();
+        } else {
+            narDirectory = new File(narDirectoryFilename);
+        }
+
+        final File workingDirectory;
+        final String workingDirectoryFilename = 
properties.get(WORKING_DIRECTORY);
+        if (workingDirectoryFilename == null) {
+            workingDirectory = DEFAULT_WORKING_DIRECTORY;
+        } else {
+            workingDirectory = new File(workingDirectoryFilename);
+        }
+
+        final SslContextDefinition sslContextDefinition = 
createSslContextDefinition(properties);
+
+        final StatelessEngineConfiguration engineConfiguration = new 
StatelessEngineConfiguration() {
+            @Override
+            public File getWorkingDirectory() {
+                return workingDirectory;
+            }
+
+            @Override
+            public File getNarDirectory() {
+                return narDirectory;
+            }
+
+            @Override
+            public File getKrb5File() {
+                return new File(properties.getOrDefault(KRB5_FILE, 
DEFAULT_KRB5_FILE));
+            }
+
+            @Override
+            public SslContextDefinition getSslContext() {
+                return sslContextDefinition;
+            }
+
+            @Override
+            public String getSensitivePropsKey() {
+                return "nifi-stateless";

Review comment:
       Should it be possible to override this value using configuration 
properties?

##########
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSourceTask extends SourceTask {
+    private static final Logger logger = 
LoggerFactory.getLogger(StatelessNiFiSourceTask.class);
+
+    private StatelessDataflow dataflow;
+    private String outputPortName;
+    private String topicName;
+    private String topicNameAttribute;
+    private TriggerResult triggerResult;
+    private String keyAttributeName;
+    private Pattern headerAttributeNamePattern;
+    private long timeoutMillis;
+    private String dataflowName;
+
+    private final AtomicLong unacknowledgedRecords = new AtomicLong(0L);
+
+    @Override
+    public String version() {
+        return StatelessKafkaConnectorUtil.getVersion();
+    }
+
+    @Override
+    public void start(final Map<String, String> properties) {
+        logger.info("Starting Source Task with properties {}", properties);

Review comment:
       See comment on logging properties for the Sink Task.

##########
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSinkTask extends SinkTask {
+    private static final Logger logger = 
LoggerFactory.getLogger(StatelessNiFiSinkTask.class);
+
+    private StatelessDataflow dataflow;
+    private String inputPortName;
+    private Set<String> failurePortNames;
+    private long timeoutMillis;
+    private Pattern headerNameRegex;
+    private String headerNamePrefix;
+    private int batchSize;
+    private long batchBytes;
+    private QueueSize queueSize;
+    private String dataflowName;
+
+    private long backoffMillis = 0L;
+
+    @Override
+    public String version() {
+        return StatelessKafkaConnectorUtil.getVersion();
+    }
+
+    @Override
+    public void start(final Map<String, String> properties) {
+        logger.info("Starting Sink Task with properties {}", properties);
+
+        final String timeout = 
properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+        timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, 
TimeUnit.MILLISECONDS);
+
+        dataflowName = properties.get("name");
+
+        final String regex = 
properties.get(StatelessNiFiSinkConnector.HEADERS_AS_ATTRIBUTES_REGEX);
+        headerNameRegex = regex == null ? null : Pattern.compile(regex);
+        headerNamePrefix = 
properties.getOrDefault(StatelessNiFiSinkConnector.HEADER_ATTRIBUTE_NAME_PREFIX,
 "");
+
+        batchSize = 
Integer.parseInt(properties.getOrDefault(StatelessNiFiSinkConnector.BATCH_SIZE_COUNT,
 "0"));
+        batchBytes = 
Long.parseLong(properties.getOrDefault(StatelessNiFiSinkConnector.BATCH_SIZE_BYTES,
 "0"));
+
+        dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+
+        // Determine input port name. If input port is explicitly set, use the 
value given. Otherwise, if only one port exists, use that. Otherwise, throw 
ConfigException.
+        final String dataflowName = properties.get("name");
+        inputPortName = 
properties.get(StatelessNiFiSinkConnector.INPUT_PORT_NAME);
+        if (inputPortName == null) {
+            final Set<String> inputPorts = dataflow.getInputPortNames();
+            if (inputPorts.isEmpty()) {
+                throw new ConfigException("The dataflow specified for <" + 
dataflowName + "> does not have an Input Port at the root level. Dataflows used 
for a Kafka Connect Sink Task "
+                    + "must have at least one Input Port at the root level.");
+            }
+
+            if (inputPorts.size() > 1) {
+                throw new ConfigException("The dataflow specified for <" + 
dataflowName + "> has multiple Input Ports at the root level (" + 
inputPorts.toString()
+                    + "). The " + StatelessNiFiSinkConnector.INPUT_PORT_NAME + 
" property must be set to indicate which of these Ports Kafka records should be 
sent to.");
+            }
+
+            inputPortName = inputPorts.iterator().next();
+        }
+
+        // Validate the input port
+        if (!dataflow.getInputPortNames().contains(inputPortName)) {
+            throw new ConfigException("The dataflow specified for <" + 
dataflowName + "> does not have Input Port with name <" + inputPortName + "> at 
the root level. Existing Input Port names are "
+                + dataflow.getInputPortNames());
+        }
+
+        // Determine the failure Ports, if any are given.
+        final String failurePortList = 
properties.get(StatelessNiFiSinkConnector.FAILURE_PORTS);
+        if (failurePortList == null || failurePortList.trim().isEmpty()) {
+            failurePortNames = Collections.emptySet();
+        } else {
+            failurePortNames = new HashSet<>();
+
+            final String[] names = failurePortList.split(",");
+            for (final String name : names) {
+                final String trimmed = name.trim();
+                failurePortNames.add(trimmed);
+            }
+        }
+
+        // Validate the failure ports
+        final Set<String> outputPortNames = dataflow.getOutputPortNames();
+        for (final String failurePortName : failurePortNames) {
+            if (!outputPortNames.contains(failurePortName)) {
+                throw new ConfigException("Dataflow was configured with a 
Failure Port of " + failurePortName
+                    + " but there is no Port with that name in the dataflow. 
Valid Port names are " + outputPortNames);
+            }
+        }
+    }
+
+    @Override
+    public void put(final Collection<SinkRecord> records) {
+        if (backoffMillis > 0) {
+            logger.debug("Due to previous failure, will wait {} millis before 
executing dataflow", backoffMillis);
+
+            try {
+                Thread.sleep(backoffMillis);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted while waiting to 
enqueue data", ie);
+            }
+        }
+
+        logger.debug("Enqueuing {} Kafka messages", records.size());
+
+        for (final SinkRecord record : records) {
+            final Map<String, String> attributes = createAttributes(record);
+            final byte[] contents = getContents(record.value());
+
+            queueSize = dataflow.enqueue(contents, attributes, inputPortName);
+        }
+
+        if (queueSize == null || queueSize.getObjectCount() < batchSize) {
+            return;
+        }
+        if (queueSize.getByteCount() < batchBytes) {
+            return;
+        }
+
+        logger.debug("Triggering dataflow");
+
+        try {
+            triggerDataflow();
+            resetBackoff();
+        } catch (final RetriableException re) {
+            backoff();
+            throw re;
+        }
+    }
+
+    private void backoff() {
+        // If no backoff period has been set, set it to 1 second. Otherwise, 
double the amount of time to backoff, up to 10 seconds.
+        if (backoffMillis == 0L) {
+            backoffMillis = 1000L;
+        }
+
+        backoffMillis = Math.min(backoffMillis * 2, 10_000L);
+    }
+
+    private void resetBackoff() {
+        backoffMillis = 0L;
+    }
+
+    private void triggerDataflow() {
+        final long start = System.nanoTime();
+        while (dataflow.isFlowFileQueued()) {
+            final DataflowTrigger trigger = dataflow.trigger();
+
+            try {
+                final Optional<TriggerResult> resultOptional = 
trigger.getResult(timeoutMillis, TimeUnit.MILLISECONDS);
+                if (resultOptional.isPresent()) {
+                    final TriggerResult result = resultOptional.get();
+
+                    if (result.isSuccessful()) {
+                        // Verify that data was only transferred to the 
expected Input Port
+                        verifyOutputPortContents(trigger, result);
+
+                        // Acknowledge the data so that the session can be 
committed
+                        result.acknowledge();
+                    } else {
+                        logger.error("Dataflow {} failed to execute properly", 
dataflowName, result.getFailureCause().orElse(null));
+                        trigger.cancel();
+                        throw new RetriableException("Dataflow failed to 
execute properly", result.getFailureCause().orElse(null));
+                    }
+                } else {
+                    trigger.cancel();
+                    throw new RetriableException("Timed out waiting for the 
dataflow to complete");
+                }
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted while waiting for 
dataflow to complete", e);
+            }
+        }
+
+        final long nanos = System.nanoTime() - start;
+        logger.debug("Ran dataflow with {} messages ({}) in {} nanos", 
queueSize.getObjectCount(), 
FormatUtils.formatDataSize(queueSize.getByteCount()), nanos);
+    }
+
+    private void verifyOutputPortContents(final DataflowTrigger trigger, final 
TriggerResult result) {
+        for (final String failurePort : failurePortNames) {
+            final List<FlowFile> flowFiles = 
result.getOutputFlowFiles(failurePort);
+            if (flowFiles != null && !flowFiles.isEmpty()) {
+                logger.error("Dataflow transferred FlowFiles to Port {}, which 
is configured as a Failure Port. Rolling back session.", failurePort);
+                trigger.cancel();
+                throw new RetriableException("Data was transferred to Failure 
Port " + failurePort);
+            }
+        }
+    }
+
+    @Override
+    public void flush(final Map<TopicPartition, OffsetAndMetadata> 
currentOffsets) {
+        super.flush(currentOffsets);
+
+        if (queueSize != null && queueSize.getObjectCount() > 0) {
+            triggerDataflow();
+        }
+    }
+
+    private byte[] getContents(final Object value) {
+        if (value == null) {
+            return new byte[0];
+        }
+        if (value instanceof String) {
+            return ((String) value).getBytes(StandardCharsets.UTF_8);
+        }
+        if (value instanceof byte[]) {
+            return (byte[]) value;
+        }
+
+        throw new IllegalArgumentException("Unsupported message type: the 
Message value was " + value + " but was expected to be a byte array or a 
String");
+    }
+
+    private Map<String, String> createAttributes(final SinkRecord record) {
+        final Map<String, String> attributes = new HashMap<>(8);

Review comment:
       Is there a particular reason for declaring an initial size for the 
HashMap?

##########
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSinkTask.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSinkTask extends SinkTask {
+    private static final Logger logger = 
LoggerFactory.getLogger(StatelessNiFiSinkTask.class);
+
+    private StatelessDataflow dataflow;
+    private String inputPortName;
+    private Set<String> failurePortNames;
+    private long timeoutMillis;
+    private Pattern headerNameRegex;
+    private String headerNamePrefix;
+    private int batchSize;
+    private long batchBytes;
+    private QueueSize queueSize;
+    private String dataflowName;
+
+    private long backoffMillis = 0L;
+
+    @Override
+    public String version() {
+        return StatelessKafkaConnectorUtil.getVersion();
+    }
+
+    @Override
+    public void start(final Map<String, String> properties) {
+        logger.info("Starting Sink Task with properties {}", properties);

Review comment:
       Logging all properties will include store passwords and other 
potentially sensitive parameters.  Perhaps logging a subset of the standard 
parameters would be a safer approach to avoid writing sensitive information to 
logs.  Otherwise, some type of filtering should be implemented prior to passing 
properties for logging.

##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/config/PropertiesFileFlowDefinitionParser.java
##########
@@ -262,40 +282,93 @@ private void warnOnWhitespace(final Map<String, String> 
properties, final String
         return properties;
     }
 
-    private VersionedFlowSnapshot fetchVersionedFlowSnapshot(final Map<String, 
String> properties, final File propertiesFile, final SSLContext sslContext)
+    private VersionedFlowSnapshot fetchVersionedFlowSnapshot(final Map<String, 
String> properties, final SslContextDefinition sslContextDefinition)
         throws IOException, StatelessConfigurationException {
 
         final String flowSnapshotFilename = 
properties.get(FLOW_SNAPSHOT_FILE_KEY);
-        if (flowSnapshotFilename == null || 
flowSnapshotFilename.trim().isEmpty()) {
-            final String registryUrl = properties.get(REGISTRY_URL_KEY);
-            final String bucketId = properties.get(BUCKET_ID_KEY);
-            final String flowId = properties.get(FLOW_ID_KEY);
-            final String flowVersionValue = properties.get(FLOW_VERSION_KEY);
-            final Integer flowVersion;
+        if (flowSnapshotFilename != null && 
!flowSnapshotFilename.trim().isEmpty()) {
+            final File flowSnapshotFile = new 
File(flowSnapshotFilename.trim());
             try {
-                flowVersion = flowVersionValue == null || 
flowVersionValue.trim().isEmpty() ? null : Integer.parseInt(flowVersionValue);
-            } catch (final NumberFormatException nfe) {
-                throw new StatelessConfigurationException("The " + 
FLOW_VERSION_KEY + " property in " + propertiesFile.getAbsolutePath()
-                    + " was expected to contain a number but had a value of " 
+ flowVersionValue);
+                return readVersionedFlowSnapshot(flowSnapshotFile);
+            } catch (final Exception e) {
+                throw new IOException("Configuration indicates that the flow 
to run is located at " + flowSnapshotFilename
+                    + " but failed to load dataflow from that location", e);
             }
+        }
 
-            if (registryUrl == null || bucketId == null || flowId == null) {
-                throw new IllegalArgumentException("Specified configuration 
file " + propertiesFile + " does not provide the filename of the flow to run or 
the registryUrl, bucketId, and flowId.");
-            }
+        final String flowSnapshotUrl = properties.get(FLOW_SNAPSHOT_URL_KEY);
+        if (flowSnapshotUrl != null && !flowSnapshotUrl.trim().isEmpty()) {
+            final String useSslPropertyValue = 
properties.get(FLOW_SNAPSHOT_URL_USE_SSLCONTEXT_KEY);
+            final boolean useSsl = Boolean.parseBoolean(useSslPropertyValue);
 
             try {
-                return fetchFlowFromRegistry(registryUrl, bucketId, flowId, 
flowVersion, sslContext);
-            } catch (final NiFiRegistryException e) {
-                throw new StatelessConfigurationException("Could not fetch 
flow from Registry", e);
+                return fetchFlowFromUrl(flowSnapshotUrl, useSsl ? 
sslContextDefinition : null);
+            } catch (final Exception e) {
+                throw new StatelessConfigurationException("Could not fetch 
flow from URL", e);
             }
         }
 
-        final File flowSnapshotFile = new File(flowSnapshotFilename);
+        // Try downloading flow from registry
+        final String registryUrl = properties.get(REGISTRY_URL_KEY);
+        final String bucketId = properties.get(BUCKET_ID_KEY);
+        final String flowId = properties.get(FLOW_ID_KEY);
+        final String flowVersionValue = properties.get(FLOW_VERSION_KEY);
+        final Integer flowVersion;
         try {
-            return readVersionedFlowSnapshot(flowSnapshotFile);
-        } catch (final Exception e) {
-            throw new IOException("Specified configuration file " + 
propertiesFile + " indicates that the flow to run is located at " + 
flowSnapshotFilename
-                + " but failed to load dataflow from that location", e);
+            flowVersion = flowVersionValue == null || 
flowVersionValue.trim().isEmpty() ? null : Integer.parseInt(flowVersionValue);
+        } catch (final NumberFormatException nfe) {
+            throw new StatelessConfigurationException("The " + 
FLOW_VERSION_KEY + " property was expected to contain a number but had a value 
of " + flowVersionValue);
+        }
+
+        if (registryUrl == null || bucketId == null || flowId == null) {
+            throw new IllegalArgumentException("Configuration does not provide 
the filename of the flow to run, a URL to fetch it from, or the registryUrl, 
bucketId, and flowId.");
+        }
+
+        try {
+            final SSLContext sslContext = 
SslConfigurationUtil.createSslContext(sslContextDefinition);
+            return fetchFlowFromRegistry(registryUrl, bucketId, flowId, 
flowVersion, sslContext);
+        } catch (final NiFiRegistryException e) {
+            throw new StatelessConfigurationException("Could not fetch flow 
from Registry", e);
+        }
+    }
+
+    private VersionedFlowSnapshot fetchFlowFromUrl(final String url, final 
SslContextDefinition sslContextDefinition) throws IOException {
+        final OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder()
+            .callTimeout(30, TimeUnit.SECONDS);
+
+        if (sslContextDefinition != null) {
+            final TlsConfiguration tlsConfiguration = 
SslConfigurationUtil.createTlsConfiguration(sslContextDefinition);
+            OkHttpClientUtils.applyTlsToOkHttpClientBuilder(tlsConfiguration, 
clientBuilder);
+        }
+
+        final OkHttpClient client = clientBuilder.build();
+
+        final Request getRequest = new Request.Builder()
+            .url(url)
+            .get()
+            .build();
+
+        final Call call = client.newCall(getRequest);
+
+        try (final Response response = call.execute()) {
+            final ResponseBody responseBody = response.body();
+
+            if (!response.isSuccessful()) {
+                final String responseText = responseBody == null ? "<No 
Message Received from Server>" : responseBody.string();
+                throw new IOException("Failed to download flow from URL " + 
url + ": Response was " + response.code() + ": " + responseText);
+            }
+
+            if (responseBody == null) {
+                throw new IOException("Failed to download flow from URL " + 
url + ": Received successful response code " + response.code() + " but no 
Response body");
+            }
+
+            try {
+                final ObjectMapper objectMapper = new ObjectMapper();

Review comment:
       `ObjectMapper` is thread-safe and could be declared as an instance 
variable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to