Myriad State Store Implementation
* Added a new interface called MyriadStateStore that every Myriad state store 
implementation should implement.
* Added a new class MyriadFileSystemRMStateStore that extends YARNs DFS based 
FileSystemRMStateStore implementation.
  This class also implements the MyriadStateStore interface.
* Added RMContext Object to the initialization code.
* We store the Myriad frameworkId that we receive during initial registration.
* We also store the state for all NM Tasks (Tasks against which NodeManagers 
are launched by the Mesos scheduler), their TaskIds, and the state in which 
these tasks are.
* When Yarn initializes, the starts the state store service starts first. It 
loads Yarn and Myriad state from the state store and then starts other services 
like the Yarn scheduler (which initializes Myriad scheduler). See
https://github.com/apache/hadoop/blob/branch-2.7.0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java#L560


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/23e01ecd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/23e01ecd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/23e01ecd

Branch: refs/heads/master
Commit: 23e01ecd2106b48a3c0db87201aba81cfd61dd29
Parents: d1eca5c
Author: Swapnil Daingade <sdaing...@maprtech.com>
Authored: Sat Aug 1 13:18:19 2015 -0700
Committer: Swapnil Daingade <sdaing...@maprtech.com>
Committed: Sat Aug 29 11:41:33 2015 -0700

----------------------------------------------------------------------
 myriad-scheduler/build.gradle                   |   9 +-
 .../src/main/java/com/ebay/myriad/Main.java     |   2 +-
 .../main/java/com/ebay/myriad/MesosModule.java  |  16 +--
 .../main/java/com/ebay/myriad/MyriadModule.java |  10 +-
 .../event/handlers/RegisteredEventHandler.java  |   2 +-
 .../scheduler/yarn/MyriadCapacityScheduler.java |   9 +-
 .../scheduler/yarn/MyriadFairScheduler.java     |   9 +-
 .../scheduler/yarn/MyriadFifoScheduler.java     |   9 +-
 .../yarn/interceptor/BaseInterceptor.java       |   2 +-
 .../yarn/interceptor/CompositeInterceptor.java  |   1 +
 .../MyriadInitializationInterceptor.java        |   1 +
 .../com/ebay/myriad/state/MyriadStateStore.java |  30 ++++++
 .../com/ebay/myriad/state/SchedulerState.java   |  87 ++++++++++++++--
 .../recovery/MyriadFileSystemRMStateStore.java  | 104 +++++++++++++++++++
 14 files changed, 252 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/23e01ecd/myriad-scheduler/build.gradle
