Repository: storm
Updated Branches:
  refs/heads/master f42981760 -> c722940b6


STORM-2425: Storm Hive Bolt not closing open transactions

Invoke `retireIdleWriters` whenever the Hive bolt receives a tick tuple, so
that idle writers are flushed and closed periodically.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3ce63b85
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3ce63b85
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3ce63b85

Branch: refs/heads/master
Commit: 3ce63b8551bd710451308f5d6e140a1a622564e4
Parents: 809c4b2
Author: Arun Mahadevan <[email protected]>
Authored: Thu Mar 16 12:16:10 2017 +0530
Committer: Arun Mahadevan <[email protected]>
Committed: Mon Mar 20 14:38:20 2017 +0530

----------------------------------------------------------------------
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 33 +++++++++++---------
 1 file changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3ce63b85/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
index aab575e..dc8be91 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
@@ -119,6 +119,9 @@ public class HiveBolt extends BaseRichBolt {
                 LOG.info("acknowledging tuples after writers flushed ");
                 batchHelper.ack();
             }
+            if (TupleUtils.isTick(tuple)) {
+                retireIdleWriters();
+            }
         } catch(SerializationError se) {
             LOG.info("Serialization exception occurred, tuple is acknowledged 
but not written to Hive.", tuple);
             this.collector.reportError(se);
@@ -308,30 +311,32 @@ public class HiveBolt extends BaseRichBolt {
         LOG.info("Attempting close idle writers");
         int count = 0;
         long now = System.currentTimeMillis();
-        ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>();
 
         //1) Find retirement candidates
         for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
             if(now - entry.getValue().getLastUsed() > 
options.getIdleTimeout()) {
                 ++count;
-                retirees.add(entry.getKey());
+                retire(entry.getKey());
             }
         }
-        //2) Retire them
-        for(HiveEndPoint ep : retirees) {
-            try {
+        return count;
+    }
+
+    private void retire(HiveEndPoint ep) {
+        try {
+            HiveWriter writer = allWriters.remove(ep);
+            if (writer != null) {
                 LOG.info("Closing idle Writer to Hive end point : {}", ep);
-                allWriters.remove(ep).flushAndClose();
-            } catch (IOException e) {
-                LOG.warn("Failed to close writer for end point: {}. Error: "+ 
ep, e);
-            } catch (InterruptedException e) {
-                LOG.warn("Interrupted when attempting to close writer for end 
point: " + ep, e);
-                Thread.currentThread().interrupt();
-            } catch (Exception e) {
-                LOG.warn("Interrupted when attempting to close writer for end 
point: " + ep, e);
+                writer.flushAndClose();
             }
+        } catch (IOException e) {
+            LOG.warn("Failed to close writer for end point: {}. Error: " + ep, 
e);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted when attempting to close writer for end 
point: " + ep, e);
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            LOG.warn("Interrupted when attempting to close writer for end 
point: " + ep, e);
         }
-        return count;
     }
 
 }

Reply via email to