Repository: hive
Updated Branches:
  refs/heads/master 706bf724e -> f5618d922


HIVE-20979: Fix memory leak in hive streaming (Shubham Chaurasia reviewed by 
Prasanth, Eric, Ashutosh)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f5618d92
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f5618d92
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f5618d92

Branch: refs/heads/master
Commit: f5618d9227e5f6e643aaf9d8d625dc1fc42180dc
Parents: 706bf72
Author: Prasanth Jayachandran <prasan...@apache.org>
Authored: Mon Dec 10 01:33:00 2018 -0800
Committer: Prasanth Jayachandran <prasan...@apache.org>
Committed: Mon Dec 10 01:33:00 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/hive/streaming/AbstractRecordWriter.java | 5 +++++
 .../org/apache/hive/streaming/HiveStreamingConnection.java   | 8 +++++++-
 2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f5618d92/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java 
b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index e7588e8..14d34d4 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -394,6 +394,11 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
     if (LOG.isDebugEnabled()) {
       logStats("Stats after close:");
     }
+    try {
+      this.fs.close();
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Error while closing FileSystem", e);
+    }
     if (haveError) {
       throw new StreamingIOFailure("Encountered errors while closing (see 
logs) " + getWatermark(partition));
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/f5618d92/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java 
b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 74fc531..a32aa62 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -147,6 +147,7 @@ public class HiveStreamingConnection implements 
StreamingConnection {
   private int countTransactions = 0;
   private Set<String> partitions;
   private Long tableId;
+  private Runnable onShutdownRunner;
 
   private HiveStreamingConnection(Builder builder) throws StreamingException {
     this.database = builder.database.toLowerCase();
@@ -389,9 +390,10 @@ public class HiveStreamingConnection implements 
StreamingConnection {
       }
 
       HiveStreamingConnection streamingConnection = new 
HiveStreamingConnection(this);
+      streamingConnection.onShutdownRunner = streamingConnection::close;
       // assigning higher priority than FileSystem shutdown hook so that 
streaming connection gets closed first before
       // filesystem close (to avoid ClosedChannelException)
-      ShutdownHookManager.addShutdownHook(streamingConnection::close,  
FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
+      
ShutdownHookManager.addShutdownHook(streamingConnection.onShutdownRunner,  
FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
       Thread.setDefaultUncaughtExceptionHandler((t, e) -> 
streamingConnection.close());
       return streamingConnection;
     }
@@ -651,6 +653,10 @@ public class HiveStreamingConnection implements 
StreamingConnection {
         getMSC().close();
         getHeatbeatMSC().close();
       }
+      //remove shutdown hook entry added while creating this connection via 
HiveStreamingConnection.Builder#connect()
+      if (!ShutdownHookManager.isShutdownInProgress()) {
+        ShutdownHookManager.removeShutdownHook(this.onShutdownRunner);
+      }
     }
     if (LOG.isInfoEnabled()) {
       LOG.info("Closed streaming connection. Agent: {} Stats: {}", 
getAgentInfo(), getConnectionStats());

Reply via email to