Scheduler state serialization and deserialization code contributed by Paul Read StoreContext : Converts SchedulerState to byte[] and back. ByteBufferSupport : Helper class that adds serialized objects to the bytestream and back
sdaingade: Made changes to add frameworkId Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/d1eca5c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/d1eca5c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/d1eca5c9 Branch: refs/heads/master Commit: d1eca5c984cb185e9d792faba03a40a88e6cc4b4 Parents: 1a2f8a0 Author: Paul Read <pdread...@gmail.com> Authored: Sat Aug 1 11:19:41 2015 -0700 Committer: Swapnil Daingade <sdaing...@maprtech.com> Committed: Sat Aug 29 11:41:33 2015 -0700 ---------------------------------------------------------------------- .../myriad/state/utils/ByteBufferSupport.java | 298 +++++++++++++++++++ .../ebay/myriad/state/utils/StoreContext.java | 273 +++++++++++++++++ 2 files changed, 571 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/d1eca5c9/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java new file mode 100644 index 0000000..e1081f0 --- /dev/null +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ebay.myriad.state.utils; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.lang3.StringUtils; +import org.apache.mesos.Protos; +import com.ebay.myriad.scheduler.NMProfile; +import com.ebay.myriad.state.NodeTask; +import com.google.gson.Gson; +import com.google.protobuf.GeneratedMessage; + +/** +* ByteBuffer support for the Serialization of the StoreContext +*/ +public class ByteBufferSupport { + + public static final int INT_SIZE = Integer.SIZE / Byte.SIZE; + public static final String UTF8 = "UTF-8"; + public static final byte[] ZERO_BYTES = new byte[0]; + private static Gson gson = new Gson(); + + public static void addByteBuffers(List<ByteBuffer> list, + ByteArrayOutputStream bytes) throws IOException { + // If list, add the list size, then the size of each buffer followed by the buffer. + if (list != null) { + bytes.write(toIntBytes(list.size())); + for (ByteBuffer bb : list) { + addByteBuffer(bb, bytes); + } + } else { + bytes.write(toIntBytes(0)); + } + } + + public static void addByteBuffer(ByteBuffer bb, + ByteArrayOutputStream bytes) throws IOException { + if (bb != null && bytes != null) { + bytes.write(toIntBytes(bb.array().length)); + bytes.write(bb.array()); + } + } + + public static ByteBuffer toByteBuffer(Protos.TaskID taskId) { + return toBuffer(taskId); + } + + public static ByteBuffer toByteBuffer(Protos.FrameworkID frameworkId) { + return toBuffer(frameworkId); + } + + /* + * Common method to convert Protobuf object to ByteBuffer + */ + public static ByteBuffer toBuffer(GeneratedMessage message) { + byte dst[]; + int size; + if (message != null) { + size = message.getSerializedSize() + INT_SIZE; + dst = message.toByteArray(); + } else { + size = INT_SIZE; + dst = ZERO_BYTES; + } + ByteBuffer bb = createBuffer(size); + putBytes(bb, dst); + bb.rewind(); + return bb; + } + + public static byte[] toIntBytes(int src) { + ByteBuffer bb = createBuffer(INT_SIZE); + bb.putInt(src); + return bb.array(); + } + + public static ByteBuffer toByteBuffer(NodeTask nt) { + // Determine the size of ByteBuffer to allocate + // The NMProfile toString() returns Json, if this ever changes then this + // will fail. Json is expected. + byte[] profile = toBytes(nt.getProfile().toString()); + int size = profile.length + INT_SIZE; + + byte[] hostname = toBytes(nt.getHostname()); + size += hostname.length + INT_SIZE; + + if (nt.getSlaveId() != null) { + size += nt.getSlaveId().getSerializedSize() + INT_SIZE; + } else { + size += INT_SIZE; + } + + if (nt.getTaskStatus() != null) { + size += nt.getTaskStatus().getSerializedSize() + INT_SIZE; + } else { + size += INT_SIZE; + } + + // Allocate and populate the buffer. + ByteBuffer bb = createBuffer(size); + putBytes(bb, profile); + putBytes(bb, hostname); + putBytes(bb, getSlaveBytes(nt)); + putBytes(bb, getTaskBytes(nt)); + // Make sure the buffer is at the beginning + bb.rewind(); + return bb; + } + + /** + * Assumes the entire ByteBuffer is a TaskID. + * + * @param bb + * @return Protos.TaskID + */ + public static Protos.TaskID toTaskId(ByteBuffer bb) { + try { + return Protos.TaskID.parseFrom(getBytes(bb, bb.getInt())); + } catch (Exception e) { + throw new RuntimeException("Failed to parse Task ID", e); + } + } + + /** + * Assumes the entire ByteBuffer is a FrameworkID. + * + * @param bb + * @return Protos.FrameworkID + */ + public static Protos.FrameworkID toFrameworkID(ByteBuffer bb) { + try { + return Protos.FrameworkID.parseFrom(getBytes(bb, bb.getInt())); + } catch (Exception e) { + throw new RuntimeException("Failed to parse Framework ID", e); + } + } + + /** + * ByteBuffer is expected to have a NodeTask at its next position. + * + * @param bb + * @return NodeTask or null if buffer is empty. Can throw a RuntimeException + * if the buffer is not formatted correctly. + */ + public static NodeTask toNodeTask(ByteBuffer bb) { + NodeTask nt = null; + if (bb != null && bb.array().length > 0) { + nt = new NodeTask(getProfile(bb)); + nt.setHostname(toString(bb)); + nt.setSlaveId(toSlaveId(bb)); + nt.setTaskStatus(toTaskStatus(bb)); + } + return nt; + } + + public static byte[] getTaskBytes(NodeTask nt) { + if (nt.getTaskStatus() != null) { + return nt.getTaskStatus().toByteArray(); + } else { + return ZERO_BYTES; + } + } + + public static byte[] getSlaveBytes(NodeTask nt) { + if (nt.getSlaveId() != null) { + return nt.getSlaveId().toByteArray(); + } else { + return ZERO_BYTES; + } + } + + public static void putBytes(ByteBuffer bb, byte bytes[]) { + if (bytes != null && bytes.length > 0) { + bb.putInt(bytes.length); + bb.put(bytes); + } else { + bb.putInt(0); + } + } + + public static byte[] getBytes(ByteBuffer bb, int size) { + byte bytes[] = new byte[size]; + bb.get(bytes); + return bytes; + } + + /** + * This assumes the next position is the size as an int, and the following is a string + * iff the size is not zero. + * + * @param bb ByteBuffer to extract string from + * @return string from the next position, or "" if the size is zero + */ + public static String toString(ByteBuffer bb) { + byte [] bytes = new byte[bb.getInt()]; + String s = ""; + try { + if (bytes.length > 0) { + bb.get(bytes); + s = new String(bytes, UTF8); + } + } catch (Exception e) { + throw new RuntimeException("ByteBuffer not in expected format," + + " failed to parse string bytes", e); + } + return s; + } + + public static byte[] toBytes(String s) { + try { + return s.getBytes(UTF8); + } catch (Exception e) { + return ZERO_BYTES; + } + } + + public static NMProfile getProfile(ByteBuffer bb) { + String p = toString(bb); + if (!StringUtils.isEmpty(p)) { + return gson.fromJson(p, NMProfile.class); + } else { + return null; + } + } + + public static Protos.SlaveID toSlaveId(ByteBuffer bb) { + int size = bb.getInt(); + if (size > 0) { + try { + return Protos.SlaveID.parseFrom(getBytes(bb, size)); + } catch (Exception e) { + throw new RuntimeException("ByteBuffer not in expected format," + + " failed to parse SlaveId bytes", e); + } + } else { + return null; + } + } + + public static Protos.TaskStatus toTaskStatus(ByteBuffer bb) { + int size = bb.getInt(); + if (size > 0) { + try { + return Protos.TaskStatus.parseFrom(getBytes(bb, size)); + } catch (Exception e) { + throw new RuntimeException("ByteBuffer not in expected format," + + " failed to parse TaskStatus bytes", e); + } + } else { + return null; + } + } + + public static ByteBuffer fillBuffer(byte src[]) { + ByteBuffer bb = createBuffer(src.length); + bb.put(src); + bb.rewind(); + return bb; + } + + public static List<ByteBuffer> createBufferList(ByteBuffer bb, int size) { + List<ByteBuffer> list = new ArrayList<ByteBuffer>(size); + for (int i = 0; i < size; i++) { + list.add(fillBuffer(getBytes(bb, bb.getInt()))); + } + return list; + } + + private static ByteBuffer createBuffer(int size) { + return ByteBuffer.allocate(size).order(ByteOrder.LITTLE_ENDIAN); + } + + public static ByteBuffer createBuffer(ByteBuffer bb) { + return fillBuffer(getBytes(bb, bb.getInt())); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/d1eca5c9/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java new file mode 100644 index 0000000..a62ddc4 --- /dev/null +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/StoreContext.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ebay.myriad.state.utils; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import org.apache.mesos.Protos; +import org.apache.mesos.Protos.TaskID; + +import com.ebay.myriad.state.NodeTask; + +/** +* The purpose of this container/utility is to create a mechanism to serialize the SchedulerState +* to RMStateStore and back. Json did not seem to handle the Protos fields very well so this was an +* alternative approach. +*/ +public final class StoreContext { + private ByteBuffer frameworkId; + private List<ByteBuffer> taskIds; + private List<ByteBuffer> taskNodes; + private List<ByteBuffer> pendingTasks; + private List<ByteBuffer> stagingTasks; + private List<ByteBuffer> activeTasks; + private List<ByteBuffer> lostTasks; + private List<ByteBuffer> killableTasks; + + public StoreContext() { + } + + /** + * Accept all the SchedulerState maps and flatten them into lists of ByteBuffers + * @param tasks + * @param pendingTasks + * @param stagingTasks + * @param activeTasks + * @param lostTasks + * @param killableTasks + */ + public StoreContext(Protos.FrameworkID frameworkId, + Map<Protos.TaskID, NodeTask> tasks, + Set<Protos.TaskID> pendingTasks, Set<Protos.TaskID> stagingTasks, + Set<Protos.TaskID> activeTasks, Set<Protos.TaskID> lostTasks, + Set<Protos.TaskID> killableTasks) { + setFrameworkId(frameworkId); + setTasks(tasks); + setPendingTasks(pendingTasks); + setStagingTasks(stagingTasks); + setActiveTasks(activeTasks); + setLostTasks(lostTasks); + setKillableTasks(killableTasks); + } + + /** + * Accept list of ByteBuffers and re-create the SchedulerState maps. + * @param framwrorkId + * @param taskIds + * @param taskNodes + * @param pendingTasks + * @param stagingTasks + * @param activeTasks + * @param lostTasks + * @param killableTasks + */ + public StoreContext(ByteBuffer frameworkId, + List<ByteBuffer> taskIds, List<ByteBuffer> taskNodes, + List<ByteBuffer> pendingTasks, List<ByteBuffer> stagingTasks, + List<ByteBuffer> activeTasks, List<ByteBuffer> lostTasks, + List<ByteBuffer> killableTasks) { + this.frameworkId = frameworkId; + this.taskIds = taskIds; + this.taskNodes = taskNodes; + this.pendingTasks = pendingTasks; + this.stagingTasks = stagingTasks; + this.activeTasks = activeTasks; + this.lostTasks = lostTasks; + this.killableTasks = killableTasks; + } + + /** + * Use this to gather bytes to push to the state store + * @return byte stream of the state store context. + * @throws IOException + */ + public ByteArrayOutputStream toSerializedContext() throws IOException { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + ByteBufferSupport.addByteBuffer(frameworkId, bytes); + ByteBufferSupport.addByteBuffers(taskIds, bytes); + ByteBufferSupport.addByteBuffers(taskNodes, bytes); + ByteBufferSupport.addByteBuffers(pendingTasks, bytes); + ByteBufferSupport.addByteBuffers(stagingTasks, bytes); + ByteBufferSupport.addByteBuffers(activeTasks, bytes); + ByteBufferSupport.addByteBuffers(lostTasks, bytes); + ByteBufferSupport.addByteBuffers(killableTasks, bytes); + return bytes; + } + + /** + * When the bytes come back from the store, use this method to create a new context. + * + * @param bytes from state store + * @return initialized StoreContext to use to initialize a SchedulerState + */ + @SuppressWarnings("unchecked") + public static StoreContext fromSerializedBytes(byte bytes[]) { + StoreContext ctx; + if (bytes != null && bytes.length > 0){ + ByteBuffer bb = ByteBufferSupport.fillBuffer(bytes); + ByteBuffer frameworkId = ByteBufferSupport.createBuffer(bb); + List<ByteBuffer> taskIds = ByteBufferSupport.createBufferList(bb, bb.getInt()); + List<ByteBuffer> taskNodes = ByteBufferSupport.createBufferList(bb, bb.getInt()); + List<ByteBuffer> pendingTasks = ByteBufferSupport.createBufferList(bb, bb.getInt()); + List<ByteBuffer> stagingTasks = ByteBufferSupport.createBufferList(bb, bb.getInt()); + List<ByteBuffer> activeTasks = ByteBufferSupport.createBufferList(bb, bb.getInt()); + List<ByteBuffer> lostTasks = ByteBufferSupport.createBufferList(bb, bb.getInt()); + List<ByteBuffer> killableTasks = ByteBufferSupport.createBufferList(bb, bb.getInt()); + ctx = new StoreContext(frameworkId, taskIds, taskNodes, pendingTasks, stagingTasks, activeTasks, + lostTasks, killableTasks); + } else { + ctx = new StoreContext(); + } + return ctx; + } + + /** + * Serialize tasks into internal ByteBuffers, removing the map. + * + * @param tasks + */ + public void setTasks(Map<Protos.TaskID, NodeTask> tasks) { + taskIds = new ArrayList<ByteBuffer>(tasks.size()); + taskNodes = new ArrayList<ByteBuffer>(tasks.size()); + for (Entry<TaskID, NodeTask> entry : tasks.entrySet()) { + taskIds.add(ByteBufferSupport.toByteBuffer(entry.getKey())); + taskNodes.add(ByteBufferSupport.toByteBuffer(entry.getValue())); + } + } + + /** + * De-serialize the internal ByteBuffer back into a Protos.FrameworkID. + * + * @return + */ + public Protos.FrameworkID getFrameworkId() { + return ByteBufferSupport.toFrameworkID(frameworkId); + } + + /** + * Serialize the Protos.FrameworkID into a ByteBuffer. + */ + public void setFrameworkId(Protos.FrameworkID frameworkId) { + if (frameworkId != null) { + this.frameworkId = ByteBufferSupport.toByteBuffer(frameworkId); + } + } + + /** + * De-serialize the internal ByteBuffers back into a Task map. + * + * @return + */ + public Map<Protos.TaskID, NodeTask> getTasks() { + Map<Protos.TaskID, NodeTask> map = null; + if (taskIds != null) { + map = new HashMap<Protos.TaskID, NodeTask>(taskIds.size()); + int idx = 0; + for (ByteBuffer bb : taskIds) { + map.put(ByteBufferSupport.toTaskId(bb), + ByteBufferSupport.toNodeTask(taskNodes.get(idx++))); + } + } else { + map = new HashMap<Protos.TaskID, NodeTask>(0); + } + return map; + } + + public void setPendingTasks(Set<Protos.TaskID> tasks) { + if (tasks != null) { + pendingTasks = new ArrayList<ByteBuffer>(tasks.size()); + toTaskBuffer(tasks, pendingTasks); + } + } + + public Set<Protos.TaskID> getPendingTasks () { + return toTaskSet(pendingTasks); + } + + public void setStagingTasks(Set<Protos.TaskID> tasks) { + if (tasks != null) { + stagingTasks = new ArrayList<ByteBuffer>(tasks.size()); + toTaskBuffer(tasks, stagingTasks); + } + } + + public Set<Protos.TaskID> getStagingTasks() { + return toTaskSet(stagingTasks); + } + + public void setActiveTasks(Set<Protos.TaskID> tasks) { + if (tasks != null) { + activeTasks = new ArrayList<ByteBuffer>(tasks.size()); + toTaskBuffer(tasks, activeTasks); + } + } + + public Set<Protos.TaskID> getActiveTasks() { + return toTaskSet(activeTasks); + } + + public void setLostTasks(Set<Protos.TaskID> tasks) { + if (tasks != null) { + lostTasks = new ArrayList<ByteBuffer>(tasks.size()); + toTaskBuffer(tasks, lostTasks); + } + } + + public Set<Protos.TaskID> getLostTasks() { + return toTaskSet(lostTasks); + } + + public void setKillableTasks(Set<Protos.TaskID> tasks) { + if (tasks != null) { + killableTasks = new ArrayList<ByteBuffer>(tasks.size()); + toTaskBuffer(tasks, killableTasks); + } + } + + public Set<Protos.TaskID> getKillableTasks() { + return toTaskSet(killableTasks); + } + + private void toTaskBuffer(Set<Protos.TaskID> src, List<ByteBuffer> tgt) { + for (Protos.TaskID id : src) { + tgt.add(ByteBufferSupport.toByteBuffer(id)); + } + } + + private Set<Protos.TaskID> toTaskSet(List<ByteBuffer> src) { + Set<Protos.TaskID> tasks; + if (src != null) { + tasks = new HashSet<Protos.TaskID>(src.size()); + for (int i = 0; i < src.size(); i++) { + tasks.add(ByteBufferSupport.toTaskId(src.get(i))); + } + } else { + tasks = new HashSet<Protos.TaskID>(0); + } + return tasks; + } +}