----------------------------------------------------------------------
diff --git a/myriad-scheduler/build.gradle b/myriad-scheduler/build.gradle
index b3ee397..aad71a1 100644
--- a/myriad-scheduler/build.gradle
+++ b/myriad-scheduler/build.gradle
@@ -10,10 +10,11 @@ dependencies {
     compile "io.dropwizard.metrics:metrics-annotation:${metricsVer}"
     compile "io.dropwizard.metrics:metrics-healthchecks:${metricsVer}"
     compile "org.hibernate:hibernate-validator:5.1.2.Final"
-       compile "com.fasterxml.jackson.core:jackson-annotations:2.5.1"
-       compile "com.fasterxml.jackson.core:jackson-databind:2.5.1"
-       compile "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.5.1"
-       compile "org.apache.curator:curator-framework:2.7.1"
+    compile "com.fasterxml.jackson.core:jackson-annotations:2.5.1"
+    compile "com.fasterxml.jackson.core:jackson-databind:2.5.1"
+    compile "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.5.1"
+    compile "org.apache.curator:curator-framework:2.7.1"
+    compile "org.apache.commons:commons-lang3:3.4"
     testCompile 
"org.apache.hadoop:hadoop-yarn-server-resourcemanager:${hadoopVer}:tests"
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/23e01ecd/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
index 64eb3be..9871d58 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
@@ -220,4 +220,4 @@ public class Main {
         disruptorManager.init(injector);
     }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/23e01ecd/myriad-scheduler/src/main/java/com/ebay/myriad/MesosModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/MesosModule.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/MesosModule.java
index e4b63d6..77b89df 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/MesosModule.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/MesosModule.java
@@ -18,7 +18,6 @@ package com.ebay.myriad;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
@@ -42,7 +41,6 @@ import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * Guice Module for Mesos objects.
@@ -75,16 +73,10 @@ public class MesosModule extends AbstractModule {
       frameworkInfoBuilder.setRole(cfg.getFrameworkRole());
     }
 
-    FrameworkID frameworkId;
-    try {
-      frameworkId = schedulerState.getMyriadState().getFrameworkID();
-      if (frameworkId != null) {
-        LOGGER.info("Attempting to re-register with frameworkId: {}", 
frameworkId.getValue());
-        frameworkInfoBuilder.setId(frameworkId);
-      }
-    } catch (InterruptedException | ExecutionException | 
InvalidProtocolBufferException e) {
-      LOGGER.error("Error fetching frameworkId", e);
-      throw new RuntimeException(e);
+    FrameworkID frameworkId = schedulerState.getFrameworkID();
+    if (frameworkId != null) {
+      LOGGER.info("Attempting to re-register with frameworkId: {}", 
frameworkId.getValue());
+      frameworkInfoBuilder.setId(frameworkId);
     }
 
     String mesosAuthenticationPrincipal = 
cfg.getMesosAuthenticationPrincipal();

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/23e01ecd/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java
index 9532604..b73754e 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java
@@ -29,7 +29,6 @@ import com.ebay.myriad.scheduler.TaskFactory;
 import com.ebay.myriad.scheduler.TaskFactory.NMTaskFactoryImpl;
 import com.ebay.myriad.scheduler.fgs.YarnNodeCapacityManager;
 import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
-import com.ebay.myriad.state.MyriadState;
 import com.ebay.myriad.state.SchedulerState;
 import com.ebay.myriad.webapp.HttpConnectorProvider;
 import com.google.inject.AbstractModule;
@@ -40,11 +39,9 @@ import com.google.inject.Singleton;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.mesos.state.State;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Guice Module for Myriad
  */
@@ -95,11 +92,8 @@ public class MyriadModule extends AbstractModule {
 
     @Provides
     @Singleton
-    SchedulerState providesSchedulerState(MyriadConfiguration cfg,
-        State stateStore) {
-
+    SchedulerState providesSchedulerState(MyriadConfiguration cfg) {
         LOGGER.debug("Configuring SchedulerState provider");
-        MyriadState state = new MyriadState(stateStore);
-        return new SchedulerState(state);
+        return new SchedulerState(rmContext.getStateStore());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/23e01ecd/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/RegisteredEventHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/RegisteredEventHandler.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/RegisteredEventHandler.java
index 6f9b84b..06ea7ca 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/RegisteredEventHandler.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/RegisteredEventHandler.java
@@ -39,7 +39,7 @@ public class RegisteredEventHandler implements 
EventHandler<RegisteredEvent> {
     @Override
     public void onEvent(RegisteredEvent event, long sequence, boolean 
endOfBatch) throws Exception {
         LOGGER.info("Received event: {} with frameworkId: {}", event, 
event.getFrameworkId());
-        schedulerState.getMyriadState().setFrameworkId(event.getFrameworkId());
+        schedulerState.setFrameworkId(event.getFrameworkId());
         reconcileService.reconcile(event.getDriver());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/23e01ecd/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadCapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadCapacityScheduler.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadCapacityScheduler.java
index 8355ab3..c107ee3 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadCapacityScheduler.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadCapacityScheduler.java
@@ -15,6 +15,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
  * via the {@link YarnSchedulerInterceptor} interface.
  */
 public class MyriadCapacityScheduler extends CapacityScheduler {
+  private Configuration conf;
 
   private RMContext rmContext;
   private YarnSchedulerInterceptor yarnSchedulerInterceptor;
@@ -47,11 +48,17 @@ public class MyriadCapacityScheduler extends 
CapacityScheduler {
 
   @Override
   public synchronized void serviceInit(Configuration conf) throws Exception {
-    this.yarnSchedulerInterceptor.init(conf, this, rmContext);
+    this.conf = conf;
     super.serviceInit(conf);
   }
 
   @Override
+  public synchronized void serviceStart() throws Exception {
+    this.yarnSchedulerInterceptor.init(conf, this, rmContext);
+    super.serviceStart();
+  }
+
+  @Override
   public synchronized void handle(SchedulerEvent event) {
     this.yarnSchedulerInterceptor.beforeSchedulerEventHandled(event);
     super.handle(event);

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/23e01ecd/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFairScheduler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFairScheduler.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFairScheduler.java
index 9d5363e..65ef880 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFairScheduler.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFairScheduler.java
@@ -19,6 +19,7 @@ public class MyriadFairScheduler extends FairScheduler {
     private RMContext rmContext;
     private YarnSchedulerInterceptor yarnSchedulerInterceptor;
     private RMNodeEventHandler rmNodeEventHandler;
+    private Configuration conf;
 
     public MyriadFairScheduler() {
         super();
@@ -47,11 +48,17 @@ public class MyriadFairScheduler extends FairScheduler {
 
     @Override
     public synchronized void serviceInit(Configuration conf) throws Exception {
-        this.yarnSchedulerInterceptor.init(conf, this, rmContext);
+        this.conf = conf;
         super.serviceInit(conf);
     }
 
     @Override
+    public synchronized void serviceStart() throws Exception {
+        this.yarnSchedulerInterceptor.init(conf, this, rmContext);
+        super.serviceStart();
+    }
+
+    @Override
     public synchronized void handle(SchedulerEvent event) {
         this.yarnSchedulerInterceptor.beforeSchedulerEventHandled(event);
         super.handle(event);

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/23e01ecd/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFifoScheduler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFifoScheduler.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFifoScheduler.java
index f03b7ca..dfe851a 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFifoScheduler.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFifoScheduler.java
@@ -15,6 +15,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoSchedule
  * via the {@link YarnSchedulerInterceptor} interface.
  */
 public class MyriadFifoScheduler extends FifoScheduler {
+  private Configuration conf;
 
   private RMContext rmContext;
   private YarnSchedulerInterceptor yarnSchedulerInterceptor;
@@ -47,11 +48,17 @@ public class MyriadFifoScheduler extends FifoScheduler {
 
   @Override
   public synchronized void serviceInit(Configuration conf) throws Exception {
-    this.yarnSchedulerInterceptor.init(conf, this, rmContext);
+    this.conf = conf;
     super.serviceInit(conf);
   }
 
   @Override
+  public synchronized void serviceStart() throws Exception {
+    this.yarnSchedulerInterceptor.init(conf, this, rmContext);
+    super.serviceStart();
+  }
+
+  @Override
   public synchronized void handle(SchedulerEvent event) {
     this.yarnSchedulerInterceptor.beforeSchedulerEventHandled(event);
     super.handle(event);

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/23e01ecd/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
index face1bf..182bc2d 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/BaseInterceptor.java
@@ -29,7 +29,7 @@ public class BaseInterceptor implements 
YarnSchedulerInterceptor {
     };
   }
 
-  @Override
+    @Override
     public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, 
RMContext rmContext) throws IOException {
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/23e01ecd/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
index 350aaa2..2cfd365 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/CompositeInterceptor.java
@@ -12,6 +12,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemoved
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/23e01ecd/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
index 4e071c5..3736e4a 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java
@@ -4,6 +4,7 @@ import com.ebay.myriad.Main;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/23e01ecd/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java
new file mode 100644
index 0000000..533eaed
--- /dev/null
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/MyriadStateStore.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ebay.myriad.state;
+
+/**
+ * Interface implemented by all Myriad State Store implementations 
+ */
+public interface MyriadStateStore {
+
+  byte[] loadMyriadState() throws Exception;
+
+  void storeMyriadState(byte[] myriadState) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/23e01ecd/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
index ed172b9..75503b6 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
@@ -15,11 +15,6 @@
  */
 package com.ebay.myriad.state;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.mesos.Protos;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -30,6 +25,15 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.collections.CollectionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.mesos.Protos;
+import com.ebay.myriad.state.utils.StoreContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+
 /**
  * Represents the state of the Myriad scheduler
  */
@@ -43,15 +47,18 @@ public class SchedulerState {
     private Set<Protos.TaskID> lostTasks;
     private Set<Protos.TaskID> killableTasks;
     private MyriadState myriadState;
+    private Protos.FrameworkID frameworkId;
+    private RMStateStore stateStore;
 
-    public SchedulerState(MyriadState myriadState) {
+    public SchedulerState(RMStateStore stateStore) {
         this.tasks = new ConcurrentHashMap<>();
         this.pendingTasks = new HashSet<>();
         this.stagingTasks = new HashSet<>();
         this.activeTasks = new HashSet<>();
         this.lostTasks = new HashSet<>();
         this.killableTasks = new HashSet<>();
-        this.myriadState = myriadState;
+        this.stateStore = stateStore;
+        loadStateStore();
     }
 
     public void addNodes(Collection<NodeTask> nodes) {
@@ -66,9 +73,10 @@ public class SchedulerState {
             LOGGER.info("Marked taskId {} pending, size of pending queue {}", 
taskId.getValue(), this.pendingTasks.size());
             makeTaskPending(taskId);
         }
+
     }
 
-    public void addTask(Protos.TaskID taskId, NodeTask node) {
+    private void addTask(Protos.TaskID taskId, NodeTask node) {
         this.tasks.put(taskId, node);
     }
 
@@ -78,6 +86,7 @@ public class SchedulerState {
         if (this.tasks.containsKey(taskId)) {
             this.tasks.get(taskId).setTaskStatus(taskStatus);
         }
+        updateStateStore();
     }
 
     public void makeTaskPending(Protos.TaskID taskId) {
@@ -89,6 +98,7 @@ public class SchedulerState {
         activeTasks.remove(taskId);
         lostTasks.remove(taskId);
         killableTasks.remove(taskId);
+        updateStateStore();
     }
 
     public void makeTaskStaging(Protos.TaskID taskId) {
@@ -100,6 +110,7 @@ public class SchedulerState {
         activeTasks.remove(taskId);
         lostTasks.remove(taskId);
         killableTasks.remove(taskId);
+        updateStateStore();
     }
 
     public void makeTaskActive(Protos.TaskID taskId) {
@@ -111,6 +122,7 @@ public class SchedulerState {
         activeTasks.add(taskId);
         lostTasks.remove(taskId);
         killableTasks.remove(taskId);
+        updateStateStore();
     }
 
     public void makeTaskLost(Protos.TaskID taskId) {
@@ -122,6 +134,7 @@ public class SchedulerState {
         activeTasks.remove(taskId);
         lostTasks.add(taskId);
         killableTasks.remove(taskId);
+        updateStateStore();
     }
 
     public void makeTaskKillable(Protos.TaskID taskId) {
@@ -133,6 +146,7 @@ public class SchedulerState {
         activeTasks.remove(taskId);
         lostTasks.remove(taskId);
         killableTasks.add(taskId);
+        updateStateStore();
     }
 
     public Set<Protos.TaskID> getKillableTasks() {
@@ -150,6 +164,7 @@ public class SchedulerState {
         this.lostTasks.remove(taskId);
         this.killableTasks.remove(taskId);
         this.tasks.remove(taskId);
+        updateStateStore();
     }
 
     public Set<Protos.TaskID> getPendingTaskIds() {
@@ -201,4 +216,58 @@ public class SchedulerState {
     public boolean hasTask(Protos.TaskID taskID) {
         return this.tasks.containsKey(taskID);
     }
-}
\ No newline at end of file
+
+    public Protos.FrameworkID getFrameworkID() {
+         return this.frameworkId;
+    }
+
+    public void setFrameworkId(Protos.FrameworkID newFrameworkId) {
+        this.frameworkId = newFrameworkId;
+        updateStateStore();
+    }
+
+    private void updateStateStore() {
+        if (!isMyriadStateStore()) {
+            return;
+        }
+        try {
+            StoreContext sc = new StoreContext(frameworkId, tasks, 
pendingTasks,
+                stagingTasks, activeTasks, lostTasks, killableTasks);
+            ((MyriadStateStore) stateStore).storeMyriadState(
+                sc.toSerializedContext().toByteArray());
+            LOGGER.info("Scheduler state stored to state store");
+        } catch (Exception e) {
+            LOGGER.error("Failed to write scheduler state to state store", e);
+        }
+    }
+
+    private void loadStateStore() {
+        if (!isMyriadStateStore()) {
+            return;
+        }
+        try {
+            byte[] stateStoreBytes = ((MyriadStateStore) 
stateStore).loadMyriadState();
+            if (stateStoreBytes != null && stateStoreBytes.length > 0) {
+                StoreContext sc = 
StoreContext.fromSerializedBytes(stateStoreBytes);
+                this.frameworkId = sc.getFrameworkId();
+                this.tasks.putAll(sc.getTasks());
+                this.pendingTasks.addAll(sc.getPendingTasks());
+                this.stagingTasks.addAll(sc.getStagingTasks());
+                this.activeTasks.addAll(sc.getActiveTasks());
+                this.lostTasks.addAll(sc.getLostTasks());
+                this.killableTasks.addAll(sc.getKillableTasks());
+            }
+        }  catch (Exception e) {
+            LOGGER.error("Failed to read scheduler state from state store", e);
+        }
+   }
+
+    private boolean isMyriadStateStore() {
+        if (!(stateStore instanceof MyriadStateStore)) {
+            LOGGER.error("State store is not an instance of " +
+                MyriadStateStore.class.getName() + ". Cannot load/store 
Scheduler state.");
+            return false;
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/23e01ecd/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
 
b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
new file mode 100644
index 0000000..45bfcfd
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ebay.myriad.state.MyriadStateStore;
+
+/**
+ * StateStore that stores Myriad state in addition to RM state to DFS. 
+ */
+public class MyriadFileSystemRMStateStore extends FileSystemRMStateStore
+  implements MyriadStateStore{
+
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(MyriadFileSystemRMStateStore.class);
+
+  protected static final String ROOT_NAME = "FSRMStateRoot";
+  private static final String RM_MYRIAD_ROOT = "RMMyriadRoot";
+  private static final String MYRIAD_STATE_FILE = "MyriadState";
+
+  private Path myriadPathRoot = null;
+  private byte[] myriadState = null; 
+
+  @Override
+  public synchronized void initInternal(Configuration conf) throws Exception{
+    super.initInternal(conf);
+    Path rootPath = new Path(fsWorkingPath, ROOT_NAME);
+    myriadPathRoot = new Path(rootPath, RM_MYRIAD_ROOT);
+  }
+
+  @Override
+  protected synchronized void startInternal() throws Exception {
+    super.startInternal();
+    fs.mkdirs(myriadPathRoot);
+  }
+
+  @Override
+  public synchronized RMState loadState() throws Exception {
+    RMState rmState = super.loadState();
+
+    Path myriadStatePath = new Path(myriadPathRoot, MYRIAD_STATE_FILE);
+    LOGGER.info("Loading state information for Myriad from: " + 
myriadStatePath);
+
+    try {
+      // Throws IOException if file is not present.
+      FileStatus fileStatus = fs.listStatus(myriadStatePath)[0];
+      FSDataInputStream in = fs.open(myriadStatePath);        
+      myriadState = new byte[(int) fileStatus.getLen()];
+      in.readFully(myriadState);
+      in.close();
+    } catch (IOException e) {
+      LOGGER.error("State information for Myriad could not be loaded from: "
+        + myriadStatePath, e);
+    }
+    return rmState;
+  }
+
+  @Override
+  public synchronized byte[] loadMyriadState() throws Exception {
+    byte[] ms = null;
+    if (myriadState != null) {
+      ms = myriadState.clone();
+      myriadState = null;
+    }
+    return ms;
+  }
+
+  @Override
+  public synchronized void storeMyriadState(byte[] myriadState) throws 
Exception{
+    Path myriadStatePath = new Path(myriadPathRoot, MYRIAD_STATE_FILE);
+
+    LOGGER.info("Storing state informatio for Myriad at: " + myriadStatePath);
+    try {
+      updateFile(myriadStatePath, myriadState);
+    } catch (Exception e) {
+        LOGGER.error("State information for Myriad could not be stored at: "
+                + myriadStatePath, e);
+    }
+  }
+}

Reply via email to