Repository: hive
Updated Branches:
  refs/heads/branch-3 0a1bc3583 -> a7b3cf4bd


HIVE-20979: Fix memory leak in hive streaming (Shubham Chaurasia reviewed by Prasanth, Eric, Ashutosh)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a7b3cf4b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a7b3cf4b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a7b3cf4b

Branch: refs/heads/branch-3
Commit: a7b3cf4bd2239876b323ab544e12d547d875b6c4
Parents: 0a1bc35
Author: Shubham Chaurasia <[email protected]>
Authored: Mon Dec 10 01:33:00 2018 -0800
Committer: Prasanth Jayachandran <[email protected]>
Committed: Mon Dec 10 01:39:44 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/hive/streaming/AbstractRecordWriter.java | 5 +++++
 .../org/apache/hive/streaming/HiveStreamingConnection.java   | 8 +++++++-
 2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a7b3cf4b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java 
b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 0408599..0653a5d 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -383,6 +383,11 @@ public abstract class AbstractRecordWriter implements 
RecordWriter {
     if (LOG.isDebugEnabled()) {
       logStats("Stats after close:");
     }
+    try {
+      this.fs.close();
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Error while closing FileSystem", e);
+    }
     if (haveError) {
       throw new StreamingIOFailure("Encountered errors while closing (see 
logs) " + getWatermark(partition));
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/a7b3cf4b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java 
b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index 6cf14b0..8ca8fe2 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -158,6 +158,7 @@ public class HiveStreamingConnection implements 
StreamingConnection {
   private Table tableObject = null;
   private String metastoreUri;
   private ConnectionStats connectionStats;
+  private Runnable onShutdownRunner;
 
   private HiveStreamingConnection(Builder builder) throws StreamingException {
     this.database = builder.database.toLowerCase();
@@ -330,9 +331,10 @@ public class HiveStreamingConnection implements 
StreamingConnection {
         throw new StreamingException("Record writer cannot be null for 
streaming connection");
       }
       HiveStreamingConnection streamingConnection = new 
HiveStreamingConnection(this);
+      streamingConnection.onShutdownRunner = streamingConnection::close;
       // assigning higher priority than FileSystem shutdown hook so that streaming connection gets closed first before
       // filesystem close (to avoid ClosedChannelException)
-      ShutdownHookManager.addShutdownHook(streamingConnection::close,  
FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
+      
ShutdownHookManager.addShutdownHook(streamingConnection.onShutdownRunner,  
FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
       Thread.setDefaultUncaughtExceptionHandler((t, e) -> 
streamingConnection.close());
       return streamingConnection;
     }
@@ -551,6 +553,10 @@ public class HiveStreamingConnection implements 
StreamingConnection {
     } finally {
       getMSC().close();
       getHeatbeatMSC().close();
+      //remove shutdown hook entry added while creating this connection via HiveStreamingConnection.Builder#connect()
+      if (!ShutdownHookManager.isShutdownInProgress()) {
+        ShutdownHookManager.removeShutdownHook(this.onShutdownRunner);
+      }
     }
     if (LOG.isInfoEnabled()) {
       LOG.info("Closed streaming connection. Agent: {} Stats: {}", 
getAgentInfo(), getConnectionStats());

Reply via email to