Repository: storm Updated Branches: refs/heads/master f42981760 -> c722940b6
STORM-2425: Storm Hive Bolt not closing open transactions Invoking `retireIdleWriters` when the hive bolt receives a tick tuple will periodically close the idle connections. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3ce63b85 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3ce63b85 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3ce63b85 Branch: refs/heads/master Commit: 3ce63b8551bd710451308f5d6e140a1a622564e4 Parents: 809c4b2 Author: Arun Mahadevan <[email protected]> Authored: Thu Mar 16 12:16:10 2017 +0530 Committer: Arun Mahadevan <[email protected]> Committed: Mon Mar 20 14:38:20 2017 +0530 ---------------------------------------------------------------------- .../org/apache/storm/hive/bolt/HiveBolt.java | 33 +++++++++++--------- 1 file changed, 19 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/3ce63b85/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java index aab575e..dc8be91 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java @@ -119,6 +119,9 @@ public class HiveBolt extends BaseRichBolt { LOG.info("acknowledging tuples after writers flushed "); batchHelper.ack(); } + if (TupleUtils.isTick(tuple)) { + retireIdleWriters(); + } } catch(SerializationError se) { LOG.info("Serialization exception occurred, tuple is acknowledged but not written to Hive.", tuple); this.collector.reportError(se); @@ -308,30 +311,32 @@ public class HiveBolt extends BaseRichBolt { LOG.info("Attempting close idle writers"); int count = 0; long now = System.currentTimeMillis(); - ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>(); //1) Find retirement candidates for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { if(now - entry.getValue().getLastUsed() > options.getIdleTimeout()) { ++count; - retirees.add(entry.getKey()); + retire(entry.getKey()); } } - //2) Retire them - for(HiveEndPoint ep : retirees) { - try { + return count; + } + + private void retire(HiveEndPoint ep) { + try { + HiveWriter writer = allWriters.remove(ep); + if (writer != null) { LOG.info("Closing idle Writer to Hive end point : {}", ep); - allWriters.remove(ep).flushAndClose(); - } catch (IOException e) { - LOG.warn("Failed to close writer for end point: {}. Error: "+ ep, e); - } catch (InterruptedException e) { - LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e); + writer.flushAndClose(); } + } catch (IOException e) { + LOG.warn("Failed to close writer for end point: {}. Error: " + ep, e); + } catch (InterruptedException e) { + LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e); } - return count; } }
