Repository: hive Updated Branches: refs/heads/branch-3 0a1bc3583 -> a7b3cf4bd
HIVE-20979: Fix memory leak in hive streaming (Shubham Chaurasia reviewed by Prasanth, Eric, Ashutosh) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a7b3cf4b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a7b3cf4b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a7b3cf4b Branch: refs/heads/branch-3 Commit: a7b3cf4bd2239876b323ab544e12d547d875b6c4 Parents: 0a1bc35 Author: Shubham Chaurasia <[email protected]> Authored: Mon Dec 10 01:33:00 2018 -0800 Committer: Prasanth Jayachandran <[email protected]> Committed: Mon Dec 10 01:39:44 2018 -0800 ---------------------------------------------------------------------- .../java/org/apache/hive/streaming/AbstractRecordWriter.java | 5 +++++ .../org/apache/hive/streaming/HiveStreamingConnection.java | 8 +++++++- 2 files changed, 12 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a7b3cf4b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index 0408599..0653a5d 100644 --- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -383,6 +383,11 @@ public abstract class AbstractRecordWriter implements RecordWriter { if (LOG.isDebugEnabled()) { logStats("Stats after close:"); } + try { + this.fs.close(); + } catch (IOException e) { + throw new StreamingIOFailure("Error while closing FileSystem", e); + } if (haveError) { throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark(partition)); } http://git-wip-us.apache.org/repos/asf/hive/blob/a7b3cf4b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index 6cf14b0..8ca8fe2 100644 --- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -158,6 +158,7 @@ public class HiveStreamingConnection implements StreamingConnection { private Table tableObject = null; private String metastoreUri; private ConnectionStats connectionStats; + private Runnable onShutdownRunner; private HiveStreamingConnection(Builder builder) throws StreamingException { this.database = builder.database.toLowerCase(); @@ -330,9 +331,10 @@ public class HiveStreamingConnection implements StreamingConnection { throw new StreamingException("Record writer cannot be null for streaming connection"); } HiveStreamingConnection streamingConnection = new HiveStreamingConnection(this); + streamingConnection.onShutdownRunner = streamingConnection::close; // assigning higher priority than FileSystem shutdown hook so that streaming connection gets closed first before // filesystem close (to avoid ClosedChannelException) - ShutdownHookManager.addShutdownHook(streamingConnection::close, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1); + ShutdownHookManager.addShutdownHook(streamingConnection.onShutdownRunner, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1); Thread.setDefaultUncaughtExceptionHandler((t, e) -> streamingConnection.close()); return streamingConnection; } @@ -551,6 +553,10 @@ public class HiveStreamingConnection implements StreamingConnection { } finally { getMSC().close(); getHeatbeatMSC().close(); + //remove shutdown hook entry added while creating this connection via HiveStreamingConnection.Builder#connect() + if (!ShutdownHookManager.isShutdownInProgress()) { + ShutdownHookManager.removeShutdownHook(this.onShutdownRunner); + } } if (LOG.isInfoEnabled()) { LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());
