seojangho closed pull request #71: [NEMO-55] Handle NCS Master-to-Executor RPC failures
URL: https://github.com/apache/incubator-nemo/pull/71
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java 
b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index 6826174c2..0b80b83b3 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -245,7 +245,7 @@ public static Configuration getJobConf(final String[] args) 
throws IOException,
     cl.registerShortNameOfClass(JobConf.BandwidthJSONPath.class);
     cl.registerShortNameOfClass(JobConf.JVMHeapSlack.class);
     cl.registerShortNameOfClass(JobConf.IORequestHandleThreadsTotal.class);
-    cl.registerShortNameOfClass(JobConf.MaxScheduleAttempt.class);
+    cl.registerShortNameOfClass(JobConf.MaxTaskAttempt.class);
     cl.registerShortNameOfClass(JobConf.FileDirectory.class);
     cl.registerShortNameOfClass(JobConf.GlusterVolumeDirectory.class);
     cl.registerShortNameOfClass(JobConf.PartitionTransportServerPort.class);
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java 
b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
index 46198f016..aeaea6fbb 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
@@ -411,7 +411,7 @@ public void storeJSON(final String directory, final String 
name, final String de
     try (final PrintWriter printWriter = new PrintWriter(file)) {
       printWriter.println(toString());
       printWriter.close();
-      LOG.info(String.format("DAG JSON for %s is saved at %s"
+      LOG.debug(String.format("DAG JSON for %s is saved at %s"
           + " (Use https://service.jangho.kr/nemo-dag/ to visualize it.)", 
description, file.getPath()));
     } catch (IOException e) {
       LOG.warn(String.format("Cannot store JSON representation of %s to %s: 
%s",
diff --git a/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java 
b/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
index aeb27a864..9b11b5600 100644
--- a/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
@@ -71,6 +71,11 @@ public ArgBuilder addOptimizationPolicy(final String policy) 
{
     return this;
   }
 
+  public ArgBuilder addMaxTaskAttempt(final int maxAttempt) {
+    args.add(Arrays.asList("-max_task_attempt", String.valueOf(maxAttempt)));
+    return this;
+  }
+
   /**
    * @param directory directory to save the DAG.
    * @return builder with the DAG directory.
diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java 
b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index bed8099d9..8d016aff8 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -99,7 +99,7 @@
   public final class OptimizationPolicy implements Name<String> {
   }
 
-  //////////////////////////////// Runtime Configurations
+  //////////////////////////////// Runtime Master-Executor Common 
Configurations
 
   /**
    * Deploy mode.
@@ -108,6 +108,16 @@
   public final class DeployMode implements Name<String> {
   }
 
+  /**
+   * The fraction of container memory not to use for the JVM heap.
+   */
+  @NamedParameter(doc = "The fraction of the container memory not to use for 
the JVM heap", short_name = "heap_slack",
+      default_value = "0.3")
+  public final class JVMHeapSlack implements Name<Double> {
+  }
+
+  //////////////////////////////// Runtime Master Configurations
+
   /**
    * Nemo driver memory.
    */
@@ -115,6 +125,24 @@
   public final class DriverMemMb implements Name<Integer> {
   }
 
+  /**
+   * Max number of attempts for task scheduling.
+   */
+  @NamedParameter(doc = "Max number of task attempts", short_name = 
"max_task_attempt", default_value = "1")
+  public final class MaxTaskAttempt implements Name<Integer> {
+  }
+
+  //////////////////////////////// Runtime Executor Configurations
+
+  /**
+   * Used for fault-injected tests.
+   */
+  @NamedParameter(doc = "Executor crashes after expected time, does not crash 
when -1",
+      short_name = "executor_poison_sec", default_value = "-1")
+  public final class ExecutorPosionSec implements Name<Integer> {
+  }
+
   /**
    * Path to the JSON file that specifies bandwidth between locations.
    */
@@ -138,13 +166,6 @@
   public final class BandwidthJSONContents implements Name<String> {
   }
 
-  /**
-   * The fraction of container memory not to use fo the JVM heap.
-   */
-  @NamedParameter(doc = "The fraction of the container memory not to use for 
the JVM heap", short_name = "heap_slack",
-      default_value = "0.3")
-  public final class JVMHeapSlack implements Name<Double> {
-  }
 
   /**
    * Contents of the JSON file that specifies resource layout.
@@ -153,16 +174,7 @@
   public final class ExecutorJSONContents implements Name<String> {
   }
 
-  /**
-   * Executor capacity.
-   * Determines the number of Task 'slots' for each executor.
-   * 1) Master's Task scheduler can use this number in scheduling.
-   *    (e.g., schedule Task to the executor currently with the maximum number 
of available slots)
-   * 2) Executor's number of Task execution threads is set to this number.
-   */
-  @NamedParameter(doc = "Executor capacity", short_name = "executor_capacity", 
default_value = "1")
-  public final class ExecutorCapacity implements Name<Integer> {
-  }
+  //////////////////////////////// Runtime Data Plane Configurations
 
   /**
    * Number of I/O threads for block fetch requests from other executor.
@@ -180,13 +192,6 @@
   public final class MaxNumDownloadsForARuntimeEdge implements Name<Integer> {
   }
 
-  /**
-   * Max number of attempts for task scheduling.
-   */
-  @NamedParameter(doc = "Max number of schedules", short_name = 
"max_schedule_attempt", default_value = "3")
-  public final class MaxScheduleAttempt implements Name<Integer> {
-  }
-
   /**
    * The number of serialization threads for scheduling.
    */
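
For reference, the renamed MaxTaskAttempt parameter is bound like any other Tang named parameter. A minimal sketch of a test-side binding, mirroring the pattern BlockTransferThrottlerTest uses further below (the class name and the value 3 are illustrative only):

import edu.snu.nemo.conf.JobConf;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;

public final class MaxTaskAttemptBindingSketch {
  public static void main(final String[] args) throws Exception {
    // Same Tang pattern as in BlockTransferThrottlerTest: bind the named parameter by its class.
    final Configuration conf = Tang.Factory.getTang().newConfigurationBuilder()
        .bindNamedParameter(JobConf.MaxTaskAttempt.class, String.valueOf(3))
        .build();
    final Injector injector = Tang.Factory.getTang().newInjector(conf);
    // Anything instantiated via this injector now sees max_task_attempt = 3.
    System.out.println(injector.getNamedInstance(JobConf.MaxTaskAttempt.class));
  }
}
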
diff --git 
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
index e4e71ae04..a5b1f39e9 100644
--- 
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
+++ 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
@@ -18,8 +18,9 @@
 import edu.snu.nemo.client.JobLauncher;
 import edu.snu.nemo.common.test.ArgBuilder;
 import edu.snu.nemo.common.test.ExampleTestUtil;
-import edu.snu.nemo.examples.beam.policy.DefaultPolicyParallelismFive;
+import edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy;
 import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismFive;
+import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismTen;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -33,7 +34,7 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 public final class AlternatingLeastSquareITCase {
-  private static final int TIMEOUT = 240000;
+  private static final int TIMEOUT = 240 * 1000;
   private static ArgBuilder builder;
   private static final String fileBasePath = System.getProperty("user.dir") + 
"/../resources/";
 
@@ -41,7 +42,8 @@
   private static final String outputFileName = "sample_output_als";
   private static final String output = fileBasePath + outputFileName;
   private static final String testResourceFileName = "test_output_als";
-  private static final String executorResourceFileName = fileBasePath + 
"beam_sample_executor_resources.json";
+  private static final String noPoisonResources = fileBasePath + 
"beam_sample_executor_resources.json";
+  private static final String poisonedResource = fileBasePath + 
"beam_sample_poisoned_executor_resources.json";
   private static final String numFeatures = "10";
   private static final String numIteration = "3";
   private static final String lambda = "0.05";
@@ -49,7 +51,6 @@
   @Before
   public void setUp() throws Exception {
     builder = new ArgBuilder()
-        .addResourceJson(executorResourceFileName)
         .addUserMain(AlternatingLeastSquare.class.getCanonicalName())
         .addUserArgs(input, numFeatures, numIteration, lambda, output);
   }
@@ -64,18 +65,21 @@ public void tearDown() throws Exception {
   }
 
   @Test (timeout = TIMEOUT)
-  public void test() throws Exception {
+  public void testDefault() throws Exception {
     JobLauncher.main(builder
-        .addJobId(AlternatingLeastSquareITCase.class.getSimpleName())
-        
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+        .addResourceJson(noPoisonResources)
+        .addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + 
"_default")
+        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
         .build());
   }
 
   @Test (timeout = TIMEOUT)
-  public void testPado() throws Exception {
+  public void testPadoWithPoison() throws Exception {
     JobLauncher.main(builder
-        .addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + "_pado")
-        
.addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
+        .addResourceJson(poisonedResource)
+        .addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + 
"_pado_poisoned")
+        .addMaxTaskAttempt(Integer.MAX_VALUE)
+        
.addOptimizationPolicy(PadoPolicyParallelismTen.class.getCanonicalName())
         .build());
   }
 }
diff --git 
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/PadoPolicyParallelismTen.java
 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/PadoPolicyParallelismTen.java
new file mode 100644
index 000000000..53e82cc13
--- /dev/null
+++ 
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/policy/PadoPolicyParallelismTen.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.beam.policy;
+
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
+import edu.snu.nemo.compiler.optimizer.policy.PadoPolicy;
+import edu.snu.nemo.compiler.optimizer.policy.Policy;
+import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
+
+import java.util.List;
+
+/**
+ * A pado policy with fixed parallelism 10 for tests.
+ */
+public final class PadoPolicyParallelismTen implements Policy {
+  private final Policy policy;
+
+  public PadoPolicyParallelismTen() {
+    this.policy = PolicyTestUtil.overwriteParallelism(10, 
PadoPolicy.class.getCanonicalName());
+  }
+
+  @Override
+  public List<CompileTimePass> getCompileTimePasses() {
+    return this.policy.getCompileTimePasses();
+  }
+
+  @Override
+  public List<RuntimePass<?>> getRuntimePasses() {
+    return this.policy.getRuntimePasses();
+  }
+}
diff --git a/examples/resources/beam_sample_poisoned_executor_resources.json 
b/examples/resources/beam_sample_poisoned_executor_resources.json
new file mode 100644
index 000000000..b7614a94d
--- /dev/null
+++ b/examples/resources/beam_sample_poisoned_executor_resources.json
@@ -0,0 +1,13 @@
+[
+  {
+    "type": "Transient",
+    "memory_mb": 512,
+    "capacity": 15,
+    "poison_sec": 2
+  },
+  {
+    "type": "Reserved",
+    "memory_mb": 512,
+    "capacity": 15
+  }
+]
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/FailedMessageSender.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/FailedMessageSender.java
new file mode 100644
index 000000000..1218b1ec4
--- /dev/null
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/FailedMessageSender.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.message;
+
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A message sender that always fails.
+ */
+public final class FailedMessageSender implements 
MessageSender<ControlMessage.Message> {
+  @Override
+  public void send(final ControlMessage.Message message) {
+    // Do nothing.
+  }
+
+  @Override
+  public CompletableFuture<ControlMessage.Message> request(final 
ControlMessage.Message message) {
+    final CompletableFuture<ControlMessage.Message> failed = new 
CompletableFuture<>();
+    failed.completeExceptionally(new Throwable("Failed Message Sender"));
+    return failed;
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Do nothing.
+  }
+}
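
For context, a caller that blocks on the future returned by FailedMessageSender.request() observes the failure as an ExecutionException wrapping the Throwable above. A minimal JDK-only sketch of that CompletableFuture behavior (the class name is illustrative):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public final class FailedFutureSketch {
  public static void main(final String[] args) throws InterruptedException {
    final CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new Throwable("Failed Message Sender"));
    try {
      failed.get();
    } catch (final ExecutionException e) {
      // get() rethrows the completion error wrapped in ExecutionException.
      System.out.println(e.getCause().getMessage()); // prints: Failed Message Sender
    }
  }
}
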
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java
index ea478b7c0..beefcef2f 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageContext.java
@@ -56,7 +56,9 @@ public String getSenderId() {
       // We do not call connection.close since NCS caches connection.
       // Disabling Sonar warning (squid:S2095)
     } catch (final NetworkException e) {
-      throw new RuntimeException("Cannot connect to " + senderId, e);
+      // TODO #140: Properly classify and handle each RPC failure
+      // Not logging the stacktrace here, as it's not very useful.
+      LOG.error("NCS Exception");
     }
   }
 }
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
index 4d07752e5..676cb1598 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
@@ -27,10 +27,11 @@
 import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.IdentifierFactory;
 import org.apache.reef.wake.remote.transport.LinkListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import java.net.SocketAddress;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -41,6 +42,8 @@
  * Message environment for NCS.
  */
 public final class NcsMessageEnvironment implements MessageEnvironment {
+  private static final Logger LOG = 
LoggerFactory.getLogger(NcsMessageEnvironment.class.getName());
+
   private static final String NCS_CONN_FACTORY_ID = "NCS_CONN_FACTORY_ID";
 
   private final NetworkConnectionService networkConnectionService;
@@ -62,7 +65,7 @@ private NcsMessageEnvironment(
     this.senderId = senderId;
     this.replyFutureMap = new ReplyFutureMap<>();
     this.listenerConcurrentMap = new ConcurrentHashMap<>();
-    this.receiverToConnectionMap = new HashMap<>();
+    this.receiverToConnectionMap = new ConcurrentHashMap<>();
     this.connectionFactory = 
networkConnectionService.registerConnectionFactory(
         idFactory.getNewInstance(NCS_CONN_FACTORY_ID),
         new ControlMessageCodec(),
@@ -87,17 +90,15 @@ public void removeListener(final String listenerId) {
   public <T> Future<MessageSender<T>> asyncConnect(final String receiverId, 
final String listenerId) {
     try {
       // If the connection toward the receiver exists already, reuses it.
-      final Connection connection = 
receiverToConnectionMap.computeIfAbsent(receiverId, absentReceiverId -> {
-        try {
-          final Connection newConnection = 
connectionFactory.newConnection(idFactory.getNewInstance(absentReceiverId));
-          newConnection.open();
-          return newConnection;
-        } catch (final NetworkException e) {
-          throw new RuntimeException(e);
-        }
-      });
+      final Connection connection;
+      if (receiverToConnectionMap.containsKey(receiverId)) {
+        connection = receiverToConnectionMap.get(receiverId);
+      } else {
+        connection = 
connectionFactory.newConnection(idFactory.getNewInstance(receiverId));
+        connection.open();
+      }
       return CompletableFuture.completedFuture((MessageSender) new 
NcsMessageSender(connection, replyFutureMap));
-    } catch (final Exception e) {
+    } catch (final NetworkException e) {
       final CompletableFuture<MessageSender<T>> failedFuture = new 
CompletableFuture<>();
       failedFuture.completeExceptionally(e);
       return failedFuture;
@@ -166,8 +167,9 @@ public void onSuccess(final Message<ControlMessage.Message> 
messages) {
     public void onException(final Throwable throwable,
                             final SocketAddress socketAddress,
                             final Message<ControlMessage.Message> messages) {
-      final ControlMessage.Message controlMessage = 
extractSingleMessage(messages);
-      throw new RuntimeException(controlMessage.toString(), throwable);
+      // TODO #140: Properly classify and handle each RPC failure
+      // Not logging the stacktrace here, as it's not very useful.
+      LOG.error("NCS Exception");
     }
   }
 
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java
index db24f4d17..aaa2337a0 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java
@@ -112,7 +112,7 @@ private void addEvent(final 
StateTransitionEvent<TaskState.State> event) {
 
   @Override
   public final boolean processMetricMessage(final String metricField, final 
byte[] metricValue) {
-    LOG.info("metric {} is just arrived!", metricField);
+    LOG.debug("metric {} is just arrived!", metricField);
     switch (metricField) {
       case "serializedReadBytes":
         setSerializedReadBytes(SerializationUtils.deserialize(metricValue));
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index ad62e6856..5750c2aaa 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -119,6 +119,11 @@ public String propertiesToJSON() {
     return sb.toString();
   }
 
+  @Override
+  public String toString() {
+    return propertiesToJSON();
+  }
+
   /**
    * @return the list between the task idx and key range to read.
    */
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index ce0a4ff2f..7663a8cc2 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -18,7 +18,6 @@
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
-import org.apache.commons.lang.SerializationUtils;
 
 import java.io.Serializable;
 import java.util.List;
@@ -145,8 +144,10 @@ public String toString() {
     sb.append(taskId);
     sb.append(" / attempt: ");
     sb.append(attemptIdx);
-    sb.append(" / irDAG: ");
-    sb.append(SerializationUtils.deserialize(serializedIRDag));
+    sb.append(" / incoming: ");
+    sb.append(taskIncomingEdges);
+    sb.append(" / outgoing: ");
+    sb.append(taskOutgoingEdges);
     sb.append("/ exec props: ");
     sb.append(getExecutionProperties());
     return sb.toString();
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
index 74b808c73..be83bf26b 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
@@ -57,6 +57,8 @@ private StateMachine buildTaskStateMachine() {
 
     // From SHOULD_RETRY
     stateMachineBuilder.addTransition(State.SHOULD_RETRY, State.READY, "Ready 
to be retried");
+    stateMachineBuilder.addTransition(State.SHOULD_RETRY, State.SHOULD_RETRY,
+        "SHOULD_RETRY can be caused by multiple reasons");
 
     stateMachineBuilder.setInitialState(State.READY);
     return stateMachineBuilder.build();
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoContext.java 
b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoContext.java
index d89b7e0ad..491e9506f 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoContext.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoContext.java
@@ -15,16 +15,20 @@
  */
 package edu.snu.nemo.driver;
 
+import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.executor.Executor;
 import org.apache.reef.annotations.audience.EvaluatorSide;
 import org.apache.reef.evaluator.context.events.ContextStart;
 import org.apache.reef.evaluator.context.events.ContextStop;
+import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.annotations.Unit;
 import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
+import java.util.Random;
 
 /**
  * REEF Context for the Executor.
@@ -32,13 +36,21 @@
 @EvaluatorSide
 @Unit
 public final class NemoContext {
-
   private static final Logger LOG = 
LoggerFactory.getLogger(NemoContext.class.getName());
   private final Executor executor;
 
+  private final Clock clock;
+  private final int crashTimeSec;
+
   @Inject
-  private NemoContext(final Executor executor) {
+  private NemoContext(final Executor executor,
+                      @Parameter(JobConf.ExecutorPosionSec.class) final int 
crashTimeSec,
+                      final Clock clock) {
     this.executor = executor; // To make Tang instantiate Executor
+
+    // For poison handling
+    this.clock = clock;
+    this.crashTimeSec = crashTimeSec;
   }
 
   /**
@@ -48,6 +60,16 @@ private NemoContext(final Executor executor) {
     @Override
     public void onNext(final ContextStart contextStart) {
       LOG.info("Context Started: Executor is now ready and listening for 
messages");
+
+      // For poison handling
+      if (crashTimeSec >= 0) {
+        final int crashTimeMs = addNoise(crashTimeSec * 1000);
+        LOG.info("Configured {} sec crash time, and actually crashing in {} ms 
(noise)", crashTimeSec, crashTimeMs);
+        clock.scheduleAlarm(crashTimeMs, (alarm) -> {
+          LOG.info("Poison: crashing immediately");
+          Runtime.getRuntime().halt(1); // Forces this JVM to shut down 
immediately.
+        });
+      }
     }
   }
 
@@ -55,10 +77,15 @@ public void onNext(final ContextStart contextStart) {
    * Called when the context is stopped.
    */
   public final class ContextStopHandler implements EventHandler<ContextStop> {
-
     @Override
     public void onNext(final ContextStop contextStop) {
       executor.terminate();
     }
   }
+
+  private int addNoise(final int number) {
+    final Random random = new Random();
+    final int fiftyPercent = random.nextInt((int) (number * (50.0 / 100.0)));
+    return random.nextBoolean() ? number + fiftyPercent : number - 
fiftyPercent; // -50% ~ +50%
+  }
 }
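
For context on the poison timing above, addNoise keeps the actual crash delay within roughly ±50% of the configured value. A standalone sketch of the same arithmetic (class and variable names are illustrative only):

import java.util.Random;

public final class PoisonJitterSketch {
  public static void main(final String[] args) {
    final int configuredMs = 2 * 1000; // e.g. "poison_sec": 2 from the sample resource JSON
    final Random random = new Random();
    final int fiftyPercent = random.nextInt((int) (configuredMs * (50.0 / 100.0))); // 0..999
    final int actualMs = random.nextBoolean() ? configuredMs + fiftyPercent : configuredMs - fiftyPercent;
    System.out.println("Would crash in " + actualMs + " ms"); // stays within ±50% of 2000 ms
  }
}
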
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java 
b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
index f4493e8f7..45191162c 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
@@ -23,6 +23,7 @@
 import edu.snu.nemo.runtime.common.message.MessageParameters;
 import edu.snu.nemo.runtime.master.ClientRPC;
 import edu.snu.nemo.runtime.master.RuntimeMaster;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.driver.client.JobMessageObserver;
 import org.apache.reef.driver.context.ActiveContext;
@@ -158,7 +159,8 @@ public void onNext(final ActiveContext activeContext) {
    */
   public void startSchedulingUserApplication(final String dagString) {
     // Launch user application (with a new thread)
-    final ExecutorService userApplicationRunnerThread = 
Executors.newSingleThreadExecutor();
+    final ExecutorService userApplicationRunnerThread = 
Executors.newSingleThreadExecutor(
+        new BasicThreadFactory.Builder().namingPattern("User App 
thread-%d").build());
     userApplicationRunnerThread.execute(() -> 
userApplicationRunner.run(dagString));
     userApplicationRunnerThread.shutdown();
   }
diff --git 
a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java 
b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
index 6ba615f42..de2dcb0a5 100644
--- 
a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
+++ 
b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
@@ -58,7 +58,7 @@
   @Inject
   private UserApplicationRunner(@Parameter(JobConf.DAGDirectory.class) final 
String dagDirectory,
                                 @Parameter(JobConf.OptimizationPolicy.class) 
final String optimizationPolicy,
-                                @Parameter(JobConf.MaxScheduleAttempt.class) 
final int maxScheduleAttempt,
+                                @Parameter(JobConf.MaxTaskAttempt.class) final 
int maxScheduleAttempt,
                                 final PubSubEventHandlerWrapper 
pubSubEventHandlerWrapper,
                                 final Injector injector,
                                 final RuntimeMaster runtimeMaster) {
@@ -80,7 +80,7 @@ private 
UserApplicationRunner(@Parameter(JobConf.DAGDirectory.class) final Strin
    */
   public void run(final String dagString) {
     try {
-      LOG.info("##### Nemo Compiler #####");
+      LOG.info("##### Nemo Compiler Start #####");
 
       final DAG<IRVertex, IREdge> dag = 
SerializationUtils.deserialize(Base64.getDecoder().decode(dagString));
       dag.storeJSON(dagDirectory, "ir", "IR before optimization");
@@ -103,6 +103,8 @@ public void run(final String dagString) {
 
       final PhysicalPlan physicalPlan = backend.compile(optimizedDAG);
 
+      LOG.info("##### Nemo Compiler Finish #####");
+
       physicalPlan.getStageDAG().storeJSON(dagDirectory, "plan", "physical 
execution plan by compiler");
 
       // Execute!
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 8852e0e4a..25f6224bc 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -37,6 +37,7 @@
 import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
 import edu.snu.nemo.runtime.executor.task.TaskExecutor;
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.reef.tang.annotations.Parameter;
 
 import javax.inject.Inject;
@@ -80,7 +81,9 @@ public Executor(@Parameter(JobConf.ExecutorId.class) final 
String executorId,
                   final DataTransferFactory dataTransferFactory,
                   final MetricManagerWorker metricMessageSender) {
     this.executorId = executorId;
-    this.executorService = Executors.newCachedThreadPool();
+    this.executorService = Executors.newCachedThreadPool(new 
BasicThreadFactory.Builder()
+        .namingPattern("TaskExecutor thread-%d")
+        .build());
     this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
     this.serializerManager = serializerManager;
     this.dataTransferFactory = dataTransferFactory;
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
index 405ce4cd4..d8f3b4151 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
@@ -70,10 +70,10 @@ private synchronized void flushMetricMessageQueueToMaster() 
{
 
       final ControlMessage.MetricMsg.Builder metricMsgBuilder = 
ControlMessage.MetricMsg.newBuilder();
 
-      LOG.info("MetricManagerWorker Size: {}", size);
+      LOG.debug("MetricManagerWorker Size: {}", size);
       for (int i = 0; i < size; i++) {
         final ControlMessage.Metric metric = metricMessageQueue.poll();
-        LOG.info("MetricManagerWorker addMetric: {}, {}, {}", size, i, metric);
+        LOG.debug("MetricManagerWorker addMetric: {}, {}, {}", size, i, 
metric);
         metricMsgBuilder.addMetric(i, metric);
       }
 
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
index ec8977b94..7cfc898f8 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
@@ -119,11 +119,7 @@ protected final void setChannelError(@Nullable final 
Throwable cause) {
       return;
     }
     hasException = true;
-    if (cause == null) {
-      LOG.error(String.format("A channel exception set on %s", toString()));
-    } else {
-      LOG.error(String.format("A channel exception set on %s", toString()), 
cause);
-    }
+    LOG.error(String.format("A channel exception set on %s", toString())); // 
Not logging throwable, which isn't useful
     exception = cause;
   }
 
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
index fa5116b2a..3e7aed6e3 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
@@ -209,12 +209,8 @@ ChannelFuture connectTo(final String remoteExecutorId) {
         LOG.debug("Connected to {}", remoteExecutorId);
         return;
       }
-      // Failed to connect
-      if (future.cause() == null) {
-        LOG.error("Failed to connect to {}", remoteExecutorId);
-      } else {
-        LOG.error(String.format("Failed to connect to %s", remoteExecutorId), 
future.cause());
-      }
+      // Failed to connect (Not logging the cause here, which is not very 
useful)
+      LOG.error("Failed to connect to {}", remoteExecutorId);
     });
     return connectFuture;
   }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java
index 872b275a2..faf3ed190 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ClosableBlockingQueue.java
@@ -91,13 +91,15 @@ public synchronized void closeExceptionally(final Throwable 
throwableToSet) {
    */
   @Nullable
   public synchronized T take() throws InterruptedException {
+    while (queue.isEmpty() && !closed) {
+      wait();
+    }
+
+    // This should come after wait(), so it is always checked on close
     if (throwable != null) {
       throw new RuntimeException(throwable);
     }
 
-    while (queue.isEmpty() && !closed) {
-      wait();
-    }
     // retrieves and removes the head of the underlying collection, or return 
null if the queue is empty
     return queue.poll();
   }
@@ -110,13 +112,15 @@ public synchronized T take() throws InterruptedException {
    */
   @Nullable
   public synchronized T peek() throws InterruptedException {
+    while (queue.isEmpty() && !closed) {
+      wait();
+    }
+
+    // This should come after wait(), so it is always checked on close
     if (throwable != null) {
       throw new RuntimeException(throwable);
     }
 
-    while (queue.isEmpty() && !closed) {
-      wait();
-    }
     // retrieves the head of the underlying collection, or return null if the 
queue is empty
     return queue.peek();
   }
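
The reordering above matters for a consumer already parked in wait(): once closeExceptionally() wakes it, the throwable check now runs, so take()/peek() fail instead of quietly returning null. A minimal JDK-only sketch of that pattern (the class below is illustrative, not the project's ClosableBlockingQueue):

import java.util.ArrayDeque;
import java.util.Queue;

public final class ClosableQueueSketch<T> {
  private final Queue<T> queue = new ArrayDeque<>();
  private boolean closed = false;
  private Throwable throwable = null;

  public synchronized void put(final T element) {
    queue.add(element);
    notifyAll();
  }

  public synchronized void closeExceptionally(final Throwable cause) {
    throwable = cause;
    closed = true;
    notifyAll(); // wakes any consumer parked in take()
  }

  public synchronized T take() throws InterruptedException {
    while (queue.isEmpty() && !closed) {
      wait();
    }
    // Checked after wait(), so a consumer woken by closeExceptionally() sees the failure.
    if (throwable != null) {
      throw new RuntimeException(throwable);
    }
    return queue.poll(); // null if closed normally with nothing left
  }
}
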
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index c3d41f72c..d6a3da72e 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -61,18 +61,23 @@
   private static final String REMOTE_FILE_STORE = "REMOTE_FILE_STORE";
 
   private final String executorId;
+  private final SerializerManager serializerManager;
+
+  // Block stores
   private final MemoryStore memoryStore;
   private final SerializedMemoryStore serializedMemoryStore;
   private final LocalFileStore localFileStore;
   private final RemoteFileStore remoteFileStore;
+
+  // To-Master connections
   private final PersistentConnectionToMasterMap 
persistentConnectionToMasterMap;
+  private final Map<String, CompletableFuture<ControlMessage.Message>> 
pendingBlockLocationRequest;
+
+  // To-Executor connections
   private final ByteTransfer byteTransfer;
-  // Executor service to schedule I/O Runnable which can be done in background.
   private final ExecutorService backgroundExecutorService;
   private final Map<String, AtomicInteger> blockToRemainingRead;
-  private final SerializerManager serializerManager;
-  private final Map<String, CompletableFuture<ControlMessage.Message>> 
pendingBlockLocationRequest;
-  private final BlockTransferConnectionQueue blockTransferConnectionQueue;
+  private final BlockTransferThrottler blockTransferThrottler;
 
   /**
    * Constructor.
@@ -86,7 +91,7 @@
    * @param persistentConnectionToMasterMap the connection map.
    * @param byteTransfer                    the byte transfer.
    * @param serializerManager               the serializer manager.
-   * @param blockTransferConnectionQueue    restricts parallel connections
+   * @param blockTransferThrottler    restricts parallel connections
    */
   @Inject
   private BlockManagerWorker(@Parameter(JobConf.ExecutorId.class) final String 
executorId,
@@ -98,7 +103,7 @@ private 
BlockManagerWorker(@Parameter(JobConf.ExecutorId.class) final String exe
                              final PersistentConnectionToMasterMap 
persistentConnectionToMasterMap,
                              final ByteTransfer byteTransfer,
                              final SerializerManager serializerManager,
-                             final BlockTransferConnectionQueue 
blockTransferConnectionQueue) {
+                             final BlockTransferThrottler 
blockTransferThrottler) {
     this.executorId = executorId;
     this.memoryStore = memoryStore;
     this.serializedMemoryStore = serializedMemoryStore;
@@ -110,9 +115,11 @@ private 
BlockManagerWorker(@Parameter(JobConf.ExecutorId.class) final String exe
     this.blockToRemainingRead = new ConcurrentHashMap<>();
     this.serializerManager = serializerManager;
     this.pendingBlockLocationRequest = new ConcurrentHashMap<>();
-    this.blockTransferConnectionQueue = blockTransferConnectionQueue;
+    this.blockTransferThrottler = blockTransferThrottler;
   }
 
+  //////////////////////////////////////////////////////////// Main public 
methods
+
   /**
    * Creates a new block.
    *
@@ -127,52 +134,6 @@ public Block createBlock(final String blockId,
     return store.createBlock(blockId);
   }
 
-  /**
-   * Retrieves data from the stored block. A specific hash value range can be 
designated.
-   *
-   * @param blockId    of the block.
-   * @param blockStore for the data storage.
-   * @param keyRange   the key range descriptor.
-   * @return the result data in the block.
-   */
-  private CompletableFuture<DataUtil.IteratorWithNumBytes> 
getDataFromLocalBlock(
-      final String blockId,
-      final InterTaskDataStoreProperty.Value blockStore,
-      final KeyRange keyRange) {
-    final BlockStore store = getBlockStore(blockStore);
-
-    // First, try to fetch the block from local BlockStore.
-    final Optional<Block> optionalBlock = store.readBlock(blockId);
-
-    if (optionalBlock.isPresent()) {
-      final Iterable<NonSerializedPartition> partitions = 
optionalBlock.get().readPartitions(keyRange);
-      handleUsedData(blockStore, blockId);
-
-      // Block resides in this evaluator!
-      try {
-        final Iterator innerIterator = 
DataUtil.concatNonSerPartitions(partitions).iterator();
-        long numSerializedBytes = 0;
-        long numEncodedBytes = 0;
-        try {
-          for (final NonSerializedPartition partition : partitions) {
-            numSerializedBytes += partition.getNumSerializedBytes();
-            numEncodedBytes += partition.getNumEncodedBytes();
-          }
-
-          return 
CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator,
 numSerializedBytes,
-              numEncodedBytes));
-        } catch (final 
DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
-          return 
CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator));
-        }
-      } catch (final IOException e) {
-        throw new BlockFetchException(e);
-      }
-    } else {
-      // We don't have the block here...
-      throw new RuntimeException(String.format("Block %s not found in local 
BlockManagerWorker", blockId));
-    }
-  }
-
   /**
    * Inquiries the location of the specific block and routes the request to 
the local block manager worker
    * or to the lower data plane.
@@ -184,7 +145,7 @@ public Block createBlock(final String blockId,
    * @param keyRange      the key range descriptor
    * @return the {@link CompletableFuture} of the block.
    */
-  public CompletableFuture<DataUtil.IteratorWithNumBytes> queryBlock(
+  public CompletableFuture<DataUtil.IteratorWithNumBytes> readBlock(
       final String blockId,
       final String runtimeEdgeId,
       final InterTaskDataStoreProperty.Value blockStore,
@@ -192,7 +153,10 @@ public Block createBlock(final String blockId,
     // Let's see if a remote worker has it
     final CompletableFuture<ControlMessage.Message> blockLocationFuture =
         pendingBlockLocationRequest.computeIfAbsent(blockId, blockIdToRequest 
-> {
-          // Ask Master for the location
+          // Ask Master for the location.
+          // (IMPORTANT): This 'request' effectively blocks the TaskExecutor 
thread if the block is IN_PROGRESS.
+          // We use this property to make the receiver task of a 'push' edge wait in an Executor for its input data
+          // to become available.
           final CompletableFuture<ControlMessage.Message> 
responseFromMasterFuture = persistentConnectionToMasterMap
               
.getMessageSender(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID).request(
                   ControlMessage.Message.newBuilder()
@@ -221,7 +185,7 @@ public Block createBlock(final String blockId,
           responseFromMaster.getBlockLocationInfoMsg();
       if (!blockLocationInfoMsg.hasOwnerExecutorId()) {
         throw new BlockFetchException(new Throwable(
-            "Block " + blockId + " not found both in any storage: "
+            "Block " + blockId + " location unknown: "
                 + "The block state is " + blockLocationInfoMsg.getState()));
       }
       // This is the executor id that we wanted to know
@@ -236,11 +200,24 @@ public Block createBlock(final String blockId,
             .setRuntimeEdgeId(runtimeEdgeId)
             
.setKeyRange(ByteString.copyFrom(SerializationUtils.serialize(keyRange)))
             .build();
-        final CompletableFuture<ByteInputContext> contextFuture = 
blockTransferConnectionQueue
-            .requestConnectPermission(runtimeEdgeId)
+        final CompletableFuture<ByteInputContext> contextFuture = 
blockTransferThrottler
+            .requestTransferPermission(runtimeEdgeId)
             .thenCompose(obj -> byteTransfer.newInputContext(targetExecutorId, 
descriptor.toByteArray()));
-        contextFuture.thenApply(context -> context.getCompletedFuture()
-            .thenAccept(f -> 
blockTransferConnectionQueue.onConnectionFinished(runtimeEdgeId)));
+
+        // whenComplete() ensures that 
blockTransferThrottler.onTransferFinished() is always called,
+        // even on failures. Actual failure handling and Task retry will be 
done by DataFetcher.
+        contextFuture.whenComplete((connectionContext, connectionThrowable) -> 
{
+          if (connectionThrowable != null) {
+            // Something wrong with the connection. Notify 
blockTransferThrottler immediately.
+            blockTransferThrottler.onTransferFinished(runtimeEdgeId);
+          } else {
+            // Connection is okay. Notify blockTransferThrottler when the 
actual transfer is done, or fails.
+            
connectionContext.getCompletedFuture().whenComplete((transferContext, 
transferThrowable) -> {
+              blockTransferThrottler.onTransferFinished(runtimeEdgeId);
+            });
+          }
+        });
+
         return contextFuture
             .thenApply(context -> new 
DataUtil.InputStreamIterator(context.getInputStreams(),
                 serializerManager.getSerializer(runtimeEdgeId)));
@@ -364,48 +341,7 @@ public void removeBlock(final String blockId,
     }
   }
 
-  /**
-   * Handles used {@link edu.snu.nemo.runtime.executor.data.block.Block}.
-   *
-   * @param blockStore the store which contains the block.
-   * @param blockId    the ID of the block.
-   */
-  private void handleUsedData(final InterTaskDataStoreProperty.Value 
blockStore,
-                              final String blockId) {
-    final AtomicInteger remainingExpectedRead = 
blockToRemainingRead.get(blockId);
-    if (remainingExpectedRead != null) {
-      if (remainingExpectedRead.decrementAndGet() == 0) {
-        // This block should be discarded.
-        blockToRemainingRead.remove(blockId);
-        backgroundExecutorService.submit(new Runnable() {
-          @Override
-          public void run() {
-            removeBlock(blockId, blockStore);
-          }
-        });
-      }
-    } // If null, just keep the data in the store.
-  }
-
-  /**
-   * Gets the {@link BlockStore} from annotated value of {@link 
InterTaskDataStoreProperty}.
-   * @param blockStore the annotated value of {@link 
InterTaskDataStoreProperty}.
-   * @return the block store.
-   */
-  private BlockStore getBlockStore(final InterTaskDataStoreProperty.Value 
blockStore) {
-    switch (blockStore) {
-      case MemoryStore:
-        return memoryStore;
-      case SerializedMemoryStore:
-        return serializedMemoryStore;
-      case LocalFileStore:
-        return localFileStore;
-      case GlusterFileStore:
-        return remoteFileStore;
-      default:
-        throw new UnsupportedBlockStoreException(new Exception(blockStore + " 
is not supported."));
-    }
-  }
+  //////////////////////////////////////////////////////////// Public methods 
for remote block I/O
 
   /**
    * Respond to a block request by another executor.
@@ -472,6 +408,101 @@ public void onInputContext(final ByteInputContext 
inputContext) {
     throw new IllegalStateException("No logic here");
   }
 
+  //////////////////////////////////////////////////////////// Private helper 
methods
+
+  /**
+   * Retrieves data from the stored block. A specific hash value range can be 
designated.
+   *
+   * @param blockId    of the block.
+   * @param blockStore for the data storage.
+   * @param keyRange   the key range descriptor.
+   * @return the result data in the block.
+   */
+  private CompletableFuture<DataUtil.IteratorWithNumBytes> 
getDataFromLocalBlock(
+      final String blockId,
+      final InterTaskDataStoreProperty.Value blockStore,
+      final KeyRange keyRange) {
+    final BlockStore store = getBlockStore(blockStore);
+
+    // First, try to fetch the block from local BlockStore.
+    final Optional<Block> optionalBlock = store.readBlock(blockId);
+
+    if (optionalBlock.isPresent()) {
+      final Iterable<NonSerializedPartition> partitions = 
optionalBlock.get().readPartitions(keyRange);
+      handleUsedData(blockStore, blockId);
+
+      // Block resides in this evaluator!
+      try {
+        final Iterator innerIterator = 
DataUtil.concatNonSerPartitions(partitions).iterator();
+        long numSerializedBytes = 0;
+        long numEncodedBytes = 0;
+        try {
+          for (final NonSerializedPartition partition : partitions) {
+            numSerializedBytes += partition.getNumSerializedBytes();
+            numEncodedBytes += partition.getNumEncodedBytes();
+          }
+
+          return 
CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator,
 numSerializedBytes,
+              numEncodedBytes));
+        } catch (final 
DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+          return 
CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator));
+        }
+      } catch (final IOException e) {
+        throw new BlockFetchException(e);
+      }
+    } else {
+      // We don't have the block here...
+      throw new RuntimeException(String.format("Block %s not found in local 
BlockManagerWorker", blockId));
+    }
+  }
+
+
+  /**
+   * Handles used {@link edu.snu.nemo.runtime.executor.data.block.Block}.
+   *
+   * @param blockStore the store which contains the block.
+   * @param blockId    the ID of the block.
+   */
+  private void handleUsedData(final InterTaskDataStoreProperty.Value 
blockStore,
+                              final String blockId) {
+    final AtomicInteger remainingExpectedRead = 
blockToRemainingRead.get(blockId);
+    if (remainingExpectedRead != null) {
+      if (remainingExpectedRead.decrementAndGet() == 0) {
+        // This block should be discarded.
+        blockToRemainingRead.remove(blockId);
+        backgroundExecutorService.submit(new Runnable() {
+          @Override
+          public void run() {
+            removeBlock(blockId, blockStore);
+          }
+        });
+      }
+    } // If null, just keep the data in the store.
+  }
+
+  //////////////////////////////////////////////////////////// Converters
+
+  /**
+   * Gets the {@link BlockStore} from annotated value of {@link 
InterTaskDataStoreProperty}.
+   * @param blockStore the annotated value of {@link 
InterTaskDataStoreProperty}.
+   * @return the block store.
+   */
+  private BlockStore getBlockStore(final InterTaskDataStoreProperty.Value 
blockStore) {
+    switch (blockStore) {
+      case MemoryStore:
+        return memoryStore;
+      case SerializedMemoryStore:
+        return serializedMemoryStore;
+      case LocalFileStore:
+        return localFileStore;
+      case GlusterFileStore:
+        return remoteFileStore;
+      default:
+        throw new UnsupportedBlockStoreException(new Exception(blockStore + " 
is not supported."));
+    }
+  }
+
+
   /**
    * Decodes BlockStore property from protocol buffer.
    * @param blockStore property from protocol buffer
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueue.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottler.java
similarity index 86%
rename from 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueue.java
rename to 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottler.java
index 0870082cf..8d9f8a1ce 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueue.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottler.java
@@ -17,6 +17,8 @@
 
 import edu.snu.nemo.conf.JobConf;
 import org.apache.reef.tang.annotations.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import java.util.ArrayDeque;
@@ -30,13 +32,14 @@
  * Executors can suffer from performance degradation and network-related 
exceptions when there are massive connections,
  * especially under low network bandwidth or high volume of data.
  */
-public final class BlockTransferConnectionQueue {
+public final class BlockTransferThrottler {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BlockTransferThrottler.class.getName());
   private final Map<String, Integer> runtimeEdgeIdToNumCurrentConnections = 
new HashMap<>();
   private final Map<String, Queue<CompletableFuture<Void>>> 
runtimeEdgeIdToPendingConnections = new HashMap<>();
   private final int maxNum;
 
   @Inject
-  private 
BlockTransferConnectionQueue(@Parameter(JobConf.MaxNumDownloadsForARuntimeEdge.class)
 final int maxNum) {
+  private 
BlockTransferThrottler(@Parameter(JobConf.MaxNumDownloadsForARuntimeEdge.class) 
final int maxNum) {
     this.maxNum = maxNum;
   }
 
@@ -45,7 +48,7 @@ private 
BlockTransferConnectionQueue(@Parameter(JobConf.MaxNumDownloadsForARunti
    * @param runtimeEdgeId the corresponding runtime edge id.
    * @return a future that will be completed when the connection is granted.
    */
-  public synchronized CompletableFuture<Void> requestConnectPermission(final 
String runtimeEdgeId) {
+  public synchronized CompletableFuture<Void> requestTransferPermission(final 
String runtimeEdgeId) {
     runtimeEdgeIdToNumCurrentConnections.putIfAbsent(runtimeEdgeId, 0);
     runtimeEdgeIdToPendingConnections.computeIfAbsent(runtimeEdgeId, id -> new 
ArrayDeque<>());
     final int currentOutstandingConnections = 
runtimeEdgeIdToNumCurrentConnections.get(runtimeEdgeId);
@@ -63,10 +66,10 @@ private 
BlockTransferConnectionQueue(@Parameter(JobConf.MaxNumDownloadsForARunti
   }
 
   /**
-   * Indicates the connection has finished.
+   * Indicates the transfer has finished.
    * @param runtimeEdgeId the corresponding runtime edge id.
    */
-  public synchronized void onConnectionFinished(final String runtimeEdgeId) {
+  public synchronized void onTransferFinished(final String runtimeEdgeId) {
     final Queue<CompletableFuture<Void>> pendingConnections = 
runtimeEdgeIdToPendingConnections.get(runtimeEdgeId);
     if (pendingConnections.size() == 0) {
       // Just decrease the number of current connections.
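
The whenComplete() calls added to BlockManagerWorker above are what guarantee onTransferFinished() runs on both success and failure. A minimal JDK-only sketch of why whenComplete is the right hook (names are illustrative):

import java.util.concurrent.CompletableFuture;

public final class ReleaseOnCompleteSketch {
  public static void main(final String[] args) {
    final CompletableFuture<String> transfer = new CompletableFuture<>();
    transfer.whenComplete((result, error) -> {
      // Runs for normal and exceptional completion alike, so the permit is always returned.
      System.out.println("transfer finished, releasing permit (error = " + error + ")");
    });
    transfer.completeExceptionally(new RuntimeException("connection failed"));
  }
}
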
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index 2b29b5f43..e417c8268 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -31,6 +31,8 @@
 import edu.snu.nemo.runtime.common.data.HashRange;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
@@ -43,6 +45,7 @@
  * Represents the input data transfer to a task.
  */
 public final class InputReader extends DataTransfer {
+  private static final Logger LOG = 
LoggerFactory.getLogger(InputReader.class.getName());
   private final int dstTaskIndex;
   private final BlockManagerWorker blockManagerWorker;
 
@@ -89,7 +92,7 @@ public InputReader(final int dstTaskIndex,
     final String blockId = getBlockId(dstTaskIndex);
     final Optional<InterTaskDataStoreProperty.Value> dataStoreProperty
         = runtimeEdge.getPropertyValue(InterTaskDataStoreProperty.class);
-    return blockManagerWorker.queryBlock(blockId, getId(), 
dataStoreProperty.get(), HashRange.all());
+    return blockManagerWorker.readBlock(blockId, getId(), 
dataStoreProperty.get(), HashRange.all());
   }
 
   private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> 
readBroadcast() {
@@ -100,7 +103,7 @@ public InputReader(final int dstTaskIndex,
     final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new 
ArrayList<>();
     for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
       final String blockId = getBlockId(srcTaskIdx);
-      futures.add(blockManagerWorker.queryBlock(blockId, getId(), 
dataStoreProperty.get(), HashRange.all()));
+      futures.add(blockManagerWorker.readBlock(blockId, getId(), 
dataStoreProperty.get(), HashRange.all()));
     }
 
     return futures;
@@ -127,7 +130,7 @@ public InputReader(final int dstTaskIndex,
     for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
       final String blockId = getBlockId(srcTaskIdx);
       futures.add(
-          blockManagerWorker.queryBlock(blockId, getId(), 
dataStoreProperty.get(), hashRangeToRead));
+          blockManagerWorker.readBlock(blockId, getId(), 
dataStoreProperty.get(), hashRangeToRead));
     }
 
     return futures;
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 741bb300b..bfcd04096 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -325,7 +325,9 @@ private void finalizeVertex(final VertexHarness 
vertexHarness) {
   }
 
   private void handleMainOutputElement(final VertexHarness harness, final 
Object element) {
-    harness.getWritersToChildrenTasks().forEach(outputWriter -> 
outputWriter.write(element));
+    harness.getWritersToChildrenTasks().forEach(outputWriter -> {
+      outputWriter.write(element);
+    });
     if (harness.getSideInputChildren().size() > 0) {
       sideInputMap.put(((OperatorVertex) 
harness.getIRVertex()).getTransform().getTag(), element);
     }
diff --git 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java
 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottlerTest.java
similarity index 73%
rename from 
runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java
rename to 
runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottlerTest.java
index eeb003158..2c815b759 100644
--- 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueueTest.java
+++ 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTransferThrottlerTest.java
@@ -16,7 +16,6 @@
 package edu.snu.nemo.runtime.executor.data;
 
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.executor.data.BlockTransferConnectionQueue;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
@@ -29,22 +28,22 @@
 import java.util.concurrent.Future;
 import static org.junit.Assert.assertFalse;
 
-public final class BlockTransferConnectionQueueTest {
-  private static final String THREAD_NAME = BlockTransferConnectionQueue.class.getSimpleName() + "-TestThread";
+public final class BlockTransferThrottlerTest {
+  private static final String THREAD_NAME = BlockTransferThrottler.class.getSimpleName() + "-TestThread";
   private static final String RUNTIME_EDGE_0 = "RuntimeEdge0";
   private static final int WAIT_TIME = 1000;
   /**
-   * Creates {@link BlockTransferConnectionQueue} for testing.
+   * Creates {@link BlockTransferThrottler} for testing.
   * @param maxNum value for {@link JobConf.MaxNumDownloadsForARuntimeEdge} parameter.
-   * @return {@link BlockTransferConnectionQueue} object created.
+   * @return {@link BlockTransferThrottler} object created.
    */
-  private final BlockTransferConnectionQueue getQueue(final int maxNum) {
+  private final BlockTransferThrottler getQueue(final int maxNum) {
     final Configuration conf = Tang.Factory.getTang().newConfigurationBuilder()
        .bindNamedParameter(JobConf.MaxNumDownloadsForARuntimeEdge.class, String.valueOf(maxNum))
         .build();
     final Injector injector = Tang.Factory.getTang().newInjector(conf);
     try {
-      return injector.getInstance(BlockTransferConnectionQueue.class);
+      return injector.getInstance(BlockTransferThrottler.class);
     } catch (final InjectionException e) {
       throw new RuntimeException(e);
     }
@@ -54,13 +53,13 @@ private final BlockTransferConnectionQueue getQueue(final int maxNum) {
   public void test() throws InterruptedException, ExecutionException {
     final ExecutorService executorService = Executors.newSingleThreadExecutor(
         runnable -> new Thread(runnable, THREAD_NAME));
-    final BlockTransferConnectionQueue queue = getQueue(3);
+    final BlockTransferThrottler queue = getQueue(3);
     final Future executorServiceFuture = executorService.submit(() -> {
       try {
-        queue.requestConnectPermission(RUNTIME_EDGE_0).get();
-        queue.requestConnectPermission(RUNTIME_EDGE_0).get();
-        queue.requestConnectPermission(RUNTIME_EDGE_0).get();
-        queue.requestConnectPermission(RUNTIME_EDGE_0).get();
+        queue.requestTransferPermission(RUNTIME_EDGE_0).get();
+        queue.requestTransferPermission(RUNTIME_EDGE_0).get();
+        queue.requestTransferPermission(RUNTIME_EDGE_0).get();
+        queue.requestTransferPermission(RUNTIME_EDGE_0).get();
       } catch (final InterruptedException | ExecutionException e) {
         throw new RuntimeException(e);
       }
@@ -68,7 +67,7 @@ public void test() throws InterruptedException, ExecutionException {
     Thread.sleep(WAIT_TIME);
     // We must have one pending connection request.
     assertFalse(executorServiceFuture.isDone());
-    queue.onConnectionFinished(RUNTIME_EDGE_0);
+    queue.onTransferFinished(RUNTIME_EDGE_0);
     // The remaining request should be accepted before test timeout.
     executorServiceFuture.get();
   }
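
The test above pins down the throttler contract that the rename is about: with a per-runtime-edge limit of 3, the fourth requestTransferPermission call must stay pending until onTransferFinished frees a slot. A minimal sketch of that contract, assuming a simple counter-plus-queue design (this is not the actual BlockTransferThrottler implementation):

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Illustrative throttler: at most maxPerEdge concurrent transfers per runtime edge.
final class SimpleTransferThrottler {
  private final int maxPerEdge;
  private final Map<String, Integer> ongoing = new HashMap<>();
  private final Map<String, Queue<CompletableFuture<Void>>> pending = new HashMap<>();

  SimpleTransferThrottler(final int maxPerEdge) {
    this.maxPerEdge = maxPerEdge;
  }

  synchronized CompletableFuture<Void> requestTransferPermission(final String runtimeEdgeId) {
    final int current = ongoing.getOrDefault(runtimeEdgeId, 0);
    if (current < maxPerEdge) {
      ongoing.put(runtimeEdgeId, current + 1);
      return CompletableFuture.completedFuture(null); // permission granted immediately
    }
    final CompletableFuture<Void> waiting = new CompletableFuture<>();
    pending.computeIfAbsent(runtimeEdgeId, id -> new ArrayDeque<>()).add(waiting);
    return waiting; // stays pending until a transfer on this edge finishes
  }

  synchronized void onTransferFinished(final String runtimeEdgeId) {
    final Queue<CompletableFuture<Void>> waiters = pending.get(runtimeEdgeId);
    if (waiters != null && !waiters.isEmpty()) {
      waiters.poll().complete(null); // hand the freed slot to the oldest waiter
    } else {
      ongoing.put(runtimeEdgeId, ongoing.getOrDefault(runtimeEdgeId, 1) - 1);
    }
  }
}
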
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index da7c1ef6c..f07fe1b20 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -139,7 +139,6 @@ public void initializeState(final String blockId,
    */
   public Set<String> removeWorker(final String executorId) {
     final Set<String> tasksToRecompute = new HashSet<>();
-    LOG.warn("Worker {} is removed.", new Object[]{executorId});
 
     final Lock writeLock = lock.writeLock();
     writeLock.lock();
@@ -252,11 +251,10 @@ public void onProducerTaskFailed(final String failedTaskId) {
     writeLock.lock();
     try {
       if (producerTaskIdToBlockIds.containsKey(failedTaskId)) {
-        LOG.info("ProducerTask {} failed for a list of blocks:", failedTaskId);
         producerTaskIdToBlockIds.get(failedTaskId).forEach(blockId -> {
          final BlockState.State state = (BlockState.State)
              blockIdToMetadata.get(blockId).getBlockState().getStateMachine().getCurrentState();
-          LOG.info("Partition lost: {}", blockId);
+          LOG.info("Block lost: {}", blockId);
           onBlockStateChanged(blockId, BlockState.State.NOT_AVAILABLE, null);
         });
       } // else this task does not produce any block
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
index c5957dbc5..9e6d471f7 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
@@ -61,23 +61,23 @@ synchronized void onStateChanged(final BlockState.State newState,
 
     switch (newState) {
       case IN_PROGRESS:
-        stateMachine.setState(newState);
         break;
       case NOT_AVAILABLE:
-        LOG.info("Block {} lost in {}", new Object[]{blockId, location});
         // Reset the block location and committer information.
        locationHandler.completeExceptionally(new AbsentBlockException(blockId, newState));
        locationHandler = new BlockManagerMaster.BlockLocationRequestHandler(blockId);
-        stateMachine.setState(newState);
         break;
       case AVAILABLE:
-        assert (location != null);
+        if (location == null) {
+          throw new RuntimeException("Null location");
+        }
         locationHandler.complete(location);
-        stateMachine.setState(newState);
         break;
       default:
         throw new UnsupportedOperationException(newState.toString());
     }
+
+    stateMachine.setState(newState);
   }
 
   /**
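
In the hunk above, NOT_AVAILABLE completes the pending location future exceptionally and installs a fresh handler, AVAILABLE completes it with the reported location, and the common stateMachine.setState call is hoisted below the switch. A small sketch, using only standard CompletableFuture calls (the waiter code below is illustrative, not Nemo code), of how a caller waiting on a block location would observe the two outcomes:

import java.util.concurrent.CompletableFuture;

final class BlockLocationWaiterExample {
  // Illustrative: the future completes with an executor id when the block becomes
  // AVAILABLE, or exceptionally when it becomes NOT_AVAILABLE (block lost).
  static void awaitLocation(final CompletableFuture<String> locationFuture) {
    locationFuture.whenComplete((executorId, error) -> {
      if (error != null) {
        System.err.println("Block lost, must recompute its producer task: " + error);
      } else {
        System.out.println("Block available at executor " + executorId);
      }
    });
  }
}
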
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
index fd3e582ec..9ef2f8e87 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
@@ -249,7 +249,7 @@ private void onJobStateChanged(final JobState.State newState) {
     if (newState == JobState.State.EXECUTING) {
       LOG.debug("Executing Job ID {}...", this.jobId);
    } else if (newState == JobState.State.COMPLETE || newState == JobState.State.FAILED) {
-      LOG.info("Job ID {} {}!", new Object[]{jobId, newState});
+      LOG.debug("Job ID {} {}!", new Object[]{jobId, newState});
 
       // Awake all threads waiting the finish of this job.
       finishLock.lock();
@@ -354,7 +354,7 @@ public void storeJSON(final String directory, final String suffix) {
     file.getParentFile().mkdirs();
     try (final PrintWriter printWriter = new PrintWriter(file)) {
       printWriter.println(toStringWithPhysicalPlan());
-      LOG.info(String.format("JSON representation of job state for %s(%s) was saved to %s",
+      LOG.debug(String.format("JSON representation of job state for %s(%s) was saved to %s",
           jobId, suffix, file.getPath()));
     } catch (final IOException e) {
      LOG.warn(String.format("Cannot store JSON representation of job state for %s(%s) to %s: %s",
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 43e46ccf8..806f00011 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -108,7 +108,8 @@ public RuntimeMaster(final Scheduler scheduler,
     // since the processing logic in master takes a very short amount of time
     // compared to the job completion times of executed jobs
    // and keeping it single threaded removes the complexity of multi-thread synchronization.
-    this.runtimeMasterThread = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "RuntimeMaster"));
+    this.runtimeMasterThread =
+        Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "RuntimeMaster thread"));
     this.scheduler = scheduler;
     this.containerManager = containerManager;
     this.blockManagerMaster = blockManagerMaster;
@@ -218,13 +219,13 @@ public void requestContainer(final String resourceSpecificationString) {
 
         for (int i = 0; i < jsonRootNode.size(); i++) {
           final TreeNode resourceNode = jsonRootNode.get(i);
-          final ResourceSpecification.Builder builder = ResourceSpecification.newBuilder();
-          builder.setContainerType(resourceNode.get("type").traverse().nextTextValue());
-          builder.setMemory(resourceNode.get("memory_mb").traverse().getIntValue());
-          builder.setCapacity(resourceNode.get("capacity").traverse().getIntValue());
+          final String type = resourceNode.get("type").traverse().nextTextValue();
+          final int memory = resourceNode.get("memory_mb").traverse().getIntValue();
+          final int capacity = resourceNode.get("capacity").traverse().getIntValue();
          final int executorNum = resourceNode.path("num").traverse().nextIntValue(1);
+          final int poisonSec = resourceNode.path("poison_sec").traverse().nextIntValue(-1);
          resourceRequestCount.getAndAdd(executorNum);
-          containerManager.requestContainer(executorNum, builder.build());
+          containerManager.requestContainer(executorNum, new ResourceSpecification(type, capacity, memory, poisonSec));
         }
         metricCountDownLatch = new CountDownLatch(resourceRequestCount.get());
       } catch (final Exception e) {
@@ -288,7 +289,6 @@ public boolean onExecutorLaunched(final ActiveContext activeContext) {
    */
   public void onExecutorFailed(final FailedEvaluator failedEvaluator) {
     runtimeMasterThread.execute(() -> {
-      LOG.info("onExecutorFailed: {}", failedEvaluator.getId());
       metricCountDownLatch.countDown();
 
       // Note that getFailedContextList() can be empty if the failure occurred
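
The parsing above reads the required "type", "memory_mb" and "capacity" fields of each resource entry, and treats "num" (default 1) and the new "poison_sec" (default -1, i.e. not poisoned) as optional. A hypothetical resource-specification JSON that this code would accept, with container types and values chosen purely for illustration:

[
  { "type": "Transient", "memory_mb": 512, "capacity": 5, "num": 2 },
  { "type": "Reserved", "memory_mb": 512, "capacity": 5, "poison_sec": 30 }
]

Here the second entry requests an executor that is expected to fail about 30 seconds after launch, which is what the fault-handling tests rely on.
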
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
index 32d4bd9cf..a099a11ef 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
@@ -17,6 +17,7 @@
 
 import edu.snu.nemo.common.exception.ContainerException;
 import edu.snu.nemo.conf.JobConf;
+import edu.snu.nemo.runtime.common.message.FailedMessageSender;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageSender;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -25,6 +26,8 @@
 import org.apache.reef.driver.evaluator.EvaluatorRequest;
 import org.apache.reef.driver.evaluator.EvaluatorRequestor;
 import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.annotations.Parameter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +36,7 @@
 import javax.inject.Inject;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -146,7 +150,12 @@ public void onContainerAllocated(final String executorId,
         + ") allocated, will be used for [" + executorId + "]");
     pendingContextIdToResourceSpec.put(executorId, resourceSpecification);
 
-    allocatedContainer.submitContext(executorConfiguration);
+    // Poison handling
+    final Configuration poisonConfiguration = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(JobConf.ExecutorPosionSec.class, String.valueOf(resourceSpecification.getPoisonSec()))
+        .build();
+
+    allocatedContainer.submitContext(Configurations.merge(executorConfiguration, poisonConfiguration));
   }
 
   /**
@@ -165,16 +174,16 @@ public void onContainerAllocated(final String executorId,
 
    // We set contextId = executorId in NemoDriver when we generate executor configuration.
     final String executorId = activeContext.getId();
-
    final ResourceSpecification resourceSpec = pendingContextIdToResourceSpec.remove(executorId);
 
    // Connect to the executor and initiate Master side's executor representation.
-    final MessageSender messageSender;
+    MessageSender messageSender;
     try {
       messageSender =
          messageEnvironment.asyncConnect(executorId, MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID).get();
-    } catch (final Exception e) {
-      throw new RuntimeException(e);
+    } catch (final InterruptedException | ExecutionException e) {
+      // TODO #140: Properly classify and handle each RPC failure
+      messageSender = new FailedMessageSender();
     }
 
     // Create the executor representation.
@@ -182,9 +191,8 @@ public void onContainerAllocated(final String executorId,
        new ExecutorRepresenter(executorId, resourceSpec, messageSender, activeContext, serializationExecutorService,
            activeContext.getEvaluatorDescriptor().getNodeDescriptor().getName());

-    LOG.info("{} is up and running at {}", executorId, executorRepresenter.getNodeName());
-
    requestLatchByResourceSpecId.get(resourceSpec.getResourceSpecId()).countDown();
+
     return Optional.of(executorRepresenter);
   }
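
Two things happen in the hunks above: the executor configuration is merged with a small poison configuration carrying ResourceSpecification.getPoisonSec(), and an RPC connection failure no longer aborts container launch; instead the executor representation is built around a FailedMessageSender so that later sends fail fast and can be handled where they occur. A minimal sketch of that fail-fast fallback idea, written against a hypothetical sender interface (this is not the actual Nemo MessageSender/FailedMessageSender API):

import java.util.concurrent.CompletableFuture;

// Hypothetical sender interface, used only for this sketch.
interface Sender<T> {
  CompletableFuture<T> request(T message);
}

// Every request fails immediately instead of throwing at connect time, so callers
// observe an RPC failure they can classify and handle later (see TODO #140).
final class AlwaysFailingSender<T> implements Sender<T> {
  @Override
  public CompletableFuture<T> request(final T message) {
    final CompletableFuture<T> failed = new CompletableFuture<>();
    failed.completeExceptionally(new RuntimeException("Connection to the executor was never established"));
    return failed;
  }
}
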
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index e43107403..e0df7662d 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -170,24 +170,6 @@ public int getExecutorCapacity() {
     return runningTasks;
   }
 
-  public Map<Task, Integer> getRunningTaskToAttempt() {
-    return runningTaskToAttempt;
-  }
-
-  /**
-   * @return set of ids of Tasks that have been failed in this exeuctor
-
-  public Set<String> getFailedTasks() {
-    return failedTasks;
-  }
-
-  /**
-   * @return set of ids of Tasks that have been completed in this executor
-   */
-  public Set<Task> getCompleteTasks() {
-    return completeTasks;
-  }
-
   /**
    * @return the executor id
    */
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java
index 6a2b2b827..173a477a6 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java
@@ -26,14 +26,23 @@
   private final String containerType;
   private final int capacity;
   private final int memory;
+  private final int poisonSec; // -1 if this resources is not poisoned
 
   public ResourceSpecification(final String containerType,
                                final int capacity,
                                final int memory) {
+    this(containerType, capacity, memory, -1);
+  }
+
+  public ResourceSpecification(final String containerType,
+                               final int capacity,
+                               final int memory,
+                               final int poisonSec) {
     this.resourceSpecId = RuntimeIdGenerator.generateResourceSpecId();
     this.containerType = containerType;
     this.capacity = capacity;
     this.memory = memory;
+    this.poisonSec = poisonSec;
   }
 
   /**
@@ -62,58 +71,10 @@ public String getResourceSpecId() {
   }
 
   /**
-   * @return {@link Builder} for {@link ResourceSpecification}.
+   * @return -1   if this resource is not poisoned. (for all other normal cases)
+   *         >= 0 the expected time to failure by poison. (for fault-handling tests)
    */
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-
-  /**
-   * A Builder class for {@link ResourceSpecification}.
-   */
-  public static final class Builder {
-    private String containerType;
-    private Integer capacity;
-    private Integer memory;
-
-    private Builder() {
-    }
-
-    /**
-     * @param inputContainerType the container type
-     * @return {@link Builder} object.
-     */
-    public Builder setContainerType(final String inputContainerType) {
-      this.containerType = inputContainerType;
-      return this;
-    }
-
-    /**
-     * @param inputCapacity the number of Tasks that can be run in this container
-     * @return {@link Builder} object.
-     */
-    public Builder setCapacity(final int inputCapacity) {
-      this.capacity = inputCapacity;
-      return this;
-    }
-
-    /**
-     * @param inputMemory the size of the memory allocated, in megabytes
-     * @return {@link Builder} object.
-     */
-    public Builder setMemory(final int inputMemory) {
-      this.memory = inputMemory;
-      return this;
-    }
-
-    /**
-     * @return the {@link ResourceSpecification} object that has been built
-     */
-    public ResourceSpecification build() {
-      assert (containerType != null);
-      assert (capacity != null);
-      assert (memory != null);
-      return new ResourceSpecification(containerType, capacity, memory);
-    }
+  public int getPoisonSec() {
+    return poisonSec;
   }
 }
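
With the Builder removed, a ResourceSpecification is now constructed directly; the three-argument constructor delegates to the four-argument one with poisonSec = -1. A brief usage sketch with illustrative values:

// A normal (non-poisoned) specification: poisonSec defaults to -1.
final ResourceSpecification normal = new ResourceSpecification("Transient", 4, 1024);

// A poisoned specification for fault-handling tests: the executor is expected
// to fail roughly 30 seconds after launch.
final ResourceSpecification poisoned = new ResourceSpecification("Transient", 4, 1024, 30);
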
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index 880c8d0e8..531fcedf4 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -192,7 +192,7 @@ public void onTaskStateReportFromExecutor(final String executorId,
           }
           break;
         case SHOULD_RETRY:
-          // Retry the failed task
+          // Do retry
           doSchedule();
           break;
         default:
@@ -224,13 +224,14 @@ public void onTaskStateReportFromExecutor(final String executorId,
 
   @Override
   public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
-    LOG.info("{} added", executorRepresenter.getExecutorId());
+    LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
     executorRegistry.registerExecutor(executorRepresenter);
     schedulerRunner.onExecutorSlotAvailable();
   }
 
   @Override
   public void onExecutorRemoved(final String executorId) {
+    LOG.info("{} removed", executorId);
     blockManagerMaster.removeWorker(executorId);
 
     // These are tasks that were running at the time of executor removal.
@@ -240,14 +241,8 @@ public void onExecutorRemoved(final String executorId) {
       return Pair.of(executor, ExecutorRegistry.ExecutorState.FAILED);
     });
 
-    // We need to retry the interrupted tasks, and also recover the tasks' missing input blocks if needed.
-    final Set<String> tasksToReExecute =
-        Sets.union(interruptedTasks, recursivelyGetParentTasksForLostBlocks(interruptedTasks));
-
-    // Report SHOULD_RETRY tasks so they can be re-scheduled
-    LOG.info("{} removed: {} will be retried", executorId, tasksToReExecute);
-    tasksToReExecute.forEach(
-        taskToReExecute -> jobStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
+    // Retry the interrupted tasks (and required parents)
+    retryTasksAndRequiredParents(interruptedTasks);
 
    // Trigger the scheduling of SHOULD_RETRY tasks in the earliest scheduleGroup
     doSchedule();
@@ -263,8 +258,11 @@ public void terminate() {
 
   /**
    * The main entry point for task scheduling.
-   * This operation can be invoked at any point during job execution, as it is designed to be free of side-effects,
-   * and integrate well with {@link PendingTaskCollectionPointer} and {@link SchedulerRunner}.
+   * This operation can be invoked at any point during job execution, as it is designed to be free of side-effects.
+   *
+   * These are the reasons why.
+   * - We 'reset' {@link PendingTaskCollectionPointer}, and not 'add' new tasks to it
+   * - We make {@link SchedulerRunner} run only tasks that are READY.
    */
   private void doSchedule() {
     final Optional<List<Stage>> earliest = selectEarliestSchedulableGroup();
@@ -275,8 +273,14 @@ private void doSchedule() {
           .flatMap(stage -> selectSchedulableTasks(stage).stream())
           .collect(Collectors.toList());
 
-      LOG.info("Attempting to schedule {} in the same ScheduleGroup",
-          tasksToSchedule.stream().map(Task::getTaskId).collect(Collectors.toList()));
+      // We prefer (but not guarantee) to schedule the 'receiving' tasks first,
+      // assuming that tasks within a ScheduleGroup are connected with 'push' edges.
+      Collections.reverse(tasksToSchedule);
+
+      LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
+          .map(Task::getTaskId)
+          .map(RuntimeIdGenerator::getStageIdFromTaskId)
+          .collect(Collectors.toSet()));
 
       // Set the pointer to the schedulable tasks.
       pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
@@ -289,6 +293,10 @@ private void doSchedule() {
   }
 
   private Optional<List<Stage>> selectEarliestSchedulableGroup() {
+    if (sortedScheduleGroups == null) {
+      return Optional.empty();
+    }
+
     return sortedScheduleGroups.stream()
         .filter(scheduleGroup -> scheduleGroup.stream()
             .map(Stage::getId)
@@ -431,10 +439,20 @@ private void onTaskExecutionFailedRecoverable(final String executorId,
      default:
        throw new UnknownFailureCauseException(new Throwable("Unknown cause: " + failureCause));
     }
+
+    retryTasksAndRequiredParents(Collections.singleton(taskId));
   }
 
  ////////////////////////////////////////////////////////////////////// Helper methods
 
+  private void retryTasksAndRequiredParents(final Set<String> tasks) {
+    final Set<String> requiredParents = recursivelyGetParentTasksForLostBlocks(tasks);
+    final Set<String> tasksToRetry = Sets.union(tasks, requiredParents);
+    LOG.info("Will be retried: {}", tasksToRetry);
+    tasksToRetry.forEach(
+        taskToReExecute -> jobStateManager.onTaskStateChanged(taskToReExecute, TaskState.State.SHOULD_RETRY));
+  }
+
  private Set<String> recursivelyGetParentTasksForLostBlocks(final Set<String> children) {
     if (children.isEmpty()) {
       return Collections.emptySet();
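
The retry path above now funnels both executor removal and recoverable task failures through retryTasksAndRequiredParents, which unions the interrupted tasks with the parents whose output blocks were lost and marks them all SHOULD_RETRY; doSchedule stays safe to call repeatedly because the pending collection is overwritten rather than appended to. A minimal sketch of that overwrite-only pointer idea (illustrative, not the actual PendingTaskCollectionPointer):

import java.util.List;
import java.util.Optional;

// Illustrative: holds at most one batch of schedulable tasks; setting a new batch
// replaces the old one, so repeated doSchedule() calls cannot accumulate duplicates.
final class OverwriteOnlyPointer<T> {
  private List<T> batch = null;

  synchronized void setToOverwrite(final List<T> newBatch) {
    this.batch = newBatch; // replace, never append
  }

  synchronized Optional<List<T>> getAndReset() {
    final Optional<List<T>> result = Optional.ofNullable(batch);
    batch = null;
    return result;
  }
}
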
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index caf0d40a3..0b0524eb2 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -66,7 +66,8 @@ public SchedulerRunner(final SchedulingConstraintRegistry schedulingConstraintRe
                          final ExecutorRegistry executorRegistry) {
     this.jobStateManagers = new HashMap<>();
     this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
-    this.schedulerThread = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "SchedulerRunner"));
+    this.schedulerThread = Executors.newSingleThreadExecutor(runnable ->
+        new Thread(runnable, "SchedulerRunner thread"));
     this.isSchedulerRunning = false;
     this.isTerminated = false;
     this.executorRegistry = executorRegistry;
@@ -114,7 +115,6 @@ void doScheduleTaskList() {
         continue;
       }
 
-      LOG.debug("Trying to schedule {}...", task.getTaskId());
       executorRegistry.viewExecutors(executors -> {
        final MutableObject<Set<ExecutorRepresenter>> candidateExecutors = new MutableObject<>(executors);
         task.getExecutionProperties().forEachProperties(property -> {
@@ -132,6 +132,8 @@ void doScheduleTaskList() {
           // update metadata first
          jobStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
 
+          LOG.info("{} scheduled to {}", task.getTaskId(), selectedExecutor.getExecutorId());
+
           // send the task
           selectedExecutor.onTaskScheduled(task);
         } else {
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java
index 18dd6cfe7..441dd31d6 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java
@@ -107,7 +107,7 @@ public void testRequestAllocateLaunch() {
         containerManager.onContainerAllocated(
             executorId,
             createMockEvaluator(evaluatorId, descriptor),
-            mock(Configuration.class));
+            createMockConfiguration());
         final ExecutorRepresenter executorRepresenter =
            containerManager.onContainerLaunched(createMockContext(executorId, descriptor)).get();
        assertEquals(spec.getContainerType(), executorRepresenter.getContainerType());
@@ -125,7 +125,7 @@ public void testFailureBeforeLaunch() {
     containerManager.onContainerAllocated(
         getExecutorId(),
         createMockEvaluator(evaluatorId, createDescriptor(RESOURCE_SPEC_A)),
-        mock(Configuration.class));
+        createMockConfiguration());
    assertEquals(RESOURCE_SPEC_A, containerManager.onContainerFailed(evaluatorId));
   }
 
@@ -139,7 +139,7 @@ public void testFailureAfterLaunch() {
     containerManager.onContainerAllocated(
         executorId,
         createMockEvaluator(evaluatorId, descriptor),
-        mock(Configuration.class));
+        createMockConfiguration());
    containerManager.onContainerLaunched(createMockContext(executorId, descriptor));
    assertEquals(RESOURCE_SPEC_A, containerManager.onContainerFailed(evaluatorId));
   }
@@ -170,4 +170,8 @@ private ActiveContext createMockContext(final String id,
     when(mockedContext.getEvaluatorDescriptor()).thenReturn(descriptor);
     return mockedContext;
   }
+
+  private Configuration createMockConfiguration() {
+    return Tang.Factory.getTang().newConfigurationBuilder().build();
+  }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRestartTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
similarity index 99%
rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRestartTest.java
rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
index 7064a23ef..4b6af1a4a 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRestartTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -59,10 +59,10 @@
 @RunWith(PowerMockRunner.class)
@PrepareForTest({BlockManagerMaster.class, SchedulerRunner.class, SchedulingConstraintRegistry.class,
    PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class})
-public final class TaskRestartTest {
+public final class TaskRetryTest {
   @Rule public TestName testName = new TestName();
 
-  private static final Logger LOG = LoggerFactory.getLogger(TaskRestartTest.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(TaskRetryTest.class.getName());
   private static final AtomicInteger ID_OFFSET = new AtomicInteger(1);
 
   private Random random;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
