This is an automated email from the ASF dual-hosted git repository.

mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new ba6d050  NIFI-6926: Fixed memory leak in NiFiAtlasHook NIFI-6926: Use 
new instance of list instead of clearing it NIFI-6926: Logging the number of 
messages to be sent to Atlas. NIFI-6926: Pass a copy of the messages list to 
send() and clear the original list.
ba6d050 is described below

commit ba6d050ba814c2e10e9cdeff2d55f75b4faf2036
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Dec 4 15:54:07 2019 +0100

    NIFI-6926: Fixed memory leak in NiFiAtlasHook
    NIFI-6926: Use new instance of list instead of clearing it
    NIFI-6926: Logging the number of messages to be sent to Atlas.
    NIFI-6926: Pass a copy of the messages list to send() and clear the 
original list.
    
    This closes #3915
---
 .../org/apache/nifi/atlas/hook/NiFiAtlasHook.java  | 14 +++++-
 .../apache/nifi/atlas/hook/NotificationSender.java |  2 +
 .../apache/nifi/atlas/hook/TestNiFiAtlasHook.java  | 58 ++++++++++++++++++++++
 3 files changed, 72 insertions(+), 2 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
index 4916337..4962a70 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
@@ -55,9 +55,11 @@ public class NiFiAtlasHook extends AtlasHook implements 
LineageContext {
     }
 
     public void commitMessages() {
-        final NotificationSender notificationSender = new NotificationSender();
+        final NotificationSender notificationSender = 
createNotificationSender();
         notificationSender.setAtlasClient(atlasClient);
-        notificationSender.send(messages, this::notifyEntities);
+        List<HookNotificationMessage> messagesBatch = new 
ArrayList<>(messages);
+        messages.clear();
+        notificationSender.send(messagesBatch, this::notifyEntities);
     }
 
     public void close() {
@@ -65,4 +67,12 @@ public class NiFiAtlasHook extends AtlasHook implements 
LineageContext {
             notificationInterface.close();
         }
     }
+
+    NotificationSender createNotificationSender() {
+        return new NotificationSender();
+    }
+
+    List<HookNotificationMessage> getMessages() {
+        return messages;
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
index 4d599f1..c06e254 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
@@ -191,6 +191,8 @@ class NotificationSender {
      * @param notifier responsible for sending notification messages, its 
accept method can be called multiple times
      */
     void send(final List<HookNotification.HookNotificationMessage> messages, 
final Consumer<List<HookNotification.HookNotificationMessage>> notifier) {
+        logger.info("Sending {} messages to Atlas", messages.size());
+
         final Metrics metrics = new Metrics();
         try {
             metrics.totalMessages = messages.size();
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNiFiAtlasHook.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNiFiAtlasHook.java
new file mode 100644
index 0000000..98fd11e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNiFiAtlasHook.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.hook;
+
+import org.apache.atlas.notification.hook.HookNotification;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class TestNiFiAtlasHook {
+
+    private NiFiAtlasHook hook;
+
+    @Before
+    public void setUp() {
+        hook = new NiFiAtlasHook() {
+            @Override
+            NotificationSender createNotificationSender() {
+                return mock(NotificationSender.class);
+            }
+        };
+    }
+
+    @Test
+    public void messagesListShouldContainMessagesAfterAddMessage() {
+        hook.addMessage(new 
HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_CREATE,
 "nifi"));
+        hook.addMessage(new 
HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE,
 "nifi"));
+
+        assertEquals(2, hook.getMessages().size());
+    }
+
+    @Test
+    public void messagesListShouldBeCleanedUpAfterCommit() {
+        hook.addMessage(new 
HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_CREATE,
 "nifi"));
+        hook.addMessage(new 
HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE,
 "nifi"));
+
+        hook.commitMessages();
+
+        assertTrue(hook.getMessages().isEmpty());
+    }
+}

Reply via email to