http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
new file mode 100644
index 0000000..4c78ef7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.atlas.NiFiAtlasHook;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
+import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
+import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
+import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.atlas.resolver.RegexClusterResolver;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.atlas.security.Basic;
+import org.apache.nifi.atlas.security.Kerberos;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static 
org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE;
+import static 
org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION;
+
+@Tags({"atlas", "lineage"})
+@CapabilityDescription("Publishes NiFi flow data set level lineage to Apache 
Atlas." +
+        " By reporting flow information to Atlas, an end-to-end Process and 
DataSet lineage such as across NiFi environments and other systems" +
+        " connected by technologies, for example NiFi Site-to-Site, Kafka 
topic or Hive tables." +
+        " There are limitations and required configurations for both NiFi and 
Atlas. See 'Additional Details' for further description.")
+@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's 
last event Id so that on restart the task knows where it left off.")
+@DynamicProperty(name = "hostnamePattern.<ClusterName>", value = "hostname 
Regex patterns", description = 
RegexClusterResolver.PATTERN_PROPERTY_PREFIX_DESC)
+// In order for each reporting task instance to have its own static objects 
such as KafkaNotification.
+@RequiresInstanceClassLoading
+public class ReportLineageToAtlas extends AbstractReportingTask {
+
+    static final PropertyDescriptor ATLAS_URLS = new 
PropertyDescriptor.Builder()
+            .name("atlas-urls")
+            .displayName("Atlas URLs")
+            .description("Comma separated URL of Atlas Servers" +
+                    " (e.g. http://atlas-server-hostname:21000 or 
https://atlas-server-hostname:21443)." +
+                    " For accessing Atlas behind Knox gateway, specify Knox 
gateway URL" +
+                    " (e.g. 
https://knox-hostname:8443/gateway/{topology-name}/atlas).")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final AllowableValue ATLAS_AUTHN_BASIC = new 
AllowableValue("basic", "Basic", "Use username and password.");
+    static final AllowableValue ATLAS_AUTHN_KERBEROS = new 
AllowableValue("kerberos", "Kerberos", "Use Kerberos keytab file.");
+    static final PropertyDescriptor ATLAS_AUTHN_METHOD = new 
PropertyDescriptor.Builder()
+            .name("atlas-authentication-method")
+            .displayName("Atlas Authentication Method")
+            .description("Specify how to authenticate this reporting task to 
Atlas server.")
+            .required(true)
+            .allowableValues(ATLAS_AUTHN_BASIC, ATLAS_AUTHN_KERBEROS)
+            .defaultValue(ATLAS_AUTHN_BASIC.getValue())
+            .build();
+
+    public static final PropertyDescriptor ATLAS_USER = new 
PropertyDescriptor.Builder()
+            .name("atlas-username")
+            .displayName("Atlas Username")
+            .description("User name to communicate with Atlas.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor ATLAS_PASSWORD = new 
PropertyDescriptor.Builder()
+            .name("atlas-password")
+            .displayName("Atlas Password")
+            .description("Password to communicate with Atlas.")
+            .required(false)
+            .sensitive(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor ATLAS_CONF_DIR = new 
PropertyDescriptor.Builder()
+            .name("atlas-conf-dir")
+            .displayName("Atlas Configuration Directory")
+            .description("Directory path that contains 
'atlas-application.properties' file." +
+                    " If not specified and 'Create Atlas Configuration File' 
is disabled," +
+                    " then, 'atlas-application.properties' file under root 
classpath is used.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor ATLAS_NIFI_URL = new 
PropertyDescriptor.Builder()
+            .name("atlas-nifi-url")
+            .displayName("NiFi URL for Atlas")
+            .description("NiFi URL is used in Atlas to represent this NiFi 
cluster (or standalone instance)." +
+                    " It is recommended to use one that can be accessible 
remotely instead of using 'localhost'.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor ATLAS_DEFAULT_CLUSTER_NAME = new 
PropertyDescriptor.Builder()
+            .name("atlas-default-cluster-name")
+            .displayName("Atlas Default Cluster Name")
+            .description("Cluster name for Atlas entities reported by this 
ReportingTask." +
+                    " If not specified, 'atlas.cluster.name' in Atlas 
Configuration File is used." +
+                    " Cluster name mappings can be configured by user defined 
properties." +
+                    " See additional detail for detail.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor ATLAS_CONF_CREATE = new 
PropertyDescriptor.Builder()
+            .name("atlas-conf-create")
+            .displayName("Create Atlas Configuration File")
+            .description("If enabled, 'atlas-application.properties' file will 
be created in 'Atlas Configuration Directory'" +
+                    " automatically when this Reporting Task starts." +
+                    " Note that the existing configuration file will be 
overwritten.")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("ssl-context-service")
+            .displayName("SSL Context Service")
+            .description("Specifies the SSL Context Service to use for 
communicating with Atlas and Kafka.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    static final PropertyDescriptor KAFKA_BOOTSTRAP_SERVERS = new 
PropertyDescriptor.Builder()
+            .name("kafka-bootstrap-servers")
+            .displayName("Kafka Bootstrap Servers")
+            .description("Kafka Bootstrap Servers to send Atlas hook 
notification messages based on NiFi provenance events." +
+                    " E.g. 'localhost:9092'" +
+                    " NOTE: Once this reporting task has started, restarting 
NiFi is required to changed this property" +
+                    " as Atlas library holds a unmodifiable static reference 
to Kafka client.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    static final AllowableValue SEC_PLAINTEXT = new 
AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
+    static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", 
"SSL");
+    static final AllowableValue SEC_SASL_PLAINTEXT = new 
AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
+    static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", 
"SASL_SSL", "SASL_SSL");
+    static final PropertyDescriptor KAFKA_SECURITY_PROTOCOL = new 
PropertyDescriptor.Builder()
+            .name("kafka-security-protocol")
+            .displayName("Kafka Security Protocol")
+            .description("Protocol used to communicate with Kafka brokers to 
send Atlas hook notification messages." +
+                    " Corresponds to Kafka's 'security.protocol' property.")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, 
SEC_SASL_SSL)
+            .defaultValue(SEC_PLAINTEXT.getValue())
+            .build();
+
+    public static final PropertyDescriptor NIFI_KERBEROS_PRINCIPAL = new 
PropertyDescriptor.Builder()
+            .name("nifi-kerberos-principal")
+            .displayName("NiFi Kerberos Principal")
+            .description("The Kerberos principal for this NiFi instance to 
access Atlas API and Kafka brokers." +
+                    " If not set, it is expected to set a JAAS configuration 
file in the JVM properties defined in the bootstrap.conf file." +
+                    " This principal will be set into 'sasl.jaas.config' 
Kafka's property.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+    public static final PropertyDescriptor NIFI_KERBEROS_KEYTAB = new 
PropertyDescriptor.Builder()
+            .name("nifi-kerberos-keytab")
+            .displayName("NiFi Kerberos Keytab")
+            .description("The Kerberos keytab for this NiFi instance to access 
Atlas API and Kafka brokers." +
+                    " If not set, it is expected to set a JAAS configuration 
file in the JVM properties defined in the bootstrap.conf file." +
+                    " This principal will be set into 'sasl.jaas.config' 
Kafka's property.")
+            .required(false)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    static final PropertyDescriptor KAFKA_KERBEROS_SERVICE_NAME = new 
PropertyDescriptor.Builder()
+            .name("kafka-kerberos-service-name-kafka")
+            .displayName("Kafka Kerberos Service Name")
+            .description("The Kerberos principal name that Kafka runs for 
Atlas notification." +
+                    " This can be defined either in Kafka's JAAS config or in 
Kafka's config." +
+                    " Corresponds to Kafka's 'security.protocol' property." +
+                    " It is ignored unless one of the SASL options of the 
<Security Protocol> are selected.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("kafka")
+            .build();
+
+    static final AllowableValue LINEAGE_STRATEGY_SIMPLE_PATH = new 
AllowableValue("SimplePath", "Simple Path",
+            "Map NiFi provenance events and target Atlas DataSets to 
statically created 'nifi_flow_path' Atlas Processes." +
+                    " See also 'Additional Details'.");
+    static final AllowableValue LINEAGE_STRATEGY_COMPLETE_PATH = new 
AllowableValue("CompletePath", "Complete Path",
+            "Create separate 'nifi_flow_path' Atlas Processes for each 
distinct input and output DataSet combinations" +
+                    " by looking at the complete route for a given FlowFile. 
See also 'Additional Details.");
+
+    static final PropertyDescriptor NIFI_LINEAGE_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("nifi-lineage-strategy")
+            .displayName("NiFi Lineage Strategy")
+            .description("Specifies granularity on how NiFi data flow should 
be reported to Atlas.")
+            .required(true)
+            .allowableValues(LINEAGE_STRATEGY_SIMPLE_PATH, 
LINEAGE_STRATEGY_COMPLETE_PATH)
+            .defaultValue(LINEAGE_STRATEGY_SIMPLE_PATH.getValue())
+            .build();
+
+    private static final String ATLAS_PROPERTIES_FILENAME = 
"atlas-application.properties";
+    private static final String ATLAS_PROPERTY_CLUSTER_NAME = 
"atlas.cluster.name";
+    private static final String ATLAS_PROPERTY_ENABLE_TLS = "atlas.enableTLS";
+    private static final String ATLAS_KAFKA_PREFIX = "atlas.kafka.";
+    private static final String ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS = 
ATLAS_KAFKA_PREFIX + "bootstrap.servers";
+    private static final String ATLAS_PROPERTY_KAFKA_CLIENT_ID = 
ATLAS_KAFKA_PREFIX + ProducerConfig.CLIENT_ID_CONFIG;
+    private final ServiceLoader<ClusterResolver> clusterResolverLoader = 
ServiceLoader.load(ClusterResolver.class);
+    private volatile NiFiAtlasClient atlasClient;
+    private volatile Properties atlasProperties;
+    private volatile boolean isTypeDefCreated = false;
+    private volatile String defaultClusterName;
+
+    private volatile ProvenanceEventConsumer consumer;
+    private volatile ClusterResolvers clusterResolvers;
+    private volatile NiFiAtlasHook nifiAtlasHook;
+    private volatile LineageStrategy lineageStrategy;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(ATLAS_URLS);
+        properties.add(ATLAS_AUTHN_METHOD);
+        properties.add(ATLAS_USER);
+        properties.add(ATLAS_PASSWORD);
+        properties.add(ATLAS_CONF_DIR);
+        properties.add(ATLAS_NIFI_URL);
+        properties.add(ATLAS_DEFAULT_CLUSTER_NAME);
+        properties.add(NIFI_LINEAGE_STRATEGY);
+        properties.add(PROVENANCE_START_POSITION);
+        properties.add(PROVENANCE_BATCH_SIZE);
+        properties.add(SSL_CONTEXT_SERVICE);
+
+        // Following properties are required if ATLAS_CONF_CREATE is enabled.
+        // Otherwise should be left blank.
+        properties.add(ATLAS_CONF_CREATE);
+        properties.add(NIFI_KERBEROS_PRINCIPAL);
+        properties.add(NIFI_KERBEROS_KEYTAB);
+        properties.add(KAFKA_KERBEROS_SERVICE_NAME);
+        properties.add(KAFKA_BOOTSTRAP_SERVERS);
+        properties.add(KAFKA_SECURITY_PROTOCOL);
+
+        return properties;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String 
propertyDescriptorName) {
+        for (ClusterResolver resolver : clusterResolverLoader) {
+            final PropertyDescriptor propertyDescriptor = 
resolver.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
+            if(propertyDescriptor != null) {
+                return propertyDescriptor;
+            }
+        }
+        return null;
+    }
+
+    private void parseAtlasUrls(final PropertyValue atlasUrlsProp, final 
Consumer<String> urlStrConsumer) {
+        final String atlasUrlsStr = 
atlasUrlsProp.evaluateAttributeExpressions().getValue();
+        if (atlasUrlsStr != null && !atlasUrlsStr.isEmpty()) {
+            Arrays.stream(atlasUrlsStr.split(","))
+                    .map(String::trim)
+                    .forEach(urlStrConsumer);
+        }
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
+        final Collection<ValidationResult> results = new ArrayList<>();
+
+        final boolean isSSLContextServiceSet = 
context.getProperty(SSL_CONTEXT_SERVICE).isSet();
+        final ValidationResult.Builder invalidSSLService = new 
ValidationResult.Builder()
+                .subject(SSL_CONTEXT_SERVICE.getDisplayName()).valid(false);
+        parseAtlasUrls(context.getProperty(ATLAS_URLS), input -> {
+            final ValidationResult.Builder builder = new 
ValidationResult.Builder().subject(ATLAS_URLS.getDisplayName()).input(input);
+            try {
+                final URL url = new URL(input);
+                if ("https".equalsIgnoreCase(url.getProtocol()) && 
!isSSLContextServiceSet) {
+                    results.add(invalidSSLService.explanation("required by 
HTTPS Atlas access").build());
+                } else {
+                    results.add(builder.explanation("Valid 
URI").valid(true).build());
+                }
+            } catch (Exception e) {
+                results.add(builder.explanation("Contains invalid URI: " + 
e).valid(false).build());
+            }
+        });
+
+        final String atlasAuthNMethod = 
context.getProperty(ATLAS_AUTHN_METHOD).getValue();
+        final AtlasAuthN atlasAuthN = getAtlasAuthN(atlasAuthNMethod);
+        results.addAll(atlasAuthN.validate(context));
+
+
+        clusterResolverLoader.forEach(resolver -> 
results.addAll(resolver.validate(context)));
+
+        if (context.getProperty(ATLAS_CONF_CREATE).asBoolean()) {
+
+            Stream.of(ATLAS_CONF_DIR, ATLAS_DEFAULT_CLUSTER_NAME, 
KAFKA_BOOTSTRAP_SERVERS)
+                    .filter(p -> !context.getProperty(p).isSet())
+                    .forEach(p -> results.add(new ValidationResult.Builder()
+                            .subject(p.getDisplayName())
+                            .explanation("required to create Atlas 
configuration file.")
+                            .valid(false).build()));
+
+            validateKafkaProperties(context, results, isSSLContextServiceSet, 
invalidSSLService);
+        }
+
+        return results;
+    }
+
+    private void validateKafkaProperties(ValidationContext context, 
Collection<ValidationResult> results, boolean isSSLContextServiceSet, 
ValidationResult.Builder invalidSSLService) {
+        final String kafkaSecurityProtocol = 
context.getProperty(KAFKA_SECURITY_PROTOCOL).getValue();
+        if ((SEC_SSL.equals(kafkaSecurityProtocol) || 
SEC_SASL_SSL.equals(kafkaSecurityProtocol))
+                && !isSSLContextServiceSet) {
+            results.add(invalidSSLService.explanation("required by SSL Kafka 
connection").build());
+        }
+
+        if (SEC_SASL_PLAINTEXT.equals(kafkaSecurityProtocol) || 
SEC_SASL_SSL.equals(kafkaSecurityProtocol)) {
+            Stream.of(NIFI_KERBEROS_PRINCIPAL, NIFI_KERBEROS_KEYTAB, 
KAFKA_KERBEROS_SERVICE_NAME)
+                    .filter(p -> !context.getProperty(p).isSet())
+                    .forEach(p -> results.add(new ValidationResult.Builder()
+                            .subject(p.getDisplayName())
+                            .explanation("required by Kafka SASL 
authentication.")
+                            .valid(false).build()));
+        }
+    }
+
+    @OnScheduled
+    public void setup(ConfigurationContext context) throws IOException {
+        // initAtlasClient has to be done first as it loads AtlasProperty.
+        initAtlasClient(context);
+        initLineageStrategy(context);
+        initClusterResolvers(context);
+    }
+
+    private void initLineageStrategy(ConfigurationContext context) throws 
IOException {
+        nifiAtlasHook = new NiFiAtlasHook(atlasClient);
+
+        final String strategy = 
context.getProperty(NIFI_LINEAGE_STRATEGY).getValue();
+        if (LINEAGE_STRATEGY_SIMPLE_PATH.equals(strategy)) {
+            lineageStrategy = new SimpleFlowPathLineage();
+        } else if (LINEAGE_STRATEGY_COMPLETE_PATH.equals(strategy)) {
+            lineageStrategy = new CompleteFlowPathLineage();
+        }
+
+        lineageStrategy.setLineageContext(nifiAtlasHook);
+        initProvenanceConsumer(context);
+    }
+
+    private void initClusterResolvers(ConfigurationContext context) {
+        final Set<ClusterResolver> loadedClusterResolvers = new 
LinkedHashSet<>();
+        clusterResolverLoader.forEach(resolver -> {
+            resolver.configure(context);
+            loadedClusterResolvers.add(resolver);
+        });
+        clusterResolvers = new 
ClusterResolvers(Collections.unmodifiableSet(loadedClusterResolvers), 
defaultClusterName);
+    }
+
+
+    private void initAtlasClient(ConfigurationContext context) throws 
IOException {
+        List<String> urls = new ArrayList<>();
+        parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add);
+        final boolean isAtlasApiSecure = urls.stream().anyMatch(url -> 
url.toLowerCase().startsWith("https"));
+        final String atlasAuthNMethod = 
context.getProperty(ATLAS_AUTHN_METHOD).getValue();
+
+        final String confDirStr = 
context.getProperty(ATLAS_CONF_DIR).evaluateAttributeExpressions().getValue();
+        final File confDir = confDirStr != null && !confDirStr.isEmpty() ? new 
File(confDirStr) : null;
+
+        atlasProperties = new Properties();
+        final File atlasPropertiesFile = new File(confDir, 
ATLAS_PROPERTIES_FILENAME);
+
+        final Boolean createAtlasConf = 
context.getProperty(ATLAS_CONF_CREATE).asBoolean();
+        if (!createAtlasConf) {
+            // Load existing properties file.
+            if (atlasPropertiesFile.isFile()) {
+                getLogger().info("Loading {}", new 
Object[]{atlasPropertiesFile});
+                try (InputStream in = new 
FileInputStream(atlasPropertiesFile)) {
+                    atlasProperties.load(in);
+                }
+            } else {
+                final String fileInClasspath = "/" + ATLAS_PROPERTIES_FILENAME;
+                try (InputStream in = 
ReportLineageToAtlas.class.getResourceAsStream(fileInClasspath)) {
+                    getLogger().info("Loading {} from classpath", new 
Object[]{fileInClasspath});
+                    if (in == null) {
+                        throw new ProcessException(String.format("Could not 
find %s in classpath." +
+                                " Please add it to classpath," +
+                                " or specify %s a directory containing Atlas 
properties file," +
+                                " or enable %s to generate it.",
+                                fileInClasspath, 
ATLAS_CONF_DIR.getDisplayName(), ATLAS_CONF_CREATE.getDisplayName()));
+                    }
+                    atlasProperties.load(in);
+                }
+            }
+        }
+
+        // Resolve default cluster name.
+        defaultClusterName = 
context.getProperty(ATLAS_DEFAULT_CLUSTER_NAME).evaluateAttributeExpressions().getValue();
+        if (defaultClusterName == null || defaultClusterName.isEmpty()) {
+            // If default cluster name is not specified by processor 
configuration, then load it from Atlas config.
+            defaultClusterName = 
atlasProperties.getProperty(ATLAS_PROPERTY_CLUSTER_NAME);
+        }
+
+        // If default cluster name is still not defined, processor should not 
be able to start.
+        if (defaultClusterName == null || defaultClusterName.isEmpty()) {
+            throw new ProcessException("Default cluster name is not defined.");
+        }
+
+        final AtlasAuthN atlasAuthN = getAtlasAuthN(atlasAuthNMethod);
+        atlasAuthN.configure(context);
+
+        // Create Atlas configuration file if necessary.
+        if (createAtlasConf) {
+
+            atlasProperties.put(ATLAS_PROPERTY_CLUSTER_NAME, 
defaultClusterName);
+            atlasProperties.put(ATLAS_PROPERTY_ENABLE_TLS, 
String.valueOf(isAtlasApiSecure));
+
+            setKafkaConfig(atlasProperties, context);
+
+            atlasAuthN.populateProperties(atlasProperties);
+
+            try (FileOutputStream fos = new 
FileOutputStream(atlasPropertiesFile)) {
+                String ts = 
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX")
+                        .withZone(ZoneOffset.UTC)
+                        .format(Instant.now());
+                atlasProperties.store(fos, "Generated by Apache NiFi 
ReportLineageToAtlas ReportingTask at " + ts);
+            }
+        }
+
+
+        atlasClient = NiFiAtlasClient.getInstance();
+        try {
+            atlasClient.initialize(urls.toArray(new String[]{}), atlasAuthN, 
confDir);
+        } catch (final NullPointerException e) {
+            throw new ProcessException(String.format("Failed to initialize 
Atlas client due to %s." +
+                    " Make sure 'atlas-application.properties' is in the 
directory specified with %s" +
+                    " or under root classpath if not specified.", e, 
ATLAS_CONF_DIR.getDisplayName()), e);
+        }
+
+    }
+
+    private AtlasAuthN getAtlasAuthN(String atlasAuthNMethod) {
+        final AtlasAuthN atlasAuthN;
+        switch (atlasAuthNMethod) {
+            case "basic" :
+                atlasAuthN = new Basic();
+                break;
+            case "kerberos" :
+                atlasAuthN = new Kerberos();
+                break;
+            default:
+                throw new IllegalArgumentException(atlasAuthNMethod + " is not 
supported as an Atlas authentication method.");
+        }
+        return atlasAuthN;
+    }
+
+    private void initProvenanceConsumer(final ConfigurationContext context) 
throws IOException {
+        consumer = new ProvenanceEventConsumer();
+        
consumer.setStartPositionValue(context.getProperty(PROVENANCE_START_POSITION).getValue());
+        
consumer.setBatchSize(context.getProperty(PROVENANCE_BATCH_SIZE).asInteger());
+        consumer.addTargetEventType(lineageStrategy.getTargetEventTypes());
+        consumer.setLogger(getLogger());
+        consumer.setScheduled(true);
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        if (consumer != null) {
+            consumer.setScheduled(false);
+        }
+        if (nifiAtlasHook != null) {
+            nifiAtlasHook.close();
+            nifiAtlasHook = null;
+        }
+    }
+
+    @Override
+    public void onTrigger(ReportingContext context) {
+
+        final String clusterNodeId = context.getClusterNodeIdentifier();
+        final boolean isClustered = context.isClustered();
+        if (isClustered && isEmpty(clusterNodeId)) {
+            // Clustered, but this node's ID is unknown. Not ready for 
processing yet.
+            return;
+        }
+
+        // If standalone or being primary node in a NiFi cluster, this node is 
responsible for doing primary tasks.
+        final boolean isResponsibleForPrimaryTasks = !isClustered || 
getNodeTypeProvider().isPrimary();
+
+        // Create Entity defs in Atlas if there's none yet.
+        if (!isTypeDefCreated) {
+            try {
+                if (isResponsibleForPrimaryTasks) {
+                    // Create NiFi type definitions in Atlas type system.
+                    atlasClient.registerNiFiTypeDefs(false);
+                } else {
+                    // Otherwise, just check existence of NiFi type 
definitions.
+                    if (!atlasClient.isNiFiTypeDefsRegistered()) {
+                        getLogger().debug("NiFi type definitions are not ready 
in Atlas type system yet.");
+                        return;
+                    }
+                }
+                isTypeDefCreated = true;
+            } catch (AtlasServiceException e) {
+                throw new RuntimeException("Failed to check and create NiFi 
flow type definitions in Atlas due to " + e, e);
+            }
+        }
+
+        // Regardless of whether being a primary task node, each node has to 
analyse NiFiFlow.
+        // Assuming each node has the same flow definition, that is guaranteed 
by NiFi cluster management mechanism.
+        final NiFiFlow nifiFlow = createNiFiFlow(context);
+
+
+        if (isResponsibleForPrimaryTasks) {
+            try {
+                atlasClient.registerNiFiFlow(nifiFlow);
+            } catch (AtlasServiceException e) {
+                throw new RuntimeException("Failed to register NiFI flow. " + 
e, e);
+            }
+        }
+
+        // NOTE: There is a race condition between the primary node and other 
nodes.
+        // If a node notifies an event related to a NiFi component which is 
not yet created by NiFi primary node,
+        // then the notification message will fail due to having a reference 
to a non-existing entity.
+        consumeNiFiProvenanceEvents(context, nifiFlow);
+
+    }
+
+    private NiFiFlow createNiFiFlow(ReportingContext context) {
+        final ProcessGroupStatus rootProcessGroup = 
context.getEventAccess().getGroupStatus("root");
+        final String flowName = rootProcessGroup.getName();
+        final String nifiUrl = 
context.getProperty(ATLAS_NIFI_URL).evaluateAttributeExpressions().getValue();
+
+
+        final String clusterName;
+        try {
+            final String nifiHostName = new URL(nifiUrl).getHost();
+            clusterName = clusterResolvers.fromHostNames(nifiHostName);
+        } catch (MalformedURLException e) {
+            throw new IllegalArgumentException("Failed to parse NiFi URL, " + 
e.getMessage(), e);
+        }
+
+        NiFiFlow existingNiFiFlow = null;
+        try {
+            // Retrieve Existing NiFiFlow from Atlas.
+            existingNiFiFlow = 
atlasClient.fetchNiFiFlow(rootProcessGroup.getId(), clusterName);
+        } catch (AtlasServiceException e) {
+            if (ClientResponse.Status.NOT_FOUND.equals(e.getStatus())){
+                getLogger().debug("Existing flow was not found for {}@{}", new 
Object[]{rootProcessGroup.getId(), clusterName});
+            } else {
+                throw new RuntimeException("Failed to fetch existing NiFI 
flow. " + e, e);
+            }
+        }
+
+        final NiFiFlow nifiFlow = existingNiFiFlow != null ? existingNiFiFlow 
: new NiFiFlow(rootProcessGroup.getId());
+        nifiFlow.setFlowName(flowName);
+        nifiFlow.setUrl(nifiUrl);
+        nifiFlow.setClusterName(clusterName);
+
+        final NiFiFlowAnalyzer flowAnalyzer = new NiFiFlowAnalyzer();
+
+        flowAnalyzer.analyzeProcessGroup(nifiFlow, rootProcessGroup);
+        flowAnalyzer.analyzePaths(nifiFlow);
+
+        return nifiFlow;
+    }
+
+    private void consumeNiFiProvenanceEvents(ReportingContext context, 
NiFiFlow nifiFlow) {
+        final EventAccess eventAccess = context.getEventAccess();
+        final AnalysisContext analysisContext = new 
StandardAnalysisContext(nifiFlow, clusterResolvers,
+                // FIXME: This class cast shouldn't be necessary to query 
lineage. Possible refactor target in next major update.
+                (ProvenanceRepository)eventAccess.getProvenanceRepository());
+        consumer.consumeEvents(eventAccess, context.getStateManager(), events 
-> {
+            for (ProvenanceEventRecord event : events) {
+                try {
+                    lineageStrategy.processEvent(analysisContext, nifiFlow, 
event);
+                } catch (Exception e) {
+                    // If something went wrong, log it and continue with other 
records.
+                    getLogger().error("Skipping failed analyzing event {} due 
to {}.", new Object[]{event, e, e});
+                }
+            }
+            nifiAtlasHook.commitMessages();
+        });
+    }
+
+    private void setKafkaConfig(Map<Object, Object> mapToPopulate, 
PropertyContext context) {
+
+        final String kafkaBootStrapServers = 
context.getProperty(KAFKA_BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+        mapToPopulate.put(ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS, 
kafkaBootStrapServers);
+        mapToPopulate.put(ATLAS_PROPERTY_KAFKA_CLIENT_ID, 
String.format("%s.%s", getName(), getIdentifier()));
+
+        final String kafkaSecurityProtocol = 
context.getProperty(KAFKA_SECURITY_PROTOCOL).getValue();
+        mapToPopulate.put(ATLAS_KAFKA_PREFIX + "security.protocol", 
kafkaSecurityProtocol);
+
+        // Translate SSLContext Service configuration into Kafka properties
+        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null && 
sslContextService.isKeyStoreConfigured()) {
+            mapToPopulate.put(ATLAS_KAFKA_PREFIX + 
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslContextService.getKeyStoreFile());
+            mapToPopulate.put(ATLAS_KAFKA_PREFIX + 
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
sslContextService.getKeyStorePassword());
+            final String keyPass = sslContextService.getKeyPassword() == null 
? sslContextService.getKeyStorePassword() : sslContextService.getKeyPassword();
+            mapToPopulate.put(ATLAS_KAFKA_PREFIX + 
SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPass);
+            mapToPopulate.put(ATLAS_KAFKA_PREFIX + 
SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, sslContextService.getKeyStoreType());
+        }
+
+        if (sslContextService != null && 
sslContextService.isTrustStoreConfigured()) {
+            mapToPopulate.put(ATLAS_KAFKA_PREFIX + 
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
sslContextService.getTrustStoreFile());
+            mapToPopulate.put(ATLAS_KAFKA_PREFIX + 
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
sslContextService.getTrustStorePassword());
+            mapToPopulate.put(ATLAS_KAFKA_PREFIX + 
SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
+        }
+
+        if (SEC_SASL_PLAINTEXT.equals(kafkaSecurityProtocol) || 
SEC_SASL_SSL.equals(kafkaSecurityProtocol)) {
+            setKafkaJaasConfig(mapToPopulate, context);
+        }
+
+    }
+
+    /**
+     * Populate Kafka JAAS properties for Atlas notification.
+     * Since Atlas 0.8.1 uses Kafka client 0.10.0.0, we can not use 
'sasl.jaas.config' property
+     * as it is available since 0.10.2, implemented by KAFKA-4259.
+     * Instead, this method uses old property names.
+     * @param mapToPopulate Map of configuration properties
+     * @param context Context
+     */
+    private void setKafkaJaasConfig(Map<Object, Object> mapToPopulate, 
PropertyContext context) {
+        String keytab = 
context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
+        String principal = 
context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        String serviceName = 
context.getProperty(KAFKA_KERBEROS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
+        if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) 
&& StringUtils.isNotBlank(serviceName)) {
+            mapToPopulate.put("atlas.jaas.KafkaClient.loginModuleControlFlag", 
"required");
+            mapToPopulate.put("atlas.jaas.KafkaClient.loginModuleName", 
"com.sun.security.auth.module.Krb5LoginModule");
+            mapToPopulate.put("atlas.jaas.KafkaClient.option.keyTab", keytab);
+            mapToPopulate.put("atlas.jaas.KafkaClient.option.principal", 
principal);
+            mapToPopulate.put("atlas.jaas.KafkaClient.option.serviceName", 
serviceName);
+            mapToPopulate.put("atlas.jaas.KafkaClient.option.storeKey", 
"True");
+            mapToPopulate.put("atlas.jaas.KafkaClient.option.useKeyTab", 
"True");
+            
mapToPopulate.put("atlas.jaas.ticketBased-KafkaClient.loginModuleControlFlag", 
"required");
+            
mapToPopulate.put("atlas.jaas.ticketBased-KafkaClient.loginModuleName", 
"com.sun.security.auth.module.Krb5LoginModule");
+            
mapToPopulate.put("atlas.jaas.ticketBased-KafkaClient.option.useTicketCache", 
"true");
+            mapToPopulate.put(ATLAS_KAFKA_PREFIX + 
"sasl.kerberos.service.name", serviceName);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/resolver/ClusterResolver.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/resolver/ClusterResolver.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/resolver/ClusterResolver.java
new file mode 100644
index 0000000..8693f0b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/resolver/ClusterResolver.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.resolver;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+public interface ClusterResolver {
+
+    default Collection<ValidationResult> validate(final ValidationContext 
validationContext) {
+        return Collections.emptySet();
+    }
+
+    PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String 
propertyDescriptorName);
+
+    /**
+     * Implementation should clear previous configurations when this method is 
called again.
+     * @param context passed from ReportingTask
+     */
+    void configure(PropertyContext context);
+
+    /**
+     * Resolve a cluster name from a list of host names or an ip addresses.
+     * @param hostNames hostname or ip address
+     * @return resolved cluster name or null
+     */
+    default String fromHostNames(String ... hostNames) {
+        return null;
+    }
+
+    /**
+     * Resolve a cluster name from hints, such as Zookeeper Quorum, client 
port and znode path
+     * @param hints Contains variables to resolve a cluster name
+     * @return resolved cluster name or null
+     */
+    default String fromHints(Map<String, String> hints) {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/resolver/ClusterResolvers.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/resolver/ClusterResolvers.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/resolver/ClusterResolvers.java
new file mode 100644
index 0000000..7a117f6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/resolver/ClusterResolvers.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.resolver;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+public class ClusterResolvers implements ClusterResolver {
+    private final Set<ClusterResolver> resolvers;
+
+    private final String defaultClusterName;
+
+    public ClusterResolvers(Set<ClusterResolver> resolvers, String 
defaultClusterName) {
+        this.resolvers = resolvers;
+        this.defaultClusterName = defaultClusterName;
+    }
+
+    @Override
+    public PropertyDescriptor getSupportedDynamicPropertyDescriptor(String 
propertyDescriptorName) {
+        for (ClusterResolver resolver : resolvers) {
+            final PropertyDescriptor descriptor = 
resolver.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
+            if (descriptor != null) {
+                return descriptor;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Collection<ValidationResult> validate(ValidationContext 
validationContext) {
+        Collection<ValidationResult> results = new ArrayList<>();
+        for (ClusterResolver resolver : resolvers) {
+            results.addAll(resolver.validate(validationContext));
+        }
+        return results;
+    }
+
+    @Override
+    public void configure(PropertyContext context) {
+        for (ClusterResolver resolver : resolvers) {
+            resolver.configure(context);
+        }
+    }
+
+    @Override
+    public String fromHostNames(String ... hostNames) {
+        for (ClusterResolver resolver : resolvers) {
+            final String clusterName = resolver.fromHostNames(hostNames);
+            if (clusterName != null && !clusterName.isEmpty()) {
+                return clusterName;
+            }
+        }
+        return defaultClusterName;
+    }
+
+    @Override
+    public String fromHints(Map<String, String> hints) {
+        for (ClusterResolver resolver : resolvers) {
+            final String clusterName = resolver.fromHints(hints);
+            if (clusterName != null && !clusterName.isEmpty()) {
+                return clusterName;
+            }
+        }
+        return defaultClusterName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/resolver/RegexClusterResolver.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/resolver/RegexClusterResolver.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/resolver/RegexClusterResolver.java
new file mode 100644
index 0000000..8e0f5e0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/resolver/RegexClusterResolver.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.resolver;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class RegexClusterResolver implements ClusterResolver {
+
+    public static final String PATTERN_PROPERTY_PREFIX = "hostnamePattern.";
+    public static final String PATTERN_PROPERTY_PREFIX_DESC = "White space 
delimited (including new line) Regular Expressions" +
+            " to resolve a 'Cluster Name' from a hostname or IP address of a 
transit URI of NiFi provenance record.";
+    private Map<String, Set<Pattern>> clusterNamePatterns;
+
+    @Override
+    public PropertyDescriptor getSupportedDynamicPropertyDescriptor(String 
propertyDescriptorName) {
+        if (propertyDescriptorName.startsWith(PATTERN_PROPERTY_PREFIX)) {
+            return new PropertyDescriptor
+                    .Builder().name(propertyDescriptorName)
+                    .description(PATTERN_PROPERTY_PREFIX_DESC)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .expressionLanguageSupported(true)
+                    .dynamic(true)
+                    .sensitive(false)
+                    .build();
+        }
+        return null;
+    }
+
+    @Override
+    public Collection<ValidationResult> validate(ValidationContext 
validationContext) {
+        final List<ValidationResult> validationResults = new ArrayList<>();
+        consumeConfigurations(validationContext.getAllProperties(),
+                (clusterNamePatterns, patterns) -> {},
+                (entry, e) -> {
+                    final ValidationResult result = new 
ValidationResult.Builder()
+                            .subject(entry.getKey())
+                            .input(entry.getValue())
+                            .explanation(e.getMessage())
+                            .valid(false)
+                            .build();
+                    validationResults.add(result);
+                });
+        return validationResults;
+    }
+
+    @Override
+    public void configure(PropertyContext context) {
+
+        clusterNamePatterns = new HashMap<>();
+        consumeConfigurations(context.getAllProperties(),
+                (clusterName, patterns) -> 
clusterNamePatterns.put(clusterName, patterns),
+                null);
+
+    }
+
+    private void consumeConfigurations(final Map<String, String> allProperties,
+                                               final BiConsumer<String, 
Set<Pattern>> consumer,
+                                               final 
BiConsumer<Map.Entry<String, String>, RuntimeException> errorHandler) {
+        allProperties.entrySet().stream()
+                .filter(entry -> 
entry.getKey().startsWith(PATTERN_PROPERTY_PREFIX))
+                .forEach(entry -> {
+                    final String clusterName;
+                    final Set<Pattern> patterns;
+                    try {
+                        clusterName = 
entry.getKey().substring(PATTERN_PROPERTY_PREFIX.length());
+                        final String[] regexsArray = 
entry.getValue().split("\\s");
+                        final List<String> regexs = Arrays.stream(regexsArray)
+                                .map(String::trim).filter(s -> 
!s.isEmpty()).collect(Collectors.toList());
+                        patterns = parseClusterNamePatterns(clusterName, 
regexs);
+                        consumer.accept(clusterName, patterns);
+                    } catch (RuntimeException e) {
+                        if (errorHandler != null) {
+                            errorHandler.accept(entry, e);
+                        } else {
+                            throw e;
+                        }
+                    }
+                });
+    }
+
+    private Set<Pattern> parseClusterNamePatterns(final String clusterName, 
List<String> regexs) {
+        if (clusterName == null || clusterName.isEmpty()) {
+            throw new IllegalArgumentException("Empty cluster name is not 
allowed.");
+        }
+
+        if (regexs.size() == 0) {
+            throw new IllegalArgumentException(
+                    String.format("At least one cluster name pattern is 
required, [%s].", clusterName));
+        }
+
+        return 
regexs.stream().map(Pattern::compile).collect(Collectors.toSet());
+    }
+
+    @Override
+    public String fromHostNames(String ... hostNames) {
+        for (Map.Entry<String, Set<Pattern>> entry : 
clusterNamePatterns.entrySet()) {
+            for (Pattern pattern : entry.getValue()) {
+                for (String hostname : hostNames) {
+                    if (pattern.matcher(hostname).matches()) {
+                        return entry.getKey();
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/AtlasAuthN.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/AtlasAuthN.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/AtlasAuthN.java
new file mode 100644
index 0000000..a0b036e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/AtlasAuthN.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.security;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+public interface AtlasAuthN {
+    AtlasClientV2 createClient(final String[] baseUrls);
+    Collection<ValidationResult> validate(final ValidationContext context);
+    void configure(final PropertyContext context);
+
+    /**
+     * Populate required Atlas application properties.
+     * This method is called when Atlas reporting task generates 
atlas-application.properties.
+     */
+    default void populateProperties(final Properties properties){};
+
+    default Optional<ValidationResult> validateRequiredField(ValidationContext 
context, PropertyDescriptor prop) {
+        if (!context.getProperty(prop).isSet()) {
+            return Optional.of(new ValidationResult.Builder()
+                    .subject(prop.getDisplayName())
+                    .valid(false)
+                    .explanation(String.format("required by '%s' auth.", 
this.getClass().getSimpleName()))
+                    .build());
+        }
+        return Optional.empty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Basic.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Basic.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Basic.java
new file mode 100644
index 0000000..b503c6a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Basic.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.security;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_PASSWORD;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_USER;
+
+public class Basic implements AtlasAuthN {
+
+    private String user;
+    private String password;
+
+    @Override
+    public Collection<ValidationResult> validate(ValidationContext context) {
+        return Stream.of(
+                validateRequiredField(context, ATLAS_USER),
+                validateRequiredField(context, ATLAS_PASSWORD)
+        
).filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
+    }
+
+    @Override
+    public void configure(PropertyContext context) {
+        user = 
context.getProperty(ATLAS_USER).evaluateAttributeExpressions().getValue();
+        password = 
context.getProperty(ATLAS_PASSWORD).evaluateAttributeExpressions().getValue();
+
+        if (StringUtils.isEmpty(user)) {
+            throw new IllegalArgumentException("User is required for basic 
auth.");
+        }
+
+        if (StringUtils.isEmpty(password)){
+            throw new IllegalArgumentException("Password is required for basic 
auth.");
+        }
+    }
+
+    @Override
+    public AtlasClientV2 createClient(String[] baseUrls) {
+        return new AtlasClientV2(baseUrls, new String[]{user, password});
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
new file mode 100644
index 0000000..88feba0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.security;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_KEYTAB;
+import static 
org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_PRINCIPAL;
+
+public class Kerberos implements AtlasAuthN {
+
+    private String principal;
+    private String keytab;
+
+    @Override
+    public Collection<ValidationResult> validate(ValidationContext context) {
+        return Stream.of(
+                validateRequiredField(context, NIFI_KERBEROS_PRINCIPAL),
+                validateRequiredField(context, NIFI_KERBEROS_KEYTAB)
+        
).filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
+    }
+
+    @Override
+    public void populateProperties(Properties properties) {
+        properties.put("atlas.authentication.method.kerberos", "true");
+    }
+
+    @Override
+    public void configure(PropertyContext context) {
+        principal = 
context.getProperty(NIFI_KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        keytab = 
context.getProperty(NIFI_KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
+
+        if (StringUtils.isEmpty(principal)) {
+            throw new IllegalArgumentException("Principal is required for 
Kerberos auth.");
+        }
+
+        if (StringUtils.isEmpty(keytab)){
+            throw new IllegalArgumentException("Keytab is required for 
Kerberos auth.");
+        }
+    }
+
+    @Override
+    public AtlasClientV2 createClient(String[] baseUrls) {
+        final Configuration hadoopConf = new Configuration();
+        hadoopConf.set("hadoop.security.authentication", "kerberos");
+        UserGroupInformation.setConfiguration(hadoopConf);
+        final UserGroupInformation ugi;
+        try {
+            ugi = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to login with Kerberos due to: 
" + e, e);
+        }
+        return new AtlasClientV2(ugi, null, baseUrls);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer
new file mode 100644
index 0000000..014ec9e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# By component type:
+org.apache.nifi.atlas.provenance.analyzer.NiFiRemotePort
+org.apache.nifi.atlas.provenance.analyzer.NiFiRootGroupPort
+org.apache.nifi.atlas.provenance.analyzer.KafkaTopic
+org.apache.nifi.atlas.provenance.analyzer.PutHiveStreaming
+
+# By transit URI:
+org.apache.nifi.atlas.provenance.analyzer.Hive2JDBC
+org.apache.nifi.atlas.provenance.analyzer.HDFSPath
+org.apache.nifi.atlas.provenance.analyzer.HBaseTable
+org.apache.nifi.atlas.provenance.analyzer.FilePath
+
+# By event type, if none of above analyzers matches
+org.apache.nifi.atlas.provenance.analyzer.unknown.Create
+org.apache.nifi.atlas.provenance.analyzer.unknown.Receive
+org.apache.nifi.atlas.provenance.analyzer.unknown.Fetch
+org.apache.nifi.atlas.provenance.analyzer.unknown.Send
+org.apache.nifi.atlas.provenance.analyzer.unknown.RemoteInvocation

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.resolver.ClusterResolver
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.resolver.ClusterResolver
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.resolver.ClusterResolver
new file mode 100644
index 0000000..c649315
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.atlas.resolver.ClusterResolver
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.atlas.resolver.RegexClusterResolver
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc73c609/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
new file mode 100644
index 0000000..482cc1b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.atlas.reporting.ReportLineageToAtlas
\ No newline at end of file

Reply via email to