This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2fb05cd  NIFI-8430: Close Atlas client in order to free up resources
2fb05cd is described below

commit 2fb05cd8dc6b71d1808cc7447ce085a8e7ab8236
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Apr 14 21:23:14 2021 +0200

    NIFI-8430: Close Atlas client in order to free up resources
    
    This closes #5002
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../org/apache/nifi/atlas/NiFiAtlasClient.java     |  7 ++-
 .../nifi/atlas/reporting/ReportLineageToAtlas.java | 64 +++++++++++-----------
 2 files changed, 38 insertions(+), 33 deletions(-)

diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
index 3436ff3..e5c289a 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
@@ -74,7 +74,7 @@ import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
 import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
 import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
 
-public class NiFiAtlasClient {
+public class NiFiAtlasClient implements AutoCloseable {
 
     private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
 
@@ -84,6 +84,11 @@ public class NiFiAtlasClient {
         this.atlasClient = atlasClient;
     }
 
+    @Override
+    public void close() {
+        atlasClient.close();
+    }
+
     /**
      * This is an utility method to delete unused types.
      * Should be used during development or testing only.
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index 9afacf5..ca7ca36 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -814,46 +814,46 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
         // If standalone or being primary node in a NiFi cluster, this node is responsible for doing primary tasks.
         final boolean isResponsibleForPrimaryTasks = !isClustered || getNodeTypeProvider().isPrimary();
 
-        final NiFiAtlasClient atlasClient = createNiFiAtlasClient(context);
-
-        // Create Entity defs in Atlas if there's none yet.
-        if (!isTypeDefCreated) {
-            try {
-                if (isResponsibleForPrimaryTasks) {
-                    // Create NiFi type definitions in Atlas type system.
-                    atlasClient.registerNiFiTypeDefs(false);
-                } else {
-                    // Otherwise, just check existence of NiFi type 
definitions.
-                    if (!atlasClient.isNiFiTypeDefsRegistered()) {
-                        getLogger().debug("NiFi type definitions are not ready 
in Atlas type system yet.");
-                        return;
+        try (final NiFiAtlasClient atlasClient = 
createNiFiAtlasClient(context)) {
+
+            // Create Entity defs in Atlas if there's none yet.
+            if (!isTypeDefCreated) {
+                try {
+                    if (isResponsibleForPrimaryTasks) {
+                        // Create NiFi type definitions in Atlas type system.
+                        atlasClient.registerNiFiTypeDefs(false);
+                    } else {
+                        // Otherwise, just check existence of NiFi type 
definitions.
+                        if (!atlasClient.isNiFiTypeDefsRegistered()) {
+                            getLogger().debug("NiFi type definitions are not 
ready in Atlas type system yet.");
+                            return;
+                        }
                     }
+                    isTypeDefCreated = true;
+                } catch (AtlasServiceException e) {
+                    throw new RuntimeException("Failed to check and create 
NiFi flow type definitions in Atlas due to " + e, e);
                 }
-                isTypeDefCreated = true;
-            } catch (AtlasServiceException e) {
-                throw new RuntimeException("Failed to check and create NiFi 
flow type definitions in Atlas due to " + e, e);
             }
-        }
 
-        // Regardless of whether being a primary task node, each node has to 
analyse NiFiFlow.
-        // Assuming each node has the same flow definition, that is guaranteed 
by NiFi cluster management mechanism.
-        final NiFiFlow nifiFlow = createNiFiFlow(context, atlasClient);
+            // Regardless of whether being a primary task node, each node has 
to analyse NiFiFlow.
+            // Assuming each node has the same flow definition, that is 
guaranteed by NiFi cluster management mechanism.
+            final NiFiFlow nifiFlow = createNiFiFlow(context, atlasClient);
 
 
-        if (isResponsibleForPrimaryTasks) {
-            try {
-                atlasClient.registerNiFiFlow(nifiFlow);
-            } catch (AtlasServiceException e) {
-                throw new RuntimeException("Failed to register NiFI flow. " + 
e, e);
+            if (isResponsibleForPrimaryTasks) {
+                try {
+                    atlasClient.registerNiFiFlow(nifiFlow);
+                } catch (AtlasServiceException e) {
+                    throw new RuntimeException("Failed to register NiFI flow. 
" + e, e);
+                }
             }
-        }
-
-        // NOTE: There is a race condition between the primary node and other 
nodes.
-        // If a node notifies an event related to a NiFi component which is 
not yet created by NiFi primary node,
-        // then the notification message will fail due to having a reference 
to a non-existing entity.
-        nifiAtlasHook.setAtlasClient(atlasClient);
-        consumeNiFiProvenanceEvents(context, nifiFlow);
 
+            // NOTE: There is a race condition between the primary node and 
other nodes.
+            // If a node notifies an event related to a NiFi component which 
is not yet created by NiFi primary node,
+            // then the notification message will fail due to having a 
reference to a non-existing entity.
+            nifiAtlasHook.setAtlasClient(atlasClient);
+            consumeNiFiProvenanceEvents(context, nifiFlow);
+        }
     }
 
     private NiFiFlow createNiFiFlow(ReportingContext context, NiFiAtlasClient atlasClient) {

Reply via email to