Repository: samza Updated Branches: refs/heads/master 7b0a65b14 -> caec9c536
SAMZA-1452: Clean up interrupted thread bugs Call Thread.currentThread().interrupt(); when capturing InterruptedException Author: Nacho Solis <[email protected]> Reviewers: Jacob Maes <[email protected]>,Jagadish <[email protected]> Closes #322 from isolis/cleancodebugs Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/caec9c53 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/caec9c53 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/caec9c53 Branch: refs/heads/master Commit: caec9c5362635478a4272199504950789664d0cd Parents: 7b0a65b Author: Nacho Solis <[email protected]> Authored: Wed Oct 11 07:49:52 2017 -0700 Committer: Jacob Maes <[email protected]> Committed: Wed Oct 11 07:49:52 2017 -0700 ---------------------------------------------------------------------- .../org/apache/samza/system/SystemStreamPartitionIterator.java | 1 + .../java/org/apache/samza/autoscaling/deployer/ConfigManager.java | 2 ++ .../java/org/apache/samza/coordinator/AzureJobCoordinator.java | 2 +- .../src/main/java/org/apache/samza/coordinator/AzureLock.java | 2 +- .../src/main/java/org/apache/samza/processor/StreamProcessor.java | 1 + .../main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java | 1 + .../main/java/org/apache/samza/monitor/SamzaMonitorService.java | 1 + .../src/main/java/org/apache/samza/rest/script/ScriptRunner.java | 1 + 8 files changed, 9 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java index d1d61ed..726a5f4 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java @@ -90,6 +90,7 @@ public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEn peeks.addAll(systemStreamPartitionEnvelopes); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new SamzaException(e); } } http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java ---------------------------------------------------------------------- diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java index 223d1d6..d1b532f 100644 --- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java +++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java @@ -131,6 +131,7 @@ public class ConfigManager { } catch (InterruptedException e) { e.printStackTrace(); log.warn("Got interrupt in config manager thread, so shutting down"); + Thread.currentThread().interrupt(); } finally { log.info("Stopping the config manager"); stop(); @@ -305,6 +306,7 @@ public class ConfigManager { } } catch (InterruptedException e) { e.printStackTrace(); + Thread.currentThread().interrupt(); } log.info("Killed the current job successfully"); http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java index 9438690..622932f 100644 --- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java @@ -427,7 +427,7 @@ public class AzureJobCoordinator implements JobCoordinator { try { Thread.sleep(random.nextInt(5000)); } catch (InterruptedException e) { - Thread.interrupted(); + Thread.currentThread().interrupt(); } LOG.info("Checking for barrier state on the blob again..."); blobBarrierState = leaderBlob.getBarrierState(); http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java index c0d3ff2..172a0f3 100644 --- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java @@ -75,7 +75,7 @@ public class AzureLock implements DistributedLockWithState { try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { - Thread.interrupted(); + Thread.currentThread().interrupt(); } LOG.info("Trying to acquire lock again..."); } http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 70be208..b548200 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -222,6 +222,7 @@ public class StreamProcessor { LOGGER.info("Container was not running.", icse); shutdownComplete = true; } catch (InterruptedException e) { + Thread.currentThread().interrupt(); LOGGER.warn("Container shutdown was interrupted!" + container.toString(), e); } LOGGER.info("Shutting down container done for pid=" + processorId + "; complete =" + shutdownComplete); http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java index fb9bb56..230625d 100644 --- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java @@ -224,6 +224,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap { try { super.put(systemStreamPartition, envelope); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new SamzaException("ReaderRunnable interrupted for ssp: " + systemStreamPartition); } } http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java index 80321f3..85491ad 100644 --- a/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java +++ b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java @@ -90,6 +90,7 @@ public class SamzaMonitorService { } catch (IOException e) { LOGGER.error("Caught IOException during " + monitor.toString() + ".monitor()", e); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); LOGGER.error("Caught InterruptedException during " + monitor.toString() + ".monitor()", e); } catch (Exception e) { LOGGER.error("Unexpected exception during {}.monitor()", monitor, e); http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java index 1eab067..549541e 100644 --- a/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java +++ b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java @@ -123,6 +123,7 @@ public class ScriptRunner { try { p.waitFor(); } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); return; } }
