Repository: samza
Updated Branches:
  refs/heads/master 7b0a65b14 -> caec9c536


SAMZA-1452: Clean up interrupted thread bugs

Call Thread.currentThread().interrupt(); when capturing InterruptedException

Author: Nacho Solis <[email protected]>

Reviewers: Jacob Maes <[email protected]>,Jagadish <[email protected]>

Closes #322 from isolis/cleancodebugs


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/caec9c53
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/caec9c53
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/caec9c53

Branch: refs/heads/master
Commit: caec9c5362635478a4272199504950789664d0cd
Parents: 7b0a65b
Author: Nacho Solis <[email protected]>
Authored: Wed Oct 11 07:49:52 2017 -0700
Committer: Jacob Maes <[email protected]>
Committed: Wed Oct 11 07:49:52 2017 -0700

----------------------------------------------------------------------
 .../org/apache/samza/system/SystemStreamPartitionIterator.java     | 1 +
 .../java/org/apache/samza/autoscaling/deployer/ConfigManager.java  | 2 ++
 .../java/org/apache/samza/coordinator/AzureJobCoordinator.java     | 2 +-
 .../src/main/java/org/apache/samza/coordinator/AzureLock.java      | 2 +-
 .../src/main/java/org/apache/samza/processor/StreamProcessor.java  | 1 +
 .../main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java | 1 +
 .../main/java/org/apache/samza/monitor/SamzaMonitorService.java    | 1 +
 .../src/main/java/org/apache/samza/rest/script/ScriptRunner.java   | 1 +
 8 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
 
b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
index d1d61ed..726a5f4 100644
--- 
a/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
+++ 
b/samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
@@ -90,6 +90,7 @@ public class SystemStreamPartitionIterator implements 
Iterator<IncomingMessageEn
           peeks.addAll(systemStreamPartitionEnvelopes);
         }
       } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
         throw new SamzaException(e);
       }
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
----------------------------------------------------------------------
diff --git 
a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
 
b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
index 223d1d6..d1b532f 100644
--- 
a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
+++ 
b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
@@ -131,6 +131,7 @@ public class ConfigManager {
     } catch (InterruptedException e) {
       e.printStackTrace();
       log.warn("Got interrupt in config manager thread, so shutting down");
+      Thread.currentThread().interrupt();
     } finally {
       log.info("Stopping the config manager");
       stop();
@@ -305,6 +306,7 @@ public class ConfigManager {
       }
     } catch (InterruptedException e) {
       e.printStackTrace();
+      Thread.currentThread().interrupt();
     }
 
     log.info("Killed the current job successfully");

http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
 
b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index 9438690..622932f 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -427,7 +427,7 @@ public class AzureJobCoordinator implements JobCoordinator {
           try {
             Thread.sleep(random.nextInt(5000));
           } catch (InterruptedException e) {
-            Thread.interrupted();
+            Thread.currentThread().interrupt();
           }
           LOG.info("Checking for barrier state on the blob again...");
           blobBarrierState = leaderBlob.getBarrierState();

http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java 
b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
index c0d3ff2..172a0f3 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
@@ -75,7 +75,7 @@ public class AzureLock implements DistributedLockWithState {
         try {
           Thread.sleep(random.nextInt(1000));
         } catch (InterruptedException e) {
-          Thread.interrupted();
+          Thread.currentThread().interrupt();
         }
         LOG.info("Trying to acquire lock again...");
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 70be208..b548200 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -222,6 +222,7 @@ public class StreamProcessor {
               LOGGER.info("Container was not running.", icse);
               shutdownComplete = true;
             } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
               LOGGER.warn("Container shutdown was interrupted!" + 
container.toString(), e);
             }
             LOGGER.info("Shutting down container done for pid=" + processorId 
+ "; complete =" + shutdownComplete);

http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java 
b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
index fb9bb56..230625d 100644
--- 
a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
+++ 
b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
@@ -224,6 +224,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap 
{
     try {
       super.put(systemStreamPartition, envelope);
     } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
       throw new SamzaException("ReaderRunnable interrupted for ssp: " + 
systemStreamPartition);
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java 
b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
index 80321f3..85491ad 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
@@ -90,6 +90,7 @@ public class SamzaMonitorService {
                 } catch (IOException e) {
                     LOGGER.error("Caught IOException during " + 
monitor.toString() + ".monitor()", e);
                 } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
                     LOGGER.error("Caught InterruptedException during " + 
monitor.toString() + ".monitor()", e);
                 } catch (Exception e) {
                     LOGGER.error("Unexpected exception during {}.monitor()", 
monitor, e);

http://git-wip-us.apache.org/repos/asf/samza/blob/caec9c53/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java 
b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java
index 1eab067..549541e 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java
@@ -123,6 +123,7 @@ public class ScriptRunner {
         try {
           p.waitFor();
         } catch (InterruptedException ignore) {
+          Thread.currentThread().interrupt();
           return;
         }
       }

Reply via email to