STORM-1771. HiveState should flushAndClose before closing old or idle Hive connections.
Signed-off-by: P. Taylor Goetz <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ca186cd1 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ca186cd1 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ca186cd1 Branch: refs/heads/1.0.x-branch Commit: ca186cd1a62c90022a6b8cd4616efd5b32505a5a Parents: 1a50bec Author: Sriharsha Chintalapani <[email protected]> Authored: Mon Jun 6 10:39:45 2016 -0700 Committer: P. Taylor Goetz <[email protected]> Committed: Thu Jun 9 16:01:49 2016 -0400 ---------------------------------------------------------------------- .../main/java/org/apache/storm/hive/trident/HiveState.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/ca186cd1/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java index 08a5953..cef1a4f 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java @@ -236,12 +236,14 @@ public class HiveState implements State { } try { LOG.info("Closing least used Writer to Hive end point : " + eldest); - allWriters.remove(eldest).close(); + allWriters.remove(eldest).flushAndClose(); } catch (IOException e) { LOG.warn("Failed to close writer for end point: " + eldest, e); } catch (InterruptedException e) { LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e); Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e); } } @@ -265,12 +267,14 @@ public class HiveState implements State { for(HiveEndPoint ep : retirees) { try { LOG.info("Closing idle Writer to Hive end point : {}", ep); - allWriters.remove(ep).close(); + allWriters.remove(ep).flushAndClose(); } catch (IOException e) { LOG.warn("Failed to close writer for end point: {}. Error: "+ ep, e); } catch (InterruptedException e) { LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e); Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e); } } return count;
