Scheduler state serialization and deserialization code contributed by Paul Read
StoreContext : Converts SchedulerState to byte[] and back.
ByteBufferSupport : Helper class that adds serialized objects to the bytestream 
and back

sdaingade: Made changes to add frameworkId


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/d1eca5c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/d1eca5c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/d1eca5c9

Branch: refs/heads/master
Commit: d1eca5c984cb185e9d792faba03a40a88e6cc4b4
Parents: 1a2f8a0
Author: Paul Read <pdread...@gmail.com>
Authored: Sat Aug 1 11:19:41 2015 -0700
Committer: Swapnil Daingade <sdaing...@maprtech.com>
Committed: Sat Aug 29 11:41:33 2015 -0700

----------------------------------------------------------------------
 .../myriad/state/utils/ByteBufferSupport.java   | 298 +++++++++++++++++++
 .../ebay/myriad/state/utils/StoreContext.java   | 273 +++++++++++++++++
 2 files changed, 571 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/d1eca5c9/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
new file mode 100644
index 0000000..e1081f0
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ebay.myriad.state.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.mesos.Protos;
+import com.ebay.myriad.scheduler.NMProfile;
+import com.ebay.myriad.state.NodeTask;
+import com.google.gson.Gson;
+import com.google.protobuf.GeneratedMessage;
+
+/**
+* ByteBuffer support for the Serialization of the StoreContext
+*/
+public class ByteBufferSupport {
+
+  public static final int INT_SIZE = Integer.SIZE / Byte.SIZE;
+  public static final String UTF8 = "UTF-8";
+  public static final byte[] ZERO_BYTES = new byte[0];
+  private static Gson gson = new Gson();
+
+  public static void addByteBuffers(List<ByteBuffer> list,
+    ByteArrayOutputStream bytes) throws IOException {
+    // If list, add the list size, then the size of each buffer followed by 
the buffer.
+    if (list != null) {
+      bytes.write(toIntBytes(list.size()));
+      for (ByteBuffer bb : list) {
+        addByteBuffer(bb, bytes);
+      }
+    } else {
+      bytes.write(toIntBytes(0));
+    }
+  }
+
+  public static void addByteBuffer(ByteBuffer bb,
+    ByteArrayOutputStream bytes) throws IOException {
+    if (bb != null && bytes != null) {
+      bytes.write(toIntBytes(bb.array().length));
+      bytes.write(bb.array());
+    }
+  }
+
+  public static ByteBuffer toByteBuffer(Protos.TaskID taskId) {
+    return toBuffer(taskId);
+  }
+
+  public static ByteBuffer toByteBuffer(Protos.FrameworkID frameworkId) {
+    return toBuffer(frameworkId);
+  }
+
+  /*
+   * Common method to convert Protobuf object to ByteBuffer 
+   */
+  public static ByteBuffer toBuffer(GeneratedMessage message) {
+    byte dst[];
+    int size;
+    if (message != null) {
+      size = message.getSerializedSize() + INT_SIZE;
+      dst = message.toByteArray();
+    } else {
+      size = INT_SIZE;
+      dst = ZERO_BYTES;
+    }
+    ByteBuffer bb = createBuffer(size);
+    putBytes(bb, dst);
+    bb.rewind();
+    return bb;
+  }
+
+  public static byte[] toIntBytes(int src) {
+    ByteBuffer bb = createBuffer(INT_SIZE);
+    bb.putInt(src);
+    return bb.array();
+  }
+
+  public static ByteBuffer toByteBuffer(NodeTask nt) {
+    // Determine the size of ByteBuffer to allocate
+    // The NMProfile toString() returns Json, if this ever changes then this
+    // will fail. Json is expected.
+    byte[] profile = toBytes(nt.getProfile().toString());
+    int size = profile.length + INT_SIZE;
+
+    byte[] hostname = toBytes(nt.getHostname());
+    size += hostname.length + INT_SIZE;
+
+    if (nt.getSlaveId() != null) {
+      size += nt.getSlaveId().getSerializedSize() + INT_SIZE;
+    } else {
+      size += INT_SIZE;
+    }
+
+    if (nt.getTaskStatus() != null) {
+      size += nt.getTaskStatus().getSerializedSize() + INT_SIZE;
+    } else {
+      size += INT_SIZE;
+    }
+
+    // Allocate and populate the buffer.
+    ByteBuffer bb = createBuffer(size);
+    putBytes(bb, profile);
+    putBytes(bb, hostname);
+    putBytes(bb, getSlaveBytes(nt));
+    putBytes(bb, getTaskBytes(nt));
+    // Make sure the buffer is at the beginning
+    bb.rewind();
+    return bb;
+  }
+
+  /**
+   * Assumes the entire ByteBuffer is a TaskID.
+   *
+   * @param bb
+   * @return Protos.TaskID
+   */
+  public static Protos.TaskID toTaskId(ByteBuffer bb) {
+    try {
+      return Protos.TaskID.parseFrom(getBytes(bb, bb.getInt()));
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to parse Task ID", e);
+    }
+  }
+
+  /**
+   * Assumes the entire ByteBuffer is a FrameworkID.
+   *
+   * @param bb
+   * @return Protos.FrameworkID
+   */
+  public static Protos.FrameworkID toFrameworkID(ByteBuffer bb) {
+    try {
+      return Protos.FrameworkID.parseFrom(getBytes(bb, bb.getInt()));
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to parse Framework ID", e);
+    }
+  }
+
+  /**
+   * ByteBuffer is expected to have a NodeTask at its next position.
+  *
+  * @param bb
+  * @return NodeTask or null if buffer is empty. Can throw a RuntimeException
+  * if the buffer is not formatted correctly.
+  */
+  public static NodeTask toNodeTask(ByteBuffer bb) {
+    NodeTask nt = null;
+    if (bb != null && bb.array().length > 0) {
+      nt = new NodeTask(getProfile(bb));
+      nt.setHostname(toString(bb));
+      nt.setSlaveId(toSlaveId(bb));
+      nt.setTaskStatus(toTaskStatus(bb));
+    }
+    return nt;
+  }
+
+  public static byte[] getTaskBytes(NodeTask nt) {
+    if (nt.getTaskStatus() != null) {
+      return nt.getTaskStatus().toByteArray();
+    } else {
+      return ZERO_BYTES;
+    }
+  }
+
+  public static byte[] getSlaveBytes(NodeTask nt) {
+    if (nt.getSlaveId() != null) {
+      return nt.getSlaveId().toByteArray();
+    } else {
+      return ZERO_BYTES;
+    }
+  }
+
+  public static void putBytes(ByteBuffer bb, byte bytes[]) {
+    if (bytes != null && bytes.length > 0) {
+      bb.putInt(bytes.length);
+      bb.put(bytes);
+    } else {
+      bb.putInt(0);
+    }
+  }
+
+  public static byte[] getBytes(ByteBuffer bb, int size) {
+    byte bytes[] = new byte[size];
+    bb.get(bytes);
+    return bytes;
+  }
+
+  /**
+   * This assumes the next position is the size as an int, and the following 
is a string
+   * iff the size is not zero.
+   *
+   * @param bb ByteBuffer to extract string from
+   * @return string from the next position, or "" if the size is zero
+   */
+  public static String toString(ByteBuffer bb) {
+    byte [] bytes = new byte[bb.getInt()];
+    String s = "";
+    try {
+      if (bytes.length > 0) {
+        bb.get(bytes);
+        s = new String(bytes, UTF8);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("ByteBuffer not in expected format," +
+        " failed to parse string bytes", e);
+    }
+    return s;
+  }
+
+  public static byte[] toBytes(String s) {
+    try {
+      return s.getBytes(UTF8);
+    } catch (Exception e) {
+      return ZERO_BYTES;
+    }
+  }
+
+  public static NMProfile getProfile(ByteBuffer bb) {
+    String p = toString(bb);
+    if (!StringUtils.isEmpty(p)) {
+      return gson.fromJson(p, NMProfile.class);
+    } else {
+      return null;
+    }
+  }
+
+  public static Protos.SlaveID toSlaveId(ByteBuffer bb) {
+    int size = bb.getInt();
+    if (size > 0) {
+      try {
+        return Protos.SlaveID.parseFrom(getBytes(bb, size));
+      } catch (Exception e) {
+        throw new RuntimeException("ByteBuffer not in expected format," +
+          " failed to parse SlaveId bytes", e);
+      }
+    } else {
+      return null;
+    }
+  }
+
+  public static Protos.TaskStatus toTaskStatus(ByteBuffer bb) {
+    int size = bb.getInt();
+    if (size > 0) {
+      try {
+        return Protos.TaskStatus.parseFrom(getBytes(bb, size));
+      } catch (Exception e) {
+        throw new RuntimeException("ByteBuffer not in expected format," +
+          " failed to parse TaskStatus bytes", e);
+      }
+    } else {
+      return null;
+    }
+  }
+
+  public static ByteBuffer fillBuffer(byte src[]) {
+    ByteBuffer bb = createBuffer(src.length);
+    bb.put(src);
+    bb.rewind();
+    return bb;
+  }
+
+  public static List<ByteBuffer> createBufferList(ByteBuffer bb, int size) {
+    List<ByteBuffer> list = new ArrayList<ByteBuffer>(size);
+    for (int i = 0; i < size; i++) {
+      list.add(fillBuffer(getBytes(bb, bb.getInt())));
+    }
+    return list;
+  }
+
+  private static ByteBuffer createBuffer(int size) {
+    return ByteBuffer.allocate(size).order(ByteOrder.LITTLE_ENDIAN);
+  }
+
+  public static ByteBuffer createBuffer(ByteBuffer bb) {
+    return fillBuffer(getBytes(bb, bb.getInt()));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/d1eca5c9/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java
new file mode 100644
index 0000000..a62ddc4
--- /dev/null
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ebay.myriad.state.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.TaskID;
+
+import com.ebay.myriad.state.NodeTask;
+
+/**
+* The purpose of this container/utility is to create a mechanism to serialize 
the SchedulerState
+* to RMStateStore and back. Json did not seem to handle the Protos fields very 
well so this was an
+* alternative approach.
+*/
+public final class StoreContext {
+  private ByteBuffer frameworkId;
+  private List<ByteBuffer> taskIds;
+  private List<ByteBuffer> taskNodes;
+  private List<ByteBuffer> pendingTasks;
+  private List<ByteBuffer> stagingTasks;
+  private List<ByteBuffer> activeTasks;
+  private List<ByteBuffer> lostTasks;
+  private List<ByteBuffer> killableTasks;
+
+  public StoreContext() {
+  }
+
+  /**
+   * Accept all the SchedulerState maps and flatten them into lists of 
ByteBuffers
+   * @param tasks
+   * @param pendingTasks
+   * @param stagingTasks
+   * @param activeTasks
+   * @param lostTasks
+   * @param killableTasks
+   */
+  public StoreContext(Protos.FrameworkID frameworkId,
+    Map<Protos.TaskID, NodeTask> tasks,
+    Set<Protos.TaskID> pendingTasks, Set<Protos.TaskID> stagingTasks,
+    Set<Protos.TaskID> activeTasks, Set<Protos.TaskID> lostTasks,
+    Set<Protos.TaskID> killableTasks) {
+    setFrameworkId(frameworkId);
+    setTasks(tasks);
+    setPendingTasks(pendingTasks);
+    setStagingTasks(stagingTasks);
+    setActiveTasks(activeTasks);
+    setLostTasks(lostTasks);
+    setKillableTasks(killableTasks);
+  }
+
+  /**
+   * Accept list of ByteBuffers and re-create the SchedulerState maps.
+   * @param framwrorkId
+   * @param taskIds
+   * @param taskNodes
+   * @param pendingTasks
+   * @param stagingTasks
+   * @param activeTasks
+   * @param lostTasks
+   * @param killableTasks
+   */
+  public StoreContext(ByteBuffer frameworkId,
+    List<ByteBuffer> taskIds, List<ByteBuffer> taskNodes,
+    List<ByteBuffer> pendingTasks, List<ByteBuffer> stagingTasks,
+    List<ByteBuffer> activeTasks, List<ByteBuffer> lostTasks,
+    List<ByteBuffer> killableTasks) {
+    this.frameworkId = frameworkId;
+    this.taskIds = taskIds;
+    this.taskNodes = taskNodes;
+    this.pendingTasks = pendingTasks;
+    this.stagingTasks = stagingTasks;
+    this.activeTasks = activeTasks;
+    this.lostTasks = lostTasks;
+    this.killableTasks = killableTasks;
+  }
+
+  /**
+   * Use this to gather bytes to push to the state store
+   * @return byte stream of the state store context.
+   * @throws IOException
+   */
+  public ByteArrayOutputStream toSerializedContext() throws IOException {
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    ByteBufferSupport.addByteBuffer(frameworkId, bytes);
+    ByteBufferSupport.addByteBuffers(taskIds, bytes);
+    ByteBufferSupport.addByteBuffers(taskNodes, bytes);
+    ByteBufferSupport.addByteBuffers(pendingTasks, bytes);
+    ByteBufferSupport.addByteBuffers(stagingTasks, bytes);
+    ByteBufferSupport.addByteBuffers(activeTasks, bytes);
+    ByteBufferSupport.addByteBuffers(lostTasks, bytes);
+    ByteBufferSupport.addByteBuffers(killableTasks, bytes);
+    return bytes;
+  }
+
+  /**
+   * When the bytes come back from the store, use this method to create a new 
context.
+   *
+   * @param bytes from state store
+   * @return initialized StoreContext to use to initialize a SchedulerState
+   */
+  @SuppressWarnings("unchecked")
+  public static StoreContext fromSerializedBytes(byte bytes[]) {
+    StoreContext ctx;
+    if (bytes != null && bytes.length > 0){
+      ByteBuffer bb = ByteBufferSupport.fillBuffer(bytes);
+      ByteBuffer frameworkId = ByteBufferSupport.createBuffer(bb);
+      List<ByteBuffer> taskIds = ByteBufferSupport.createBufferList(bb, 
bb.getInt());
+      List<ByteBuffer> taskNodes = ByteBufferSupport.createBufferList(bb, 
bb.getInt());
+      List<ByteBuffer> pendingTasks = ByteBufferSupport.createBufferList(bb, 
bb.getInt());
+      List<ByteBuffer> stagingTasks = ByteBufferSupport.createBufferList(bb, 
bb.getInt());
+      List<ByteBuffer> activeTasks = ByteBufferSupport.createBufferList(bb, 
bb.getInt());
+      List<ByteBuffer> lostTasks = ByteBufferSupport.createBufferList(bb, 
bb.getInt());
+      List<ByteBuffer> killableTasks = ByteBufferSupport.createBufferList(bb, 
bb.getInt());
+      ctx = new StoreContext(frameworkId, taskIds, taskNodes, pendingTasks, 
stagingTasks, activeTasks,
+        lostTasks, killableTasks);
+    } else {
+      ctx = new StoreContext();
+    }
+    return ctx;
+  }
+
+  /**
+   * Serialize tasks into internal ByteBuffers, removing the map.
+   *
+   * @param tasks
+   */
+  public void setTasks(Map<Protos.TaskID, NodeTask> tasks) {
+    taskIds = new ArrayList<ByteBuffer>(tasks.size());
+    taskNodes = new ArrayList<ByteBuffer>(tasks.size());
+    for (Entry<TaskID, NodeTask> entry : tasks.entrySet()) {
+      taskIds.add(ByteBufferSupport.toByteBuffer(entry.getKey()));
+      taskNodes.add(ByteBufferSupport.toByteBuffer(entry.getValue()));
+    }
+  }
+
+  /**
+   * De-serialize the internal ByteBuffer back into a Protos.FrameworkID.
+   *
+   * @return
+   */
+  public Protos.FrameworkID getFrameworkId() {
+    return ByteBufferSupport.toFrameworkID(frameworkId);
+  }
+
+  /**
+   * Serialize the Protos.FrameworkID into a ByteBuffer.  
+   */
+  public void setFrameworkId(Protos.FrameworkID frameworkId) {
+    if (frameworkId != null) {
+      this.frameworkId = ByteBufferSupport.toByteBuffer(frameworkId);
+    }
+  }
+
+  /**
+   * De-serialize the internal ByteBuffers back into a Task map.
+   *
+   * @return
+   */
+  public Map<Protos.TaskID, NodeTask> getTasks() {
+    Map<Protos.TaskID, NodeTask> map = null;
+    if (taskIds != null) {
+      map = new HashMap<Protos.TaskID, NodeTask>(taskIds.size());
+      int idx = 0;
+      for (ByteBuffer bb : taskIds) {
+        map.put(ByteBufferSupport.toTaskId(bb),
+          ByteBufferSupport.toNodeTask(taskNodes.get(idx++)));
+      }
+    } else {
+      map = new HashMap<Protos.TaskID, NodeTask>(0);
+    }
+    return map;
+  }
+
+  public void setPendingTasks(Set<Protos.TaskID> tasks) {
+    if (tasks != null) {
+      pendingTasks = new ArrayList<ByteBuffer>(tasks.size());
+      toTaskBuffer(tasks, pendingTasks);
+    }
+  }
+
+  public Set<Protos.TaskID> getPendingTasks () {
+    return toTaskSet(pendingTasks);
+  }
+
+  public void setStagingTasks(Set<Protos.TaskID> tasks) {
+    if (tasks != null) {
+      stagingTasks = new ArrayList<ByteBuffer>(tasks.size());
+      toTaskBuffer(tasks, stagingTasks);
+    }
+  }
+
+  public Set<Protos.TaskID> getStagingTasks() {
+    return toTaskSet(stagingTasks);
+  }
+
+  public void setActiveTasks(Set<Protos.TaskID> tasks) {
+    if (tasks != null) {
+      activeTasks = new ArrayList<ByteBuffer>(tasks.size());
+      toTaskBuffer(tasks, activeTasks);
+    }
+  }
+
+  public Set<Protos.TaskID> getActiveTasks() {
+    return toTaskSet(activeTasks);
+  }
+
+  public void setLostTasks(Set<Protos.TaskID> tasks) {
+    if (tasks != null) {
+      lostTasks = new ArrayList<ByteBuffer>(tasks.size());
+      toTaskBuffer(tasks, lostTasks);
+    }
+  }
+
+  public Set<Protos.TaskID> getLostTasks() {
+    return toTaskSet(lostTasks);
+  }
+
+  public void setKillableTasks(Set<Protos.TaskID> tasks) {
+    if (tasks != null) {
+      killableTasks = new ArrayList<ByteBuffer>(tasks.size());
+      toTaskBuffer(tasks, killableTasks);
+    }
+  }
+
+  public Set<Protos.TaskID> getKillableTasks() {
+    return toTaskSet(killableTasks);
+  }
+
+  private void toTaskBuffer(Set<Protos.TaskID> src, List<ByteBuffer> tgt) {
+    for (Protos.TaskID id : src) {
+       tgt.add(ByteBufferSupport.toByteBuffer(id));
+    }
+  }
+
+  private Set<Protos.TaskID> toTaskSet(List<ByteBuffer> src) {
+    Set<Protos.TaskID> tasks;
+    if (src != null) {
+      tasks = new HashSet<Protos.TaskID>(src.size());
+      for (int i = 0; i < src.size(); i++) {
+        tasks.add(ByteBufferSupport.toTaskId(src.get(i)));
+      }
+    } else {
+      tasks = new HashSet<Protos.TaskID>(0);
+    }
+    return tasks;
+  }
+}

Reply via email to