This is an automated email from the ASF dual-hosted git repository.
mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new ba6d050 NIFI-6926: Fixed memory leak in NiFiAtlasHook NIFI-6926: Use
new instance of list instead of clearing it NIFI-6926: Logging the number of
messages to be sent to Atlas. NIFI-6926: Pass a copy of the messages list to
send() and clear the original list.
ba6d050 is described below
commit ba6d050ba814c2e10e9cdeff2d55f75b4faf2036
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Dec 4 15:54:07 2019 +0100
NIFI-6926: Fixed memory leak in NiFiAtlasHook
NIFI-6926: Use new instance of list instead of clearing it
NIFI-6926: Logging the number of messages to be sent to Atlas.
NIFI-6926: Pass a copy of the messages list to send() and clear the
original list.
This closes #3915
---
.../org/apache/nifi/atlas/hook/NiFiAtlasHook.java | 14 +++++-
.../apache/nifi/atlas/hook/NotificationSender.java | 2 +
.../apache/nifi/atlas/hook/TestNiFiAtlasHook.java | 58 ++++++++++++++++++++++
3 files changed, 72 insertions(+), 2 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
index 4916337..4962a70 100644
---
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
+++
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
@@ -55,9 +55,11 @@ public class NiFiAtlasHook extends AtlasHook implements
LineageContext {
}
public void commitMessages() {
- final NotificationSender notificationSender = new NotificationSender();
+ final NotificationSender notificationSender =
createNotificationSender();
notificationSender.setAtlasClient(atlasClient);
- notificationSender.send(messages, this::notifyEntities);
+ List<HookNotificationMessage> messagesBatch = new
ArrayList<>(messages);
+ messages.clear();
+ notificationSender.send(messagesBatch, this::notifyEntities);
}
public void close() {
@@ -65,4 +67,12 @@ public class NiFiAtlasHook extends AtlasHook implements
LineageContext {
notificationInterface.close();
}
}
+
+ NotificationSender createNotificationSender() {
+ return new NotificationSender();
+ }
+
+ List<HookNotificationMessage> getMessages() {
+ return messages;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
index 4d599f1..c06e254 100644
---
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
+++
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
@@ -191,6 +191,8 @@ class NotificationSender {
* @param notifier responsible for sending notification messages, its
accept method can be called multiple times
*/
void send(final List<HookNotification.HookNotificationMessage> messages,
final Consumer<List<HookNotification.HookNotificationMessage>> notifier) {
+ logger.info("Sending {} messages to Atlas", messages.size());
+
final Metrics metrics = new Metrics();
try {
metrics.totalMessages = messages.size();
diff --git
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNiFiAtlasHook.java
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNiFiAtlasHook.java
new file mode 100644
index 0000000..98fd11e
--- /dev/null
+++
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNiFiAtlasHook.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.hook;
+
+import org.apache.atlas.notification.hook.HookNotification;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class TestNiFiAtlasHook {
+
+ private NiFiAtlasHook hook;
+
+ @Before
+ public void setUp() {
+ hook = new NiFiAtlasHook() {
+ @Override
+ NotificationSender createNotificationSender() {
+ return mock(NotificationSender.class);
+ }
+ };
+ }
+
+ @Test
+ public void messagesListShouldContainMessagesAfterAddMessage() {
+ hook.addMessage(new
HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_CREATE,
"nifi"));
+ hook.addMessage(new
HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE,
"nifi"));
+
+ assertEquals(2, hook.getMessages().size());
+ }
+
+ @Test
+ public void messagesListShouldBeCleanedUpAfterCommit() {
+ hook.addMessage(new
HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_CREATE,
"nifi"));
+ hook.addMessage(new
HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE,
"nifi"));
+
+ hook.commitMessages();
+
+ assertTrue(hook.getMessages().isEmpty());
+ }
+}