Repository: hive
Updated Branches:
  refs/heads/master 3e9814377 -> 75c6ee417


HIVE-20981: streaming/AbstractRecordWriter leaks HeapMemoryMonitor (Eric 
Wohlstadter, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/75c6ee41
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/75c6ee41
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/75c6ee41

Branch: refs/heads/master
Commit: 75c6ee417faca1f5d938f3928fde742738e5d62a
Parents: 3e98143
Author: Eric Wohlstadter <[email protected]>
Authored: Thu Nov 29 12:35:01 2018 -0800
Committer: Jason Dere <[email protected]>
Committed: Thu Nov 29 12:35:01 2018 -0800

----------------------------------------------------------------------
 .../hadoop/hive/common/HeapMemoryMonitor.java   | 22 +++++++++++++++++---
 .../hive/streaming/AbstractRecordWriter.java    |  1 +
 2 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/75c6ee41/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
----------------------------------------------------------------------
diff --git 
a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java 
b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
index 42286be..56ec2fd 100644
--- a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
+++ b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
@@ -28,6 +28,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import javax.management.NotificationEmitter;
+import javax.management.NotificationListener;
+import javax.management.ListenerNotFoundException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +46,7 @@ public class HeapMemoryMonitor {
 
   private final double threshold;
   private List<Listener> listeners = new ArrayList<>();
+  private NotificationListener notificationListener;
 
   public interface Listener {
     void memoryUsageAboveThreshold(long usedMemory, long maxMemory);
@@ -140,7 +143,7 @@ public class HeapMemoryMonitor {
     }
     MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
     NotificationEmitter emitter = (NotificationEmitter) mxBean;
-    emitter.addNotificationListener((n, hb) -> {
+    notificationListener = (n, hb) -> {
       if (n.getType().equals(
         MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) {
         long maxMemory = tenuredGenPool.getUsage().getMax();
@@ -149,6 +152,19 @@ public class HeapMemoryMonitor {
           listener.memoryUsageAboveThreshold(usedMemory, maxMemory);
         }
       }
-    }, null, null);
+    };
+    emitter.addNotificationListener(notificationListener, null, null);
   }
-}
\ No newline at end of file
+
+  public void close() {
+    if(notificationListener != null) {
+      MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+      NotificationEmitter emitter = (NotificationEmitter) mxBean;
+      try {
+        emitter.removeNotificationListener(notificationListener);
+      } catch(ListenerNotFoundException e) {
+        LOG.warn("Failed to remove HeapMemoryMonitor notification listener 
from MemoryMXBean", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/75c6ee41/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java 
b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 88a7d82..e7588e8 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -366,6 +366,7 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
 
   @Override
   public void close() throws StreamingIOFailure {
+    heapMemoryMonitor.close();
     boolean haveError = false;
     String partition = null;
     if (LOG.isDebugEnabled()) {

Reply via email to