Repository: hive Updated Branches: refs/heads/master 706bf724e -> f5618d922
HIVE-20979: Fix memory leak in hive streaming (Shubham Chaurasia reviewed by Prasanth, Eric, Ashutosh) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f5618d92 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f5618d92 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f5618d92 Branch: refs/heads/master Commit: f5618d9227e5f6e643aaf9d8d625dc1fc42180dc Parents: 706bf72 Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Mon Dec 10 01:33:00 2018 -0800 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Mon Dec 10 01:33:00 2018 -0800 ---------------------------------------------------------------------- .../java/org/apache/hive/streaming/AbstractRecordWriter.java | 5 +++++ .../org/apache/hive/streaming/HiveStreamingConnection.java | 8 +++++++- 2 files changed, 12 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f5618d92/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index e7588e8..14d34d4 100644 --- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -394,6 +394,11 @@ public abstract class AbstractRecordWriter implements RecordWriter { if (LOG.isDebugEnabled()) { logStats("Stats after close:"); } + try { + this.fs.close(); + } catch (IOException e) { + throw new StreamingIOFailure("Error while closing FileSystem", e); + } if (haveError) { throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark(partition)); } http://git-wip-us.apache.org/repos/asf/hive/blob/f5618d92/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index 74fc531..a32aa62 100644 --- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -147,6 +147,7 @@ public class HiveStreamingConnection implements StreamingConnection { private int countTransactions = 0; private Set<String> partitions; private Long tableId; + private Runnable onShutdownRunner; private HiveStreamingConnection(Builder builder) throws StreamingException { this.database = builder.database.toLowerCase(); @@ -389,9 +390,10 @@ public class HiveStreamingConnection implements StreamingConnection { } HiveStreamingConnection streamingConnection = new HiveStreamingConnection(this); + streamingConnection.onShutdownRunner = streamingConnection::close; // assigning higher priority than FileSystem shutdown hook so that streaming connection gets closed first before // filesystem close (to avoid ClosedChannelException) - ShutdownHookManager.addShutdownHook(streamingConnection::close, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1); + ShutdownHookManager.addShutdownHook(streamingConnection.onShutdownRunner, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1); Thread.setDefaultUncaughtExceptionHandler((t, e) -> streamingConnection.close()); return streamingConnection; } @@ -651,6 +653,10 @@ public class HiveStreamingConnection implements StreamingConnection { getMSC().close(); getHeatbeatMSC().close(); } + //remove shutdown hook entry added while creating this connection via HiveStreamingConnection.Builder#connect() + if (!ShutdownHookManager.isShutdownInProgress()) { + ShutdownHookManager.removeShutdownHook(this.onShutdownRunner); + } } if (LOG.isInfoEnabled()) { LOG.info("Closed streaming connection. Agent: {} Stats: {}", getAgentInfo(), getConnectionStats());