wonook closed pull request #14: [NEMO-38] Fix bugs and vulnerabilities for RuntimeMaster and RuntimeCommon
URL: https://github.com/apache/incubator-nemo/pull/14
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java
index 7daec35d..0a71d714 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java
@@ -47,15 +47,17 @@ public String getSenderId() {
   }
 
   @Override
+  @SuppressWarnings("squid:S2095")
   public <U> void reply(final U replyMessage) {
     LOG.debug("[REPLY]: {}", replyMessage);
     final Connection connection = connectionFactory.newConnection(idFactory.getNewInstance(senderId));
     try {
       connection.open();
+      connection.write(replyMessage);
+      // We do not call connection.close since NCS caches connection.
+      // Disabling Sonar warning (squid:S2095)
     } catch (final NetworkException e) {
       throw new RuntimeException("Cannot connect to " + senderId, e);
     }
-
-    connection.write(replyMessage);
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
index 7f1dc911..6e0877e1 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
@@ -372,6 +372,7 @@ public JobState waitUntilFinish() {
       }
     } catch (final InterruptedException e) {
       LOG.warn("Interrupted during waiting the finish of Job ID {}", jobId);
+      Thread.currentThread().interrupt();
     } finally {
       finishLock.unlock();
     }
@@ -390,10 +391,13 @@ public JobState waitUntilFinish(final long timeout,
     finishLock.lock();
     try {
       if (!checkJobTermination()) {
-        jobFinishedCondition.await(timeout, unit);
+        if (!jobFinishedCondition.await(timeout, unit)) {
+          LOG.warn("Timeout during waiting the finish of Job ID {}", jobId);
+        }
       }
     } catch (final InterruptedException e) {
       LOG.warn("Interrupted during waiting the finish of Job ID {}", jobId);
+      Thread.currentThread().interrupt();
     } finally {
       finishLock.unlock();
     }
@@ -466,13 +470,11 @@ public void storeJSON(final String directory, final String suffix) {
 
     final File file = new File(directory, jobId + "-" + suffix + ".json");
     file.getParentFile().mkdirs();
-    try {
-      final PrintWriter printWriter = new PrintWriter(file);
+    try (final PrintWriter printWriter = new PrintWriter(file)) {
       printWriter.println(toStringWithPhysicalPlan());
-      printWriter.close();
       LOG.debug(String.format("JSON representation of job state for %s(%s) was saved to %s",
           jobId, suffix, file.getPath()));
-    } catch (IOException e) {
+    } catch (final IOException e) {
       LOG.warn(String.format("Cannot store JSON representation of job state for %s(%s) to %s: %s",
           jobId, suffix, file.getPath(), e.toString()));
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index 6806d743..4e08467d 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -187,6 +187,7 @@ public void await() {
         }
         hasDelayedSignal.set(false);
       } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
         throw new RuntimeException(e);
       } finally {
         lock.unlock();
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupCollection.java
index 51c5db79..7300531e 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupCollection.java
@@ -142,8 +142,9 @@ public synchronized void removeTaskGroupsAndDescendants(final String stageId) {
    * @param stageId for the stage to begin the removal recursively.
    */
   private synchronized void removeStageAndChildren(final String stageId) {
-    schedulableStages.remove(stageId);
-    stageIdToPendingTaskGroups.remove(stageId);
+    if (schedulableStages.remove(stageId)) {
+      stageIdToPendingTaskGroups.remove(stageId);
+    }
 
     physicalPlan.getStageDAG().getChildren(stageId).forEach(
         physicalStage -> removeStageAndChildren(physicalStage.getId()));
@@ -165,10 +166,11 @@ private synchronized void updateSchedulableStages(
     if (isSchedulable(candidateStageId, candidateStageContainerType)) {
       // Check for ancestor stages that became schedulable due to candidateStage's absence from the queue.
       jobDAG.getAncestors(candidateStageId).forEach(ancestorStage -> {
-        if (schedulableStages.contains(ancestorStage.getId())) {
-          // Remove the ancestor stage if it is of the same container type.
-          if (candidateStageContainerType.equals(ancestorStage.getContainerType())) {
-            schedulableStages.remove(ancestorStage.getId());
+        // Remove the ancestor stage if it is of the same container type.
+        if (schedulableStages.contains(ancestorStage.getId())
+            && candidateStageContainerType.equals(ancestorStage.getContainerType())) {
+          if (!schedulableStages.remove(ancestorStage.getId())) {
+            throw new RuntimeException(String.format("No such stage: %s", ancestorStage.getId()));
           }
         }
       });


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to