This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2fb05cd NIFI-8430: Close Atlas client in order to free up resources
2fb05cd is described below
commit 2fb05cd8dc6b71d1808cc7447ce085a8e7ab8236
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Apr 14 21:23:14 2021 +0200
NIFI-8430: Close Atlas client in order to free up resources
This closes #5002
Signed-off-by: David Handermann <[email protected]>
---
.../org/apache/nifi/atlas/NiFiAtlasClient.java | 7 ++-
.../nifi/atlas/reporting/ReportLineageToAtlas.java | 64 +++++++++++-----------
2 files changed, 38 insertions(+), 33 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
index 3436ff3..e5c289a 100644
---
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
+++
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
@@ -74,7 +74,7 @@ import static
org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
-public class NiFiAtlasClient {
+public class NiFiAtlasClient implements AutoCloseable {
private static final Logger logger =
LoggerFactory.getLogger(NiFiAtlasClient.class);
@@ -84,6 +84,11 @@ public class NiFiAtlasClient {
this.atlasClient = atlasClient;
}
+ @Override
+ public void close() {
+ atlasClient.close();
+ }
+
/**
* This is an utility method to delete unused types.
* Should be used during development or testing only.
diff --git
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index 9afacf5..ca7ca36 100644
---
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -814,46 +814,46 @@ public class ReportLineageToAtlas extends
AbstractReportingTask {
// If standalone or being primary node in a NiFi cluster, this node is
responsible for doing primary tasks.
final boolean isResponsibleForPrimaryTasks = !isClustered ||
getNodeTypeProvider().isPrimary();
- final NiFiAtlasClient atlasClient = createNiFiAtlasClient(context);
-
- // Create Entity defs in Atlas if there's none yet.
- if (!isTypeDefCreated) {
- try {
- if (isResponsibleForPrimaryTasks) {
- // Create NiFi type definitions in Atlas type system.
- atlasClient.registerNiFiTypeDefs(false);
- } else {
- // Otherwise, just check existence of NiFi type
definitions.
- if (!atlasClient.isNiFiTypeDefsRegistered()) {
- getLogger().debug("NiFi type definitions are not ready
in Atlas type system yet.");
- return;
+ try (final NiFiAtlasClient atlasClient =
createNiFiAtlasClient(context)) {
+
+ // Create Entity defs in Atlas if there's none yet.
+ if (!isTypeDefCreated) {
+ try {
+ if (isResponsibleForPrimaryTasks) {
+ // Create NiFi type definitions in Atlas type system.
+ atlasClient.registerNiFiTypeDefs(false);
+ } else {
+ // Otherwise, just check existence of NiFi type
definitions.
+ if (!atlasClient.isNiFiTypeDefsRegistered()) {
+ getLogger().debug("NiFi type definitions are not
ready in Atlas type system yet.");
+ return;
+ }
}
+ isTypeDefCreated = true;
+ } catch (AtlasServiceException e) {
+ throw new RuntimeException("Failed to check and create
NiFi flow type definitions in Atlas due to " + e, e);
}
- isTypeDefCreated = true;
- } catch (AtlasServiceException e) {
- throw new RuntimeException("Failed to check and create NiFi
flow type definitions in Atlas due to " + e, e);
}
- }
- // Regardless of whether being a primary task node, each node has to
analyse NiFiFlow.
- // Assuming each node has the same flow definition, that is guaranteed
by NiFi cluster management mechanism.
- final NiFiFlow nifiFlow = createNiFiFlow(context, atlasClient);
+ // Regardless of whether being a primary task node, each node has
to analyse NiFiFlow.
+ // Assuming each node has the same flow definition, that is
guaranteed by NiFi cluster management mechanism.
+ final NiFiFlow nifiFlow = createNiFiFlow(context, atlasClient);
- if (isResponsibleForPrimaryTasks) {
- try {
- atlasClient.registerNiFiFlow(nifiFlow);
- } catch (AtlasServiceException e) {
- throw new RuntimeException("Failed to register NiFI flow. " +
e, e);
+ if (isResponsibleForPrimaryTasks) {
+ try {
+ atlasClient.registerNiFiFlow(nifiFlow);
+ } catch (AtlasServiceException e) {
+ throw new RuntimeException("Failed to register NiFI flow.
" + e, e);
+ }
}
- }
-
- // NOTE: There is a race condition between the primary node and other
nodes.
- // If a node notifies an event related to a NiFi component which is
not yet created by NiFi primary node,
- // then the notification message will fail due to having a reference
to a non-existing entity.
- nifiAtlasHook.setAtlasClient(atlasClient);
- consumeNiFiProvenanceEvents(context, nifiFlow);
+ // NOTE: There is a race condition between the primary node and
other nodes.
+ // If a node notifies an event related to a NiFi component which
is not yet created by NiFi primary node,
+ // then the notification message will fail due to having a
reference to a non-existing entity.
+ nifiAtlasHook.setAtlasClient(atlasClient);
+ consumeNiFiProvenanceEvents(context, nifiFlow);
+ }
}
private NiFiFlow createNiFiFlow(ReportingContext context, NiFiAtlasClient
atlasClient) {