seojangho closed pull request #49: [NEMO-87] Remove unused BlockStates
URL: https://github.com/apache/incubator-nemo/pull/49
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork-based pull request once
it has been merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/exception/AbsentBlockException.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/exception/AbsentBlockException.java
index f647445a..bc7e5e5e 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/exception/AbsentBlockException.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/exception/AbsentBlockException.java
@@ -18,7 +18,7 @@
 import edu.snu.nemo.runtime.common.state.BlockState;
 
 /**
- * An exception which represents the requested block is neither COMMITTED nor 
SCHEDULED.
+ * An exception which represents the requested block is neither AVAILABLE nor 
IN_PROGRESS.
  */
 public final class AbsentBlockException extends Exception {
   private final String blockId;
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
index 2402948a..65ada217 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
@@ -31,29 +31,22 @@ private StateMachine buildBlockStateMachine() {
     final StateMachine.Builder stateMachineBuilder = StateMachine.newBuilder();
 
     // Add states
-    stateMachineBuilder.addState(State.READY, "The block is ready to be 
created.");
-    stateMachineBuilder.addState(State.SCHEDULED, "The block is scheduled for 
creation.");
-    stateMachineBuilder.addState(State.COMMITTED, "The block has been 
committed.");
-    stateMachineBuilder.addState(State.LOST_BEFORE_COMMIT, "The task that 
produces the block is scheduled, "
-        + "but failed before committing");
-    stateMachineBuilder.addState(State.REMOVED, "The block has been removed 
(e.g., GC-ed).");
-    stateMachineBuilder.addState(State.LOST, "Block lost.");
+    stateMachineBuilder.addState(State.NOT_AVAILABLE, "The block is not 
available.");
+    stateMachineBuilder.addState(State.IN_PROGRESS, "The block is in the 
progress of being created.");
+    stateMachineBuilder.addState(State.AVAILABLE, "The block is available.");
 
     // Add transitions
-    stateMachineBuilder.addTransition(State.READY, State.SCHEDULED,
+    stateMachineBuilder.addTransition(State.NOT_AVAILABLE, State.IN_PROGRESS,
         "The task that produces the block is scheduled.");
-    stateMachineBuilder.addTransition(State.SCHEDULED, State.COMMITTED, 
"Successfully moved and committed");
-    stateMachineBuilder.addTransition(State.SCHEDULED, 
State.LOST_BEFORE_COMMIT, "The block is lost before commit");
-    stateMachineBuilder.addTransition(State.COMMITTED, State.LOST, "Lost after 
committed");
-    stateMachineBuilder.addTransition(State.COMMITTED, State.REMOVED, "Removed 
after committed");
-    stateMachineBuilder.addTransition(State.REMOVED, State.SCHEDULED,
-        "Re-scheduled after removal due to fault tolerance");
+    stateMachineBuilder.addTransition(State.IN_PROGRESS, State.AVAILABLE, "The 
block is successfully created");
 
-    stateMachineBuilder.addTransition(State.LOST, State.SCHEDULED, "The 
producer of the lost block is rescheduled");
-    stateMachineBuilder.addTransition(State.LOST_BEFORE_COMMIT, 
State.SCHEDULED,
-        "The producer of the lost block is rescheduled");
+    stateMachineBuilder.addTransition(State.IN_PROGRESS, State.NOT_AVAILABLE,
+        "The block is lost before being created");
+    stateMachineBuilder.addTransition(State.AVAILABLE, State.NOT_AVAILABLE, 
"The block is lost");
+    stateMachineBuilder.addTransition(State.NOT_AVAILABLE, State.NOT_AVAILABLE,
+        "A block can be reported lost from multiple sources");
 
-    stateMachineBuilder.setInitialState(State.READY);
+    stateMachineBuilder.setInitialState(State.NOT_AVAILABLE);
 
     return stateMachineBuilder.build();
   }
@@ -66,12 +59,9 @@ public StateMachine getStateMachine() {
    * BlockState.
    */
   public enum State {
-    READY,
-    SCHEDULED,
-    COMMITTED,
-    LOST_BEFORE_COMMIT,
-    LOST,
-    REMOVED
+    NOT_AVAILABLE,
+    IN_PROGRESS,
+    AVAILABLE,
   }
 
   @Override
diff --git a/runtime/common/src/main/proto/ControlMessage.proto 
b/runtime/common/src/main/proto/ControlMessage.proto
index cc883dc9..664734b1 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -141,12 +141,9 @@ enum TaskStateFromExecutor {
 }
 
 enum BlockStateFromExecutor {
-    BLOCK_READY = 0;
-    SCHEDULED = 1;
-    COMMITTED = 2;
-    LOST = 3;
-    LOST_BEFORE_COMMIT = 4;
-    REMOVED = 5;
+    NOT_AVAILABLE = 0;
+    IN_PROGRESS = 1;
+    AVAILABLE = 2;
 }
 
 enum BlockStore {
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index 41446c15..86a18cbb 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -287,7 +287,7 @@ public void writeBlock(final Block block,
         ControlMessage.BlockStateChangedMsg.newBuilder()
             .setExecutorId(executorId)
             .setBlockId(blockId)
-            .setState(ControlMessage.BlockStateFromExecutor.COMMITTED);
+            .setState(ControlMessage.BlockStateFromExecutor.AVAILABLE);
 
     if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
       blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE);
@@ -345,7 +345,7 @@ public void removeBlock(final String blockId,
           ControlMessage.BlockStateChangedMsg.newBuilder()
               .setExecutorId(executorId)
               .setBlockId(blockId)
-              .setState(ControlMessage.BlockStateFromExecutor.REMOVED);
+              .setState(ControlMessage.BlockStateFromExecutor.NOT_AVAILABLE);
 
       if (DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
         blockStateChangedMsgBuilder.setLocation(REMOTE_FILE_STORE);
diff --git 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index 48ee45ed..74c25048 100644
--- 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
+++ 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -129,7 +129,7 @@ public void setUp() throws Exception {
       blockIdList.add(blockId);
       blockManagerMaster.initializeState(blockId, "Unused");
       blockManagerMaster.onBlockStateChanged(
-          blockId, BlockState.State.SCHEDULED, null);
+          blockId, BlockState.State.IN_PROGRESS, null);
 
       // Create blocks for this block.
       final List<NonSerializedPartition<Integer>> partitionsForBlock = new 
ArrayList<>(NUM_READ_VERTICES);
@@ -150,7 +150,7 @@ public void setUp() throws Exception {
     concBlockId = RuntimeIdGenerator.generateBlockId(concEdge, 
NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1);
     blockManagerMaster.initializeState(concBlockId, "unused");
     blockManagerMaster.onBlockStateChanged(
-        concBlockId, BlockState.State.SCHEDULED, null);
+        concBlockId, BlockState.State.IN_PROGRESS, null);
     IntStream.range(0, NUM_CONC_READ_TASKS).forEach(number -> 
concReadTaskIdList.add("conc_read_IR_vertex"));
     concBlockPartition = new NonSerializedPartition(0, getRangedNumList(0, 
CONC_READ_DATA_SIZE), -1, -1);
 
@@ -175,7 +175,7 @@ public void setUp() throws Exception {
       hashedBlockIdList.add(blockId);
       blockManagerMaster.initializeState(blockId, "Unused");
       blockManagerMaster.onBlockStateChanged(
-          blockId, BlockState.State.SCHEDULED, null);
+          blockId, BlockState.State.IN_PROGRESS, null);
       final List<NonSerializedPartition<Integer>> hashedBlock = new 
ArrayList<>(HASH_RANGE);
       // Generates the data having each hash value.
       IntStream.range(0, HASH_RANGE).forEach(hashValue ->
@@ -319,7 +319,7 @@ public Boolean call() {
               }
               block.commit();
               writerSideStore.writeBlock(block);
-              blockManagerMaster.onBlockStateChanged(blockId, 
BlockState.State.COMMITTED,
+              blockManagerMaster.onBlockStateChanged(blockId, 
BlockState.State.AVAILABLE,
                   "Writer side of the shuffle edge");
               return true;
             } catch (final Exception e) {
@@ -413,7 +413,7 @@ public Boolean call() {
           block.commit();
           writerSideStore.writeBlock(block);
           blockManagerMaster.onBlockStateChanged(
-              concBlockId, BlockState.State.COMMITTED, "Writer side of the 
concurrent read edge");
+              concBlockId, BlockState.State.AVAILABLE, "Writer side of the 
concurrent read edge");
           return true;
         } catch (final Exception e) {
           e.printStackTrace();
@@ -501,7 +501,7 @@ public Boolean call() {
               }
               block.commit();
               writerSideStore.writeBlock(block);
-              blockManagerMaster.onBlockStateChanged(blockId, 
BlockState.State.COMMITTED,
+              blockManagerMaster.onBlockStateChanged(blockId, 
BlockState.State.AVAILABLE,
                   "Writer side of the shuffle in hash range edge");
               return true;
             } catch (final Exception e) {
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index 7b684a30..9c24671c 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -42,7 +42,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static edu.snu.nemo.runtime.common.state.BlockState.State.SCHEDULED;
+import static edu.snu.nemo.runtime.common.state.BlockState.State.IN_PROGRESS;
 
 /**
  * Master-side block manager.
@@ -109,7 +109,7 @@ public void initializeState(final String blockId,
     try {
       // Set committed block states to lost
       getCommittedBlocksByWorker(executorId).forEach(blockId -> {
-        onBlockStateChanged(blockId, BlockState.State.LOST, executorId);
+        onBlockStateChanged(blockId, BlockState.State.NOT_AVAILABLE, 
executorId);
         // producerTaskForPartition should always be non-empty.
         final Set<String> producerTaskForPartition = 
getProducerTaskIds(blockId);
         producerTaskForPartition.forEach(tasksToRecompute::add);
@@ -126,7 +126,7 @@ public void initializeState(final String blockId,
    *
    * @param blockId id of the specified block.
    * @return the handler of block location requests, which completes 
exceptionally when the block
-   * is not {@code SCHEDULED} or {@code COMMITTED}.
+   * is not {@code IN_PROGRESS} or {@code AVAILABLE}.
    */
   public BlockLocationRequestHandler getBlockLocationHandler(final String 
blockId) {
     final Lock readLock = lock.readLock();
@@ -135,13 +135,10 @@ public BlockLocationRequestHandler 
getBlockLocationHandler(final String blockId)
       final BlockState.State state =
           (BlockState.State) 
getBlockState(blockId).getStateMachine().getCurrentState();
       switch (state) {
-        case SCHEDULED:
-        case COMMITTED:
+        case IN_PROGRESS:
+        case AVAILABLE:
           return blockIdToMetadata.get(blockId).getLocationHandler();
-        case READY:
-        case LOST_BEFORE_COMMIT:
-        case LOST:
-        case REMOVED:
+        case NOT_AVAILABLE:
           final BlockLocationRequestHandler handler = new 
BlockLocationRequestHandler(blockId);
           handler.completeExceptionally(new AbsentBlockException(blockId, 
state));
           return handler;
@@ -191,8 +188,8 @@ public void onProducerTaskScheduled(final String 
scheduledTaskId) {
       if (producerTaskIdToBlockIds.containsKey(scheduledTaskId)) {
         producerTaskIdToBlockIds.get(scheduledTaskId).forEach(blockId -> {
           if (!blockIdToMetadata.get(blockId).getBlockState()
-              .getStateMachine().getCurrentState().equals(SCHEDULED)) {
-            onBlockStateChanged(blockId, SCHEDULED, null);
+              .getStateMachine().getCurrentState().equals(IN_PROGRESS)) {
+            onBlockStateChanged(blockId, IN_PROGRESS, null);
           }
         });
       } // else this task does not produce any block
@@ -216,13 +213,8 @@ public void onProducerTaskFailed(final String 
failedTaskId) {
         producerTaskIdToBlockIds.get(failedTaskId).forEach(blockId -> {
           final BlockState.State state = (BlockState.State)
               
blockIdToMetadata.get(blockId).getBlockState().getStateMachine().getCurrentState();
-          if (state == BlockState.State.COMMITTED) {
-            LOG.info("Partition lost: {}", blockId);
-            onBlockStateChanged(blockId, BlockState.State.LOST, null);
-          } else {
-            LOG.info("Partition lost_before_commit: {}", blockId);
-            onBlockStateChanged(blockId, BlockState.State.LOST_BEFORE_COMMIT, 
null);
-          }
+          LOG.info("Partition lost: {}", blockId);
+          onBlockStateChanged(blockId, BlockState.State.NOT_AVAILABLE, null);
         });
       } // else this task does not produce any block
     } finally {
@@ -444,18 +436,12 @@ void registerRequest(final long requestId,
    */
   public static BlockState.State convertBlockState(final 
ControlMessage.BlockStateFromExecutor state) {
     switch (state) {
-      case BLOCK_READY:
-        return BlockState.State.READY;
-      case SCHEDULED:
-        return BlockState.State.SCHEDULED;
-      case COMMITTED:
-        return BlockState.State.COMMITTED;
-      case LOST_BEFORE_COMMIT:
-        return BlockState.State.LOST_BEFORE_COMMIT;
-      case LOST:
-        return BlockState.State.LOST;
-      case REMOVED:
-        return BlockState.State.REMOVED;
+      case NOT_AVAILABLE:
+        return BlockState.State.NOT_AVAILABLE;
+      case IN_PROGRESS:
+        return BlockState.State.IN_PROGRESS;
+      case AVAILABLE:
+        return BlockState.State.AVAILABLE;
       default:
         throw new UnknownExecutionStateException(new Exception("This 
BlockState is unknown: " + state));
     }
@@ -468,18 +454,12 @@ void registerRequest(final long requestId,
    */
   public static ControlMessage.BlockStateFromExecutor convertBlockState(final 
BlockState.State state) {
     switch (state) {
-      case READY:
-        return ControlMessage.BlockStateFromExecutor.BLOCK_READY;
-      case SCHEDULED:
-        return ControlMessage.BlockStateFromExecutor.SCHEDULED;
-      case COMMITTED:
-        return ControlMessage.BlockStateFromExecutor.COMMITTED;
-      case LOST_BEFORE_COMMIT:
-        return ControlMessage.BlockStateFromExecutor.LOST_BEFORE_COMMIT;
-      case LOST:
-        return ControlMessage.BlockStateFromExecutor.LOST;
-      case REMOVED:
-        return ControlMessage.BlockStateFromExecutor.REMOVED;
+      case NOT_AVAILABLE:
+        return ControlMessage.BlockStateFromExecutor.NOT_AVAILABLE;
+      case IN_PROGRESS:
+        return ControlMessage.BlockStateFromExecutor.IN_PROGRESS;
+      case AVAILABLE:
+        return ControlMessage.BlockStateFromExecutor.AVAILABLE;
       default:
         throw new UnknownExecutionStateException(new Exception("This 
BlockState is unknown: " + state));
     }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
index c2fc732c..c5957dbc 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockMetadata.java
@@ -60,19 +60,17 @@ synchronized void onStateChanged(final BlockState.State 
newState,
     LOG.debug("Block State Transition: id {} from {} to {}", new 
Object[]{blockId, oldState, newState});
 
     switch (newState) {
-      case SCHEDULED:
+      case IN_PROGRESS:
         stateMachine.setState(newState);
         break;
-      case LOST:
+      case NOT_AVAILABLE:
         LOG.info("Block {} lost in {}", new Object[]{blockId, location});
-      case LOST_BEFORE_COMMIT:
-      case REMOVED:
         // Reset the block location and committer information.
         locationHandler.completeExceptionally(new 
AbsentBlockException(blockId, newState));
         locationHandler = new 
BlockManagerMaster.BlockLocationRequestHandler(blockId);
         stateMachine.setState(newState);
         break;
-      case COMMITTED:
+      case AVAILABLE:
         assert (location != null);
         locationHandler.complete(location);
         stateMachine.setState(newState);
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
index 4a0733d2..a465374b 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
@@ -21,7 +21,6 @@
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.state.BlockState;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.junit.Before;
@@ -88,25 +87,25 @@ public void testLostAfterCommit() throws Exception {
     final String executorId = RuntimeIdGenerator.generateExecutorId();
     final String blockId = RuntimeIdGenerator.generateBlockId(edgeId, 
srcTaskIndex);
 
-    // Initially the block state is READY.
+    // Initially the block state is NOT_AVAILABLE.
     blockManagerMaster.initializeState(blockId, taskId);
     
checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(),
 blockId,
-        BlockState.State.READY);
+        BlockState.State.NOT_AVAILABLE);
 
-    // The block is being SCHEDULED.
+    // The block is being IN_PROGRESS.
     blockManagerMaster.onProducerTaskScheduled(taskId);
     final Future<String> future = 
blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture();
     checkPendingFuture(future);
 
-    // The block is COMMITTED
-    blockManagerMaster.onBlockStateChanged(blockId, 
BlockState.State.COMMITTED, executorId);
-    checkBlockLocation(future, executorId); // A future, previously pending on 
SCHEDULED state, is now resolved.
+    // The block is AVAILABLE
+    blockManagerMaster.onBlockStateChanged(blockId, 
BlockState.State.AVAILABLE, executorId);
+    checkBlockLocation(future, executorId); // A future, previously pending on 
IN_PROGRESS state, is now resolved.
     
checkBlockLocation(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(),
 executorId);
 
-    // We LOST the block.
+    // We lost the block.
     blockManagerMaster.removeWorker(executorId);
     
checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(),
 blockId,
-        BlockState.State.LOST);
+        BlockState.State.NOT_AVAILABLE);
   }
 
   /**
@@ -130,10 +129,10 @@ public void testBeforeAfterCommit() throws Exception {
     // Producer task fails.
     blockManagerMaster.onProducerTaskFailed(taskId);
 
-    // A future, previously pending on SCHEDULED state, is now completed 
exceptionally.
-    checkBlockAbsentException(future0, blockId, 
BlockState.State.LOST_BEFORE_COMMIT);
+    // A future, previously pending on IN_PROGRESS state, is now completed 
exceptionally.
+    checkBlockAbsentException(future0, blockId, 
BlockState.State.NOT_AVAILABLE);
     
checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(),
 blockId,
-        BlockState.State.LOST_BEFORE_COMMIT);
+        BlockState.State.NOT_AVAILABLE);
 
     // Re-scheduling the task.
     blockManagerMaster.onProducerTaskScheduled(taskId);
@@ -141,13 +140,13 @@ public void testBeforeAfterCommit() throws Exception {
     checkPendingFuture(future1);
 
     // Committed.
-    blockManagerMaster.onBlockStateChanged(blockId, 
BlockState.State.COMMITTED, executorId);
-    checkBlockLocation(future1, executorId); // A future, previously pending 
on SCHEDULED state, is now resolved.
+    blockManagerMaster.onBlockStateChanged(blockId, 
BlockState.State.AVAILABLE, executorId);
+    checkBlockLocation(future1, executorId); // A future, previously pending 
on IN_PROGRESS state, is now resolved.
     
checkBlockLocation(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(),
 executorId);
 
     // Then removed.
-    blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.REMOVED, 
executorId);
+    blockManagerMaster.onBlockStateChanged(blockId, 
BlockState.State.NOT_AVAILABLE, executorId);
     
checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(),
 blockId,
-        BlockState.State.REMOVED);
+        BlockState.State.NOT_AVAILABLE);
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to