Repository: nifi
Updated Branches:
  refs/heads/master e5ed62a98 -> 4b8c80ccc


NIFI-4741: Avoid DelegationToken expiration at ReportLineageToAtlas. This 
closes #2377

The reporting task used to hold a single AtlasClientV2 instance
throughout its runtime starting from being started until being stopped.
If it is configured to use Kerberos authentication for Atlas REST API, after
a published DelegationToken expires (10 hours by default), the reporting
task will not be able to recover from 401 Unauthorized state.

In order to avoid stucking in such situation, this commit changes the
way ReportLineageToAtlas uses AtlasClientV2 instance to create an
instance per onTrigger execution. It also addresses Kerberos ticket
expiration.

This approach incurs some overheads by initiating the client each time,
however, it should be insignificant from an overall processing time
perspective including analyzing NiFi flow and Provenance records.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4b8c80cc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4b8c80cc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4b8c80cc

Branch: refs/heads/master
Commit: 4b8c80cccc3207dc9fe6895266af95cca7c72250
Parents: e5ed62a
Author: Koji Kawamura <[email protected]>
Authored: Fri Jan 5 19:23:28 2018 +0900
Committer: Matt Gilman <[email protected]>
Committed: Fri Jan 5 11:17:33 2018 -0500

----------------------------------------------------------------------
 .../org/apache/nifi/atlas/NiFiAtlasClient.java  | 44 ++------------------
 .../org/apache/nifi/atlas/NiFiAtlasHook.java    | 10 +++--
 .../atlas/reporting/ReportLineageToAtlas.java   | 40 +++++++++++++-----
 .../apache/nifi/atlas/security/Kerberos.java    |  3 +-
 .../apache/nifi/atlas/ITNiFiAtlasClient.java    |  4 +-
 .../atlas/reporting/ITReportLineageToAtlas.java |  1 +
 6 files changed, 44 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4b8c80cc/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
index feb2b48..4e95a92 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
@@ -18,7 +18,6 @@ package org.apache.nifi.atlas;
 
 import com.sun.jersey.api.client.UniformInterfaceException;
 import com.sun.jersey.core.util.MultivaluedMapImpl;
