[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292625#comment-16292625 ]
ASF GitHub Bot commented on NIFI-3709: -------------------------------------- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157216102 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java --- @@ -0,0 +1,714 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.reporting; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasServiceException; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.atlas.NiFIAtlasHook; +import org.apache.nifi.atlas.NiFiAtlasClient; +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.NiFiFlowAnalyzer; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.StandardAnalysisContext; +import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage; +import org.apache.nifi.atlas.provenance.lineage.LineageStrategy; +import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage; +import org.apache.nifi.atlas.resolver.ClusterResolver; +import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.atlas.resolver.RegexClusterResolver; +import org.apache.nifi.atlas.security.AtlasAuthN; +import org.apache.nifi.atlas.security.Basic; +import org.apache.nifi.atlas.security.Kerberos; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import 
org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.EventAccess; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer; +import org.apache.nifi.ssl.SSLContextService; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Stream; + +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE; +import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION; + +@Tags({"atlas", "lineage"}) +@CapabilityDescription("Publishes NiFi flow data set level lineage to Apache Atlas." + + " By reporting flow information to Atlas, an end-to-end Process and DataSet lineage such as across NiFi environments and other systems" + + " connected by technologies, for example NiFi Site-to-Site, Kafka topic or Hive tables." + + " There are limitations and required configurations for both NiFi and Atlas. 
See 'Additional Details' for further description.") +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.") +@DynamicProperty(name = "hostnamePattern.<ClusterName>", value = "hostname Regex patterns", description = RegexClusterResolver.PATTERN_PROPERTY_PREFIX_DESC) +// In order for each reporting task instance to have its own static objects such as KafkaNotification. +@RequiresInstanceClassLoading +public class AtlasNiFiFlowLineage extends AbstractReportingTask { + + static final PropertyDescriptor ATLAS_URLS = new PropertyDescriptor.Builder() + .name("atlas-urls") + .displayName("Atlas URLs") + .description("Comma separated URL of Atlas Servers" + + " (e.g. http://atlas-server-hostname:21000 or https://atlas-server-hostname:21443)." + + " For accessing Atlas behind Knox gateway, specify Knox gateway URL" + + " (e.g. https://knox-hostname:8443/gateway/{topology-name}/atlas).") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final AllowableValue ATLAS_AUTHN_BASIC = new AllowableValue("basic", "Basic", "Use username and password."); + static final AllowableValue ATLAS_AUTHN_KERBEROS = new AllowableValue("kerberos", "Kerberos", "Use Kerberos keytab file."); + static final PropertyDescriptor ATLAS_AUTHN_METHOD = new PropertyDescriptor.Builder() + .name("atlas-authentication-method") + .displayName("Atlas Authentication Method") + .description("Specify how to authenticate this reporting task to Atlas server.") + .required(true) + .allowableValues(ATLAS_AUTHN_BASIC, ATLAS_AUTHN_KERBEROS) + .defaultValue(ATLAS_AUTHN_BASIC.getValue()) + .build(); + + public static final PropertyDescriptor ATLAS_USER = new PropertyDescriptor.Builder() + .name("atlas-username") + .displayName("Atlas Username") + .description("User name to communicate with Atlas.") + .required(false) + .expressionLanguageSupported(true) 
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor ATLAS_PASSWORD = new PropertyDescriptor.Builder() + .name("atlas-password") + .displayName("Atlas Password") + .description("Password to communicate with Atlas.") + .required(false) + .sensitive(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor ATLAS_CONF_DIR = new PropertyDescriptor.Builder() + .name("atlas-conf-dir") + .displayName("Atlas Configuration Directory") + .description("Directory path that contains 'atlas-application.properties' file." + + " If not specified and 'Create Atlas Configuration File' is disabled," + + " then, 'atlas-application.properties' file under root classpath is used.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor ATLAS_NIFI_URL = new PropertyDescriptor.Builder() + .name("atlas-nifi-url") + .displayName("NiFi URL for Atlas") + .description("NiFi URL is used in Atlas to represent this NiFi cluster (or standalone instance)." + + " It is recommended to use one that can be accessible remotely instead of using 'localhost'.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + public static final PropertyDescriptor ATLAS_DEFAULT_CLUSTER_NAME = new PropertyDescriptor.Builder() + .name("atlas-default-cluster-name") + .displayName("Atlas Default Cluster Name") + .description("Cluster name for Atlas entities reported by this ReportingTask." + + " If not specified, 'atlas.cluster.name' in Atlas Configuration File is used." + + " Cluster name mappings can be configured by user defined properties." 
+ + " See additional detail for detail.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + static final PropertyDescriptor ATLAS_CONF_CREATE = new PropertyDescriptor.Builder() + .name("atlas-conf-create") + .displayName("Create Atlas Configuration File") + .description("If enabled, 'atlas-application.properties' file will be created in 'Atlas Configuration Directory'" + " automatically when this processor starts." + --- End diff -- Thank you for finding it! > Export NiFi flow dataset lineage to Apache Atlas > ------------------------------------------------ > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions > Reporter: Koji Kawamura > Assignee: Koji Kawamura > > While Apache NiFi has provenance and event level lineage support within its > data flow, Apache Atlas also manages lineage between datasets and processes > interacting with such data. > It would be beneficial for users who use both NiFi and Atlas if they can > see end-to-end data lineage on Atlas lineage graph, as some types of datasets > are processed by both NiFi and technologies around Atlas such as Storm, > Falcon or Sqoop. For example, Kafka topics and Hive tables. > In order to make this integration happen, I propose a NiFi reporting task > that analyzes NiFi flow then creates DataSet and Process entities in Atlas. > The challenge is how to design NiFi flow dataset level lineage within Atlas > lineage graph. > If we just add a single NiFi process and connect every DataSet from/to it, it > would be too ambiguous since it won't be clear which part of a NiFi flow > actually interacts with a certain dataset. > But if we put every NiFi processor as independent process in Atlas, it would > be too granular, too. Also, we already have detailed event level lineage in > NiFi, we wouldn't need the same level in Atlas. 
> If we can group certain processors in a NiFi flow as a process in Atlas, it > would be a nice granularity. -- This message was sent by Atlassian JIRA (v6.4.14#64029)