Repository: nifi Updated Branches: refs/heads/master 5cacc52cf -> 97461657b
NIFI-3744 - PutHiveStreaming cleanup null fixes This closes #1698. Signed-off-by: Pierre Villard <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/97461657 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/97461657 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/97461657 Branch: refs/heads/master Commit: 97461657b1fe3193e028d2a085734415563a26d8 Parents: 5cacc52 Author: Bryan Rosander <[email protected]> Authored: Wed Apr 26 10:56:40 2017 -0400 Committer: Pierre Villard <[email protected]> Committed: Wed Apr 26 22:54:49 2017 +0200 ---------------------------------------------------------------------- .../nifi/processors/hive/PutHiveStreaming.java | 22 +++++++++++--------- .../processors/hive/TestPutHiveStreaming.java | 2 +- 2 files changed, 13 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/97461657/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java index d81108d..1494595 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java @@ -262,13 +262,12 @@ public class PutHiveStreaming extends AbstractProcessor { protected volatile UserGroupInformation ugi; protected volatile HiveConf hiveConfig; - protected final AtomicBoolean isInitialized = new AtomicBoolean(false); + protected final AtomicBoolean sendHeartBeat = new AtomicBoolean(false); protected HiveOptions options; protected ExecutorService callTimeoutPool; protected transient Timer heartBeatTimer; - protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false); - protected Map<HiveEndPoint, HiveWriter> allWriters; + protected Map<HiveEndPoint, HiveWriter> allWriters = Collections.emptyMap(); @Override @@ -662,17 +661,20 @@ public class PutHiveStreaming extends AbstractProcessor { } } } + allWriters = Collections.emptyMap(); - callTimeoutPool.shutdown(); - try { - while (!callTimeoutPool.isTerminated()) { - callTimeoutPool.awaitTermination(options.getCallTimeOut(), TimeUnit.MILLISECONDS); + if (callTimeoutPool != null) { + callTimeoutPool.shutdown(); + try { + while (!callTimeoutPool.isTerminated()) { + callTimeoutPool.awaitTermination(options.getCallTimeOut(), TimeUnit.MILLISECONDS); + } + } catch (Throwable t) { + log.warn("shutdown interrupted on " + callTimeoutPool, t); } - } catch (Throwable t) { - log.warn("shutdown interrupted on " + callTimeoutPool, t); + callTimeoutPool = null; } - callTimeoutPool = null; ugi = null; hiveConfigurator.stopRenewer(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/97461657/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java index 26e832d..61b5304 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java @@ -521,7 +521,7 @@ public class TestPutHiveStreaming { @Test public void cleanup() throws Exception { - + processor.cleanup(); } @Test