-import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClientV2;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasServiceException;
@@ -29,14 +28,12 @@ import 
org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
-import org.apache.nifi.atlas.security.AtlasAuthN;
 import org.apache.nifi.util.StringUtils;
 import org.apache.nifi.util.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.ws.rs.core.MultivaluedMap;
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,7 +41,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
@@ -82,44 +78,10 @@ public class NiFiAtlasClient {
 
     private static final Logger logger = 
LoggerFactory.getLogger(NiFiAtlasClient.class);
 
-    private static NiFiAtlasClient nifiClient;
-    private AtlasClientV2 atlasClient;
+    private final AtlasClientV2 atlasClient;
 
-    private NiFiAtlasClient() {
-        super();
-    }
-
-    public static NiFiAtlasClient getInstance() {
-        if (nifiClient == null) {
-            synchronized (NiFiAtlasClient.class) {
-                if (nifiClient == null) {
-                    nifiClient = new NiFiAtlasClient();
-                }
-            }
-        }
-        return nifiClient;
-    }
-
-    public void initialize(final String[] baseUrls, final AtlasAuthN authN, 
final File atlasConfDir) {
-
-        synchronized (NiFiAtlasClient.class) {
-
-            if (atlasClient != null) {
-                logger.info("{} had been setup but replacing it with new 
one.", atlasClient);
-                ApplicationProperties.forceReload();
-            }
-
-            if (atlasConfDir != null) {
-                // If atlasConfDir is not set, atlas-application.properties 
will be searched under classpath.
-                Properties props = System.getProperties();
-                final String atlasConfProp = "atlas.conf";
-                props.setProperty(atlasConfProp, 
atlasConfDir.getAbsolutePath());
-                logger.debug("{} has been set to: {}", atlasConfProp, 
props.getProperty(atlasConfProp));
-            }
-
-            atlasClient = authN.createClient(baseUrls);
-
-        }
+    public NiFiAtlasClient(AtlasClientV2 atlasClient) {
+        this.atlasClient = atlasClient;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b8c80cc/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
index 43fefff..a15c935 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
@@ -60,7 +60,7 @@ public class NiFiAtlasHook extends AtlasHook implements 
LineageContext {
     private static final String CONF_PREFIX = "atlas.hook.nifi.";
     private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
 
-    private final NiFiAtlasClient atlasClient;
+    private NiFiAtlasClient atlasClient;
 
     /**
      * An index to resolve a qualifiedName from a GUID.
@@ -81,9 +81,7 @@ public class NiFiAtlasHook extends AtlasHook implements 
LineageContext {
         };
     }
 
-    public NiFiAtlasHook(NiFiAtlasClient atlasClient) {
-        this.atlasClient = atlasClient;
-
+    public NiFiAtlasHook() {
         final int qualifiedNameCacheSize = 10_000;
         this.guidToQualifiedName = createCache(qualifiedNameCacheSize);
 
@@ -91,6 +89,10 @@ public class NiFiAtlasHook extends AtlasHook implements 
LineageContext {
         this.typedQualifiedNameToRef = createCache(dataSetRefCacheSize);
     }
 
+    public void setAtlasClient(NiFiAtlasClient atlasClient) {
+        this.atlasClient = atlasClient;
+    }
+
     @Override
     protected String getNumberOfRetriesPropertyKey() {
         return HOOK_NUM_RETRIES;

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b8c80cc/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index 5bb6024..9238f95 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.atlas.reporting;
 
 import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -279,7 +280,7 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
     private static final String ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS = 
ATLAS_KAFKA_PREFIX + "bootstrap.servers";
     private static final String ATLAS_PROPERTY_KAFKA_CLIENT_ID = 
ATLAS_KAFKA_PREFIX + ProducerConfig.CLIENT_ID_CONFIG;
     private final ServiceLoader<ClusterResolver> clusterResolverLoader = 
ServiceLoader.load(ClusterResolver.class);
-    private volatile NiFiAtlasClient atlasClient;
+    private volatile AtlasAuthN atlasAuthN;
     private volatile Properties atlasProperties;
     private volatile boolean isTypeDefCreated = false;
     private volatile String defaultClusterName;
@@ -399,13 +400,13 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
     @OnScheduled
     public void setup(ConfigurationContext context) throws IOException {
         // initAtlasClient has to be done first as it loads AtlasProperty.
-        initAtlasClient(context);
+        initAtlasProperties(context);
         initLineageStrategy(context);
         initClusterResolvers(context);
     }
 
     private void initLineageStrategy(ConfigurationContext context) throws 
IOException {
-        nifiAtlasHook = new NiFiAtlasHook(atlasClient);
+        nifiAtlasHook = new NiFiAtlasHook();
 
         final String strategy = 
context.getProperty(NIFI_LINEAGE_STRATEGY).getValue();
         if (LINEAGE_STRATEGY_SIMPLE_PATH.equals(strategy)) {
@@ -428,7 +429,7 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
     }
 
 
-    private void initAtlasClient(ConfigurationContext context) throws 
IOException {
+    private void initAtlasProperties(ConfigurationContext context) throws 
IOException {
         List<String> urls = new ArrayList<>();
         parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add);
         final boolean isAtlasApiSecure = urls.stream().anyMatch(url -> 
url.toLowerCase().startsWith("https"));
@@ -476,7 +477,7 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
             throw new ProcessException("Default cluster name is not defined.");
         }
 
-        final AtlasAuthN atlasAuthN = getAtlasAuthN(atlasAuthNMethod);
+        atlasAuthN = getAtlasAuthN(atlasAuthNMethod);
         atlasAuthN.configure(context);
 
         // Create Atlas configuration file if necessary.
@@ -497,16 +498,32 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
             }
         }
 
+        getLogger().debug("Force reloading Atlas application properties.");
+        ApplicationProperties.forceReload();
 
-        atlasClient = NiFiAtlasClient.getInstance();
+        if (confDir != null) {
+            // If atlasConfDir is not set, atlas-application.properties will 
be searched under classpath.
+            Properties props = System.getProperties();
+            final String atlasConfProp = "atlas.conf";
+            props.setProperty(atlasConfProp, confDir.getAbsolutePath());
+            getLogger().debug("{} has been set to: {}", new 
Object[]{atlasConfProp, props.getProperty(atlasConfProp)});
+        }
+    }
+
+    /**
+     * In order to avoid authentication expiration issues (i.e. Kerberos 
ticket and DelegationToken expiration),
+     * create Atlas client instance at every onTrigger execution.
+     */
+    private NiFiAtlasClient createNiFiAtlasClient(ReportingContext context) {
+        List<String> urls = new ArrayList<>();
+        parseAtlasUrls(context.getProperty(ATLAS_URLS), urls::add);
         try {
-            atlasClient.initialize(urls.toArray(new String[]{}), atlasAuthN, 
confDir);
+            return new 
NiFiAtlasClient(atlasAuthN.createClient(urls.toArray(new String[]{})));
         } catch (final NullPointerException e) {
             throw new ProcessException(String.format("Failed to initialize 
Atlas client due to %s." +
                     " Make sure 'atlas-application.properties' is in the 
directory specified with %s" +
                     " or under root classpath if not specified.", e, 
ATLAS_CONF_DIR.getDisplayName()), e);
         }
-
     }
 
     private AtlasAuthN getAtlasAuthN(String atlasAuthNMethod) {
@@ -557,6 +574,8 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
         // If standalone or being primary node in a NiFi cluster, this node is 
responsible for doing primary tasks.
         final boolean isResponsibleForPrimaryTasks = !isClustered || 
getNodeTypeProvider().isPrimary();
 
+        final NiFiAtlasClient atlasClient = createNiFiAtlasClient(context);
+
         // Create Entity defs in Atlas if there's none yet.
         if (!isTypeDefCreated) {
             try {
@@ -578,7 +597,7 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
 
         // Regardless of whether being a primary task node, each node has to 
analyse NiFiFlow.
         // Assuming each node has the same flow definition, that is guaranteed 
by NiFi cluster management mechanism.
-        final NiFiFlow nifiFlow = createNiFiFlow(context);
+        final NiFiFlow nifiFlow = createNiFiFlow(context, atlasClient);
 
 
         if (isResponsibleForPrimaryTasks) {
@@ -592,11 +611,12 @@ public class ReportLineageToAtlas extends 
AbstractReportingTask {
         // NOTE: There is a race condition between the primary node and other 
nodes.
         // If a node notifies an event related to a NiFi component which is 
not yet created by NiFi primary node,
         // then the notification message will fail due to having a reference 
to a non-existing entity.
+        nifiAtlasHook.setAtlasClient(atlasClient);
         consumeNiFiProvenanceEvents(context, nifiFlow);
 
     }
 
-    private NiFiFlow createNiFiFlow(ReportingContext context) {
+    private NiFiFlow createNiFiFlow(ReportingContext context, NiFiAtlasClient 
atlasClient) {
         final ProcessGroupStatus rootProcessGroup = 
context.getEventAccess().getGroupStatus("root");
         final String flowName = rootProcessGroup.getName();
         final String nifiUrl = 
context.getProperty(ATLAS_NIFI_URL).evaluateAttributeExpressions().getValue();

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b8c80cc/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
index 88feba0..ab55b49 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
@@ -73,7 +73,8 @@ public class Kerberos implements AtlasAuthN {
         UserGroupInformation.setConfiguration(hadoopConf);
         final UserGroupInformation ugi;
         try {
-            ugi = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
+            UserGroupInformation.loginUserFromKeytab(principal, keytab);
+            ugi = UserGroupInformation.getCurrentUser();
         } catch (IOException e) {
             throw new RuntimeException("Failed to login with Kerberos due to: 
" + e, e);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b8c80cc/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java
index f1727b0..69a3042 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/ITNiFiAtlasClient.java
@@ -39,14 +39,14 @@ public class ITNiFiAtlasClient {
 
     @Before
     public void setup() {
-        atlasClient = NiFiAtlasClient.getInstance();
         // Add your atlas server ip address into /etc/hosts as 
atlas.example.com
         PropertyContext propertyContext = mock(PropertyContext.class);
         
when(propertyContext.getProperty(ReportLineageToAtlas.ATLAS_USER)).thenReturn(new
 MockPropertyValue("admin"));
         
when(propertyContext.getProperty(ReportLineageToAtlas.ATLAS_PASSWORD)).thenReturn(new
 MockPropertyValue("admin"));
         final AtlasAuthN atlasAuthN = new Basic();
         atlasAuthN.configure(propertyContext);
-        atlasClient.initialize(new 
String[]{"http://atlas.example.com:21000/"}, atlasAuthN, null);
+
+        atlasClient = new NiFiAtlasClient(atlasAuthN.createClient(new 
String[]{"http://atlas.example.com:21000/"}));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/nifi/blob/4b8c80cc/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
index 2fe7d07..e83495a 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
@@ -409,6 +409,7 @@ public class ITReportLineageToAtlas {
         
when(eventAccess.getGroupStatus(eq("root"))).thenReturn(tc.rootPgStatus);
 
         final ProvenanceRepository provenanceRepository = 
mock(ProvenanceRepository.class);
+        when(eventAccess.getControllerStatus()).thenReturn(tc.rootPgStatus);
         
when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
         when(eventAccess.getProvenanceEvents(eq(-1L), 
anyInt())).thenReturn(tc.provenanceRecords);
         when(provenanceRepository.getMaxEventId()).thenReturn((long) 
tc.provenanceRecords.size() - 1);

Reply via email